
TCP聊天服务器套接字v1.2
所有版本记录:
v1.0: TCP聊天服务器套接字|PyQt5+socket(TCP端口映射+端口放行)+logging+Thread(含日志,html)+anaconda打包32位exe(3.4万字)|python高阶
v1.1: python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket+PyQt5)
(v1.2只改了服务端文件夹)
文件目录:

警告! : clients子目录自动生成, 无需新建.你新建 clients了也行, 但别建无内容的 data.json
log.txt日志文件自动生成
from json import load, dump
from os import path, mkdir
from hashlib import md5
from time import time#from hashlib import md5(上文)
def encode(data: str) -> str:
    """Return the hexadecimal MD5 digest of ``data`` encoded as UTF-8.

    NOTE(review): this digest is what the ``user`` class stores as the
    password hash. Unsalted MD5 is weak for that purpose, but switching the
    algorithm would invalidate the records already written to data.json.
    """
    return md5(data.encode('utf8')).hexdigest()


# Location of the JSON user database. Raw string: "\c" and "\d" are not
# valid escape sequences in a normal string literal.
file = r'.\clients\data.json'
# Directory that holds the user database. Raw string: "\c" is not a valid
# escape sequence in a normal string literal.
folder = r'.\clients'
# Create the directory on first run so the database file can be written.
if not path.exists(folder):
    mkdir(folder)
class user():
    """User database: ``username -> (password hash, registration time)``.

    The mapping lives in ``self.data`` and is persisted as JSON in ``file``.
    """

    def __init__(self):
        """Load the database from ``file``; start empty if there is none."""
        self.data = {}
        if path.exists(file):
            with open(file, 'r') as f:
                # An empty file is not valid JSON; treat it as "no users yet"
                # instead of crashing on startup.
                has_content = bool(f.read().strip())
                f.seek(0)
                if has_content:
                    self.data = load(f)

    # NOTE(review): ``__get__`` is also the descriptor-protocol hook. That is
    # harmless while instances are only stored as instance attributes.
    def __get__(self, username, default=None) -> tuple:
        """Return the stored record of ``username``, or ``default``."""
        return self.data.get(username, default)

    def __in__(self, username) -> bool:
        """Return whether ``username`` is already registered."""
        return username in self.data

    def __write__(self) -> None:
        """Persist the whole database to ``file``."""
        with open(file, 'w') as f:
            dump(self.data, f)

    def __register__(self, username, password, time:(int, float)) -> None:
        """Store a new user with the hashed password and creation time."""
        self.data[username] = (encode(password), int(time))
        self.__write__()

    def __login__(self, username, password) -> bool:
        """Return whether ``password`` matches the stored hash."""
        # The stored value is the hash of the *password*, so the password
        # (not the username) must be hashed for the comparison.
        return self.data[username][0] == encode(password)

    def handler(self, username, password) -> bool:
        """Log an existing user in, or register an unknown one.

        Returns the password check result for known users and ``True``
        after registering a new one.
        """
        if self.__in__(username):
            return self.__login__(username, password)
        self.__register__(username, password, time())
        return True

    def get_time(self, username):
        """Return the registration timestamp stored for ``username``."""
        return self.data[username][1]


# Excerpt from server.py showing how the database is used.
import data


class Client():
    ...
# Excerpt: the ``login`` method of ``Client`` shown on its own
# (``self`` is a Client instance).
def login(self):
    """Run the login / registration dialogue with the connected peer.

    Known usernames must supply their password; unknown usernames choose a
    password (4-10 characters, no newline) and confirm it. On success the
    client is flagged as logged in and announced through the server. On a
    wrong password the connection is closed and nothing is announced.
    """
    self._send(f'<font color="red">欢迎来到服务器[{self.server.address[0]}].您的ip地址为{self.socket.getpeername()[0]}')
    # Only the first 8 characters of the reply are kept as the username.
    self.username = self.recv()[:8]
    records = self.server.user_record
    if records.__in__(self.username):
        self._send("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
        password = self.recv()
        if not records.handler(self.username, password):
            self._send('<font color="red">密码错误,请重试.</font>')
            self.__del__()
            # Rejected: stop here so the peer is neither marked as logged in
            # nor announced to the other users.
            return
        self._send(f'<font color="green">欢迎回来, {self.username}.</font>')
    else:
        def normal(string):
            return (4 <= len(string) <= 10) and not ('\n' in string)

        # Ask until an acceptable password is supplied.
        while True:
            self._send("<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
            p1 = self.recv()
            if normal(p1):
                break
        # Ask until the confirmation matches the first entry.
        while True:
            self._send("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
            p2 = self.recv()
            if p1 == p2:
                break
            self._send("<font color='red'>密码与前次不符!</font>")
        # The username is unknown, so handler() registers it.
        records.handler(self.username, p1)
        self._send(f'初来乍到, {self.username}')
    self._login = True
    self.server.login(self.username, self.addr)


# The per-user handler thread is turned into the Client class below: this
# makes clients easier to manage and errors easier to handle, and exposes
# the Server class to fewer unexpected failures.
class Client(object):
    """One connected chat peer.

    Wraps the accepted socket, runs the login dialogue and then passes each
    received message either to the command handler or to the server
    broadcast.
    """

    class QuitError(Exception):
        """Marker for a failed socket operation.

        The class object itself is *returned* (never raised) by the wrappers
        built in ``Client.error``; callers compare against it.
        """

        def __init__(self, *args):
            super().__init__(*args)

    def __init__(self, socket, addr, server:Server):
        """Bind the peer socket to its server.

        :param socket: connected socket of the peer.
        :param addr: peer address as supplied by the server.
        :param server: owning ``Server``; anything else raises ``ValueError``.
        """
        self.socket = socket
        self.addr = addr
        if not isinstance(server, Server):
            raise ValueError
        self.server = server
        self.encode = self.server.encode
        self.com = Command_Handler(self)

        @self.error
        def _recv(self) -> str:
            raw = self.socket.recv(bytecount ** 2)
            if not raw:
                # recv() returning no bytes means the peer closed the link;
                # route it through the disconnect path instead of letting
                # recv() below spin forever on empty reads.
                raise ConnectionAbortedError("peer closed the connection")
            return raw.decode(encoding=self.encode).strip()
        self._recv = lambda: _recv(self)

        @self.error
        def _send(self, message=str()) -> None:
            self.socket.sendall(message.encode(self.encode))
        self._send = lambda m: _send(self, m)

    def __del__(self):
        self.socket.close()

    def isLogin(self) -> bool:
        """Return whether the login dialogue completed successfully."""
        return hasattr(self, "_login") and self._login

    def isOpen(self) -> bool:
        """Return whether the socket is still open."""
        # Relies on the private ``_closed`` flag of the socket object; a
        # socket without that attribute is treated as closed.
        return not getattr(self.socket, "_closed", True)

    def __filter__(self) -> bool:
        """Return whether the client is online and ready to receive messages."""
        return self.isLogin() and self.isOpen()

    def recv(self) -> str:
        """Block until a non-empty message arrives and return it.

        Returns ``Client.QuitError`` when the connection is gone.
        """
        data = self._recv()
        while not data:
            # A closed socket will never deliver data again: report it
            # instead of retrying the read.
            if not self.isOpen():
                return Client.QuitError
            data = self._recv()
        return data

    @ignore
    def login(self):
        """Run the login / registration dialogue with the peer.

        Known usernames must supply their password; unknown usernames choose
        a password (4-10 characters, no newline) and confirm it. On success
        the client is flagged as logged in and announced through the server.
        On a wrong password the connection is closed and nothing is
        announced.
        """
        self._send(f'<font color="red">欢迎来到服务器[{self.server.address[0]}].您的ip地址为{self.socket.getpeername()[0]}')
        # Only the first 8 characters of the reply are kept as the username.
        self.username = self.recv()[:8]
        records = self.server.user_record
        if records.__in__(self.username):
            self._send("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
            password = self.recv()
            if not records.handler(self.username, password):
                self._send('<font color="red">密码错误,请重试.</font>')
                self.__del__()
                # Rejected: stop here so the peer is neither marked as
                # logged in nor announced to the other users.
                return
            self._send(f'<font color="green">欢迎回来, {self.username}.</font>')
        else:
            def normal(string):
                return (4 <= len(string) <= 10) and not ('\n' in string)

            # Ask until an acceptable password is supplied.
            while True:
                self._send("<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
                p1 = self.recv()
                if normal(p1):
                    break
            # Ask until the confirmation matches the first entry.
            while True:
                self._send("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
                p2 = self.recv()
                if p1 == p2:
                    break
                self._send("<font color='red'>密码与前次不符!</font>")
            # The username is unknown, so handler() registers it.
            records.handler(self.username, p1)
            self._send(f'初来乍到, {self.username}')
        self._login = True
        self.server.login(self.username, self.addr)

    def quit(self) -> None:
        """Close the socket and, for logged-in users, announce the departure."""
        if self.isOpen() is True:
            self.socket.close()
            # Only users that were announced as joined are announced as
            # leaving; before login there may not even be a username yet.
            if self.isLogin():
                self.server.quit(self.username, self.addr)

    @ignore
    def forever_receive(self):
        """Thread body: log in, then serve messages until the client is gone."""
        self.login()
        while self.__filter__():
            string = self.recv()
            if string is Client.QuitError:
                return
            if self.com.iscommand(string):
                self._send(self.com.handler(string))
            else:
                self.server.UserMessage(self.addr, self.username, string)

    def error(self, func):
        """Wrap ``func`` so that socket failures never propagate.

        * ``ConnectionError`` (aborted, reset, broken pipe): the client is
          shut down through ``quit`` and ``None`` is returned.
        * any other exception: it is logged and ``Client.QuitError`` is
          returned.
        """
        def function(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ConnectionError:
                self.quit()
            except Exception:
                logger.exception("error")
                return Client.QuitError
        return function

    def run(self):
        """Start the daemon thread that serves this client."""
        self.thread = threading(True, target=self.forever_receive)


# Accordingly, the Server class becomes:
class Server(object):
    """TCP chat server: accepts peers and relays messages between them."""

    # HTML templates sent to the clients.
    join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s)%s></font> %s"
    quit_message = "%s(%s) 下线了, %s"

    def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
        """Bind and listen on ``(addr, port)``.

        :param backlog: value passed to ``socket.listen``.
        :param encode: text encoding used for all client traffic.
        """
        self.address = addr, port
        self.backlog = backlog
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(self.address)
        self.socket.listen(backlog)
        self.connect = []  # every Client accepted so far
        self.encode = encode
        self.user_record = data.user()

    def run(self):
        """Log the configuration and enter the accept loop (never returns)."""
        logger.info("Server [%s] on, port [%s].Format is %s" % (*self.address,
                                                                ':'.join(map(str, self.address))))
        logger.info("Backlog number: " + str(self.backlog))
        logger.info('The CODEC is sent as '+self.encode)
        self.accept_client()

    def _get_Clients(self) -> list:
        """Return the clients that are logged in and still connected."""
        return [c for c in self.connect if c.__filter__()]

    def _get_sockets(self):
        """Return the number of online clients."""
        return len(self._get_Clients())

    def _str_sockets(self):
        return f"当前人数 {self._get_sockets()}"

    def ServerMessage(self, mes, inc=True):
        """Send ``mes`` to every online client (``inc`` is unused)."""
        for user in self._get_Clients():
            user._send(mes)

    def UserMessage(self, address, _user, mes,inc=True):
        """Relay the chat message ``mes`` from ``_user`` to every online client.

        Empty messages are dropped. The copy delivered to the sender is
        coloured differently and tagged as the sender's own message.
        NOTE(review): ``mes`` is interpolated into the HTML template
        unescaped, so peers can inject markup.
        """
        if not mes:
            return
        for user in self.connect:
            if user.__filter__():
                is_sender = _user == user.username
                send_message = Server.user_message % ("brown" if is_sender else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if is_sender else "",
                                                      mes)
                user._send(send_message)
        logger.info(f"{address}[{_user}] : {mes}")

    def error_handle(self):
        """Drop every client whose socket is closed from ``self.connect``."""
        # Snapshot first: removing from the list while iterating it lazily
        # skips the element that follows each removal.
        closed = [c for c in self.connect if not c.isOpen()]
        for user in closed:
            print(user)
            self.connect.remove(user)

    def accept_client(self):
        """Accept peers forever, starting one Client thread per connection."""
        while True:
            logger.info("The server is listening on the port.")
            client, address = self.socket.accept()  # blocks until a peer connects
            NewClient = Client(client, address[0], self)
            self.connect.append(NewClient)
            NewClient.run()
            logger.info(f'The address {address[0]} is connected to the server')

    def quit(self,username,address):
        """Log and broadcast that ``username`` went offline."""
        QuitMessage = Server.quit_message % (username, address, self._str_sockets())
        logger.info(QuitMessage)
        self.ServerMessage(QuitMessage, False)

    def login(self,username,address):
        """Log and broadcast that ``username`` joined."""
        logger.info(f"{address}[{username}] 登录服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))


# ---- server.py ----
import socket # 导入 socket 模块
from threading import Thread
import logging
#from color import Text, Background, Print
#from cProfile import run
import data
# Program version; reported to clients by the `/info -v` command.
__version__ = 1.2
def threading(Daemon, **kwargs):
    """Create, configure and start a thread.

    :param Daemon: value for the thread's ``daemon`` flag.
    :param kwargs: forwarded unchanged to ``Thread`` (``target``, ``args``...).
    :return: the already started ``Thread``.
    """
    thread = Thread(**kwargs)
    # ``setDaemon()`` is deprecated; set the attribute directly.
    thread.daemon = Daemon
    thread.start()
    return thread
def ignore(function):
    """Decorator: call ``function`` and swallow any exception it raises.

    The wrapper returns the wrapped function's result, or ``None`` when an
    exception was swallowed. Only ``Exception`` subclasses are suppressed,
    so ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
    """
    from functools import wraps

    @wraps(function)
    def i(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            # Deliberate best-effort: callers rely on failures being silent.
            return None
    return i
# Record layout shared by the file and the console output.
_LOG_FORMAT = "[%(asctime)s(%(levelname)s)]: %(message)s"

# Module logger: INFO and above goes to log.txt and to the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.FileHandler("log.txt")
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter(_LOG_FORMAT))

console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter(_LOG_FORMAT))

logger.addHandler(handler)
logger.addHandler(console)

# Base unit of the receive buffer; Client reads ``bytecount ** 2`` bytes.
bytecount = 1024
class Command_Handler(object):
    """Parse and answer the slash commands typed by one client."""

    def __init__(self, bind):
        """Bind the handler to the ``Client`` whose commands it answers."""
        # Kept as an assert so the raised exception type stays unchanged.
        assert isinstance(bind, Client)
        self.client = bind

    def _function(self, _list):
        """Resolve the tokenised command ``_list`` to its reply text.

        The tokens walk a nested table (command -> option -> reply builder).
        An unknown token yields the "unknown command" text, a command
        without an option yields a hint listing the valid options, and
        tokens after a resolved option are ignored.
        """
        # Reply builders are stored uncalled so that only the selected one
        # runs; a failure in one reply cannot break the other commands.
        commands = {"/info": {"-v": self.get_version,
                              "-id": self.get_id,
                              "-i": self.info,
                              "-h": self.help,
                              "-name": self.name},
                    }
        node = commands
        for token in _list:
            if not isinstance(node, dict):
                break
            if token not in node:
                return self.unknown(" ".join(_list))
            node = node[token]
        if isinstance(node, dict):
            return "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(node.keys())
        return node()

    @staticmethod
    def help():
        """Return the usage text of the ``/info`` command."""
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
-name : get your name
For example, <font color=red>/info -id</font>"""

    @staticmethod
    def get_version():
        return "version : " + str(__version__)

    def get_id(self):
        return "Your id is {}.".format(id(self.client))

    def name(self):
        return "Your name is {}.".format(self.client.username)

    def info(self):
        # Use the bare version number: get_version() already carries its own
        # "version : " label, which produced "version version : ...".
        return f"Socket Server[version {__version__}] By zmh."

    def unknown(self,s):
        """Return the error text for the unrecognised command ``s``."""
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())

    def cut(self, string):
        """Split a raw command line into whitespace-separated tokens."""
        return string.strip().split()

    def handler(self, c):
        """Return the HTML reply for the raw command line ``c``."""
        return "<font color='gray'>[command]</font><font color='brown'>%s</font>\n%s" % (c,str(self._function(self.cut(c))))

    def iscommand(self,i):
        """Return whether the message ``i`` is a command (starts with "/")."""
        return i.strip().startswith("/")
class Server(object):
    """TCP chat server: accepts peers and relays messages between them."""

    # HTML templates sent to the clients.
    join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s)%s></font> %s"
    quit_message = "%s(%s) 下线了, %s"

    def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
        """Bind and listen on ``(addr, port)``.

        :param backlog: value passed to ``socket.listen``.
        :param encode: text encoding used for all client traffic.
        """
        self.address = addr, port
        self.backlog = backlog
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(self.address)
        self.socket.listen(backlog)
        self.connect = []  # every Client accepted so far
        self.encode = encode
        self.user_record = data.user()

    def run(self):
        """Log the configuration and enter the accept loop (never returns)."""
        logger.info("Server [%s] on, port [%s].Format is %s" % (*self.address,
                                                                ':'.join(map(str, self.address))))
        logger.info("Backlog number: " + str(self.backlog))
        logger.info('The CODEC is sent as '+self.encode)
        self.accept_client()

    def _get_Clients(self) -> list:
        """Return the clients that are logged in and still connected."""
        return [c for c in self.connect if c.__filter__()]

    def _get_sockets(self):
        """Return the number of online clients."""
        return len(self._get_Clients())

    def _str_sockets(self):
        return f"当前人数 {self._get_sockets()}"

    def ServerMessage(self, mes, inc=True):
        """Send ``mes`` to every online client (``inc`` is unused)."""
        for user in self._get_Clients():
            user._send(mes)

    def UserMessage(self, address, _user, mes,inc=True):
        """Relay the chat message ``mes`` from ``_user`` to every online client.

        Empty messages are dropped. The copy delivered to the sender is
        coloured differently and tagged as the sender's own message.
        NOTE(review): ``mes`` is interpolated into the HTML template
        unescaped, so peers can inject markup.
        """
        if not mes:
            return
        for user in self.connect:
            if user.__filter__():
                is_sender = _user == user.username
                send_message = Server.user_message % ("brown" if is_sender else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if is_sender else "",
                                                      mes)
                user._send(send_message)
        logger.info(f"{address}[{_user}] : {mes}")

    def error_handle(self):
        """Drop every client whose socket is closed from ``self.connect``."""
        # Snapshot first: removing from the list while iterating it lazily
        # skips the element that follows each removal.
        closed = [c for c in self.connect if not c.isOpen()]
        for user in closed:
            print(user)
            self.connect.remove(user)

    def accept_client(self):
        """Accept peers forever, starting one Client thread per connection."""
        while True:
            logger.info("The server is listening on the port.")
            client, address = self.socket.accept()  # blocks until a peer connects
            NewClient = Client(client, address[0], self)
            self.connect.append(NewClient)
            NewClient.run()
            logger.info(f'The address {address[0]} is connected to the server')

    def quit(self,username,address):
        """Log and broadcast that ``username`` went offline."""
        QuitMessage = Server.quit_message % (username, address, self._str_sockets())
        logger.info(QuitMessage)
        self.ServerMessage(QuitMessage, False)

    def login(self,username,address):
        """Log and broadcast that ``username`` joined."""
        logger.info(f"{address}[{username}] 登录服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))
class Client(object):
    """One connected chat peer.

    Wraps the accepted socket, runs the login dialogue and then passes each
    received message either to the command handler or to the server
    broadcast.
    """

    class QuitError(Exception):
        """Marker for a failed socket operation.

        The class object itself is *returned* (never raised) by the wrappers
        built in ``Client.error``; callers compare against it.
        """

        def __init__(self, *args):
            super().__init__(*args)

    def __init__(self, socket, addr, server:Server):
        """Bind the peer socket to its server.

        :param socket: connected socket of the peer.
        :param addr: peer address as supplied by the server.
        :param server: owning ``Server``; anything else raises ``ValueError``.
        """
        self.socket = socket
        self.addr = addr
        if not isinstance(server, Server):
            raise ValueError
        self.server = server
        self.encode = self.server.encode
        self.com = Command_Handler(self)

        @self.error
        def _recv(self) -> str:
            raw = self.socket.recv(bytecount ** 2)
            if not raw:
                # recv() returning no bytes means the peer closed the link;
                # route it through the disconnect path instead of letting
                # recv() below spin forever on empty reads.
                raise ConnectionAbortedError("peer closed the connection")
            return raw.decode(encoding=self.encode).strip()
        self._recv = lambda: _recv(self)

        @self.error
        def _send(self, message=str()) -> None:
            self.socket.sendall(message.encode(self.encode))
        self._send = lambda m: _send(self, m)

    def __del__(self):
        self.socket.close()

    def isLogin(self) -> bool:
        """Return whether the login dialogue completed successfully."""
        return hasattr(self, "_login") and self._login

    def isOpen(self) -> bool:
        """Return whether the socket is still open."""
        # Relies on the private ``_closed`` flag of the socket object; a
        # socket without that attribute is treated as closed.
        return not getattr(self.socket, "_closed", True)

    def __filter__(self) -> bool:
        """Return whether the client is online and ready to receive messages."""
        return self.isLogin() and self.isOpen()

    def recv(self) -> str:
        """Block until a non-empty message arrives and return it.

        Returns ``Client.QuitError`` when the connection is gone.
        """
        data = self._recv()
        while not data:
            # A closed socket will never deliver data again: report it
            # instead of retrying the read.
            if not self.isOpen():
                return Client.QuitError
            data = self._recv()
        return data

    @ignore
    def login(self):
        """Run the login / registration dialogue with the peer.

        Known usernames must supply their password; unknown usernames choose
        a password (4-10 characters, no newline) and confirm it. On success
        the client is flagged as logged in and announced through the server.
        On a wrong password the connection is closed and nothing is
        announced.
        """
        self._send(f'<font color="red">欢迎来到服务器[{self.server.address[0]}].您的ip地址为{self.socket.getpeername()[0]}')
        # Only the first 8 characters of the reply are kept as the username.
        self.username = self.recv()[:8]
        records = self.server.user_record
        if records.__in__(self.username):
            self._send("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
            password = self.recv()
            if not records.handler(self.username, password):
                self._send('<font color="red">密码错误,请重试.</font>')
                self.__del__()
                # Rejected: stop here so the peer is neither marked as
                # logged in nor announced to the other users.
                return
            self._send(f'<font color="green">欢迎回来, {self.username}.</font>')
        else:
            def normal(string):
                return (4 <= len(string) <= 10) and not ('\n' in string)

            # Ask until an acceptable password is supplied.
            while True:
                self._send("<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
                p1 = self.recv()
                if normal(p1):
                    break
            # Ask until the confirmation matches the first entry.
            while True:
                self._send("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
                p2 = self.recv()
                if p1 == p2:
                    break
                self._send("<font color='red'>密码与前次不符!</font>")
            # The username is unknown, so handler() registers it.
            records.handler(self.username, p1)
            self._send(f'初来乍到, {self.username}')
        self._login = True
        self.server.login(self.username, self.addr)

    def quit(self) -> None:
        """Close the socket and, for logged-in users, announce the departure."""
        if self.isOpen() is True:
            self.socket.close()
            # Only users that were announced as joined are announced as
            # leaving; before login there may not even be a username yet.
            if self.isLogin():
                self.server.quit(self.username, self.addr)

    @ignore
    def forever_receive(self):
        """Thread body: log in, then serve messages until the client is gone."""
        self.login()
        while self.__filter__():
            string = self.recv()
            if string is Client.QuitError:
                return
            if self.com.iscommand(string):
                self._send(self.com.handler(string))
            else:
                self.server.UserMessage(self.addr, self.username, string)

    def error(self, func):
        """Wrap ``func`` so that socket failures never propagate.

        * ``ConnectionError`` (aborted, reset, broken pipe): the client is
          shut down through ``quit`` and ``None`` is returned.
        * any other exception: it is logged and ``Client.QuitError`` is
          returned.
        """
        def function(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ConnectionError:
                self.quit()
            except Exception:
                logger.exception("error")
                return Client.QuitError
        return function

    def run(self):
        """Start the daemon thread that serves this client."""
        self.thread = threading(True, target=self.forever_receive)
def get_host_ip() -> str:
    """Return the local IP address of this host.

    Opens a UDP socket, connects it to 8.8.8.8:80 and returns the local
    address the socket ends up with. Any ``OSError`` from creating or
    connecting the socket propagates to the caller.
    """
    # ``with`` closes the socket on every path. The previous ``finally``
    # failed with UnboundLocalError when the socket could not be created.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
# Script entry: start the chat server on the configured address and port.
server = Server("192.168.1.8", 429)
server.run()

# ---- data.py ----
from json import load, dump
from os import path, mkdir
from hashlib import md5
from time import time
def encode(data: str) -> str:
    """Return the hexadecimal MD5 digest of ``data`` encoded as UTF-8.

    NOTE(review): this digest is what the ``user`` class stores as the
    password hash. Unsalted MD5 is weak for that purpose, but switching the
    algorithm would invalidate the records already written to data.json.
    """
    return md5(data.encode('utf8')).hexdigest()
# Location of the JSON user database and of the directory that holds it.
# Raw strings: "\c" and "\d" are not valid escape sequences in a normal
# string literal.
file = r'.\clients\data.json'
folder = r'.\clients'
# Create the directory on first run so the database file can be written.
if not path.exists(folder):
    mkdir(folder)
class user():
    """User database: ``username -> (password hash, registration time)``.

    The mapping lives in ``self.data`` and is persisted as JSON in ``file``.
    """

    def __init__(self):
        """Load the database from ``file``; start empty if there is none."""
        self.data = {}
        if path.exists(file):
            with open(file, 'r') as f:
                # An empty file is not valid JSON; treat it as "no users yet"
                # instead of crashing on startup.
                has_content = bool(f.read().strip())
                f.seek(0)
                if has_content:
                    self.data = load(f)

    # NOTE(review): ``__get__`` is also the descriptor-protocol hook. That is
    # harmless while instances are only stored as instance attributes.
    def __get__(self, username, default=None) -> tuple:
        """Return the stored record of ``username``, or ``default``."""
        return self.data.get(username, default)

    def __in__(self, username) -> bool:
        """Return whether ``username`` is already registered."""
        return username in self.data

    def __write__(self) -> None:
        """Persist the whole database to ``file``."""
        with open(file, 'w') as f:
            dump(self.data, f)

    def __register__(self, username, password, time:(int, float)) -> None:
        """Store a new user with the hashed password and creation time."""
        self.data[username] = (encode(password), int(time))
        self.__write__()

    def __login__(self, username, password) -> bool:
        """Return whether ``password`` matches the stored hash."""
        # The stored value is the hash of the *password*, so the password
        # (not the username) must be hashed for the comparison.
        return self.data[username][0] == encode(password)

    def handler(self, username, password) -> bool:
        """Log an existing user in, or register an unknown one.

        Returns the password check result for known users and ``True``
        after registering a new one.
        """
        if self.__in__(username):
            return self.__login__(username, password)
        self.__register__(username, password, time())
        return True

    def get_time(self, username):
        """Return the registration timestamp stored for ``username``."""
        return self.data[username][1]