|
WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在众多项目中发挥着重要作用,其源码也值得广大“Python玩家”研究。官网:https://github.com/python-websockets/websockets1.什么是WebSocket?WebSocket协议是在2008年由Web应用程序设计师和开发人员创建的,目的是为了在Web浏览器和服务器之间提供更高效、更低延迟的双向通信。它允许客户端和服务器在任何时候发送消息,无需重新建立TCP连接。WebSocket可以在Web浏览器和服务器之间传输文本和二进制数据,使得构建实时Web应用程序变得更加简单。2.在Python中使用WebSocketPython中有多个库可以帮助我们使用WebSocket,如:websockets、aiohttp等。在本文中,我们将使用websockets库来演示WebSocket编程。要安装websockets库,你可以使用pip:pipinstallwebsockets13.创建WebSocket服务器使用websockets库,我们可以轻松地创建一个WebSocket服务器。以下是一个简单的示例:importasyncioimportwebsocketsasyncdefecho(websocket,path):asyncformessageinwebsocket:print(f"Receivedmessage:{message}")awaitwebsocket.send(f"Echo:{message}")start_server=websockets.serve(echo,"localhost",8765)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()123456789101112在这个示例中,我们定义了一个名为echo的协程函数,它接收两个参数:websocket和path。该函数使用asyncfor循环读取客户端发送的消息,并将消息发送回客户端。然后,我们使用websockets.serve()函数创建一个WebSocket服务器,监听本地主机的8765端口。最后,我们使用asyncio的事件循环启动服务器。4.创建WebSocket客户端要创建一个WebSocket客户端,我们同样可以使用websockets库。以下是一个简单的客户端示例:importasyncioimportwebsocketsasyncdefmain():asyncwithwebsockets.connect("ws://localhost:8765")aswebsocket:message="Hello,server!"awaitwebsocket.send(message)print(f"Sent:{message}")response=awaitwebsocket.recv()print(f"Received:{response}")asyncio.run(main())12345678910111213在这个示例中,我们使用websockets.connect()函数建立与WebSocket服务器的连接。然后,我们使用send()方法向服务器发送消息,并使用recv()方法接收服务器的响应。5.总结WebSocket协议为Web浏览器和服务器之间提供了实时双向通信的能力,使得构建实时Web应用程序变得更加容易。在Python中,我们可以使用websockets库轻松地实现WebSocket编程。6.通过websockets这个项目,从大型开源项目中学习asyncio库。一、asyncio.Transport在官方文档中,Transport被描述成对socket的抽象,它控制着如何传输数据。除了websockets,uvicorn、daphne等ASGI实现都会用到Transport。Transport继承于ReadTransport和WriteTransport,两者都继承于BaseTransport。顾名思义,Transport兼备读和写的功能,可以类比为读写socket对象。Transport对象提供以下常用函数——is_reading:判断该Transport是否在读。set_write_buffer_limits:设置写入Transport的高和低水位。考虑到网络状况,有时不希望写入过多的数据。write、write_eof、write_line:为当前Transport写入数据,分别表示写入二进制数据、eof和二进制行数据。其中eof写入后不会关闭Transport,但会flush数据。abort:立刻关闭Transport,不接受新的数据。留在缓冲的数据也会丢失,后续调用Protocol的connection_lost函数。在websockets中,Transport使用场景不多,一般都是通过Protocol对象的回调参数使用的。在websocket的初始化过程中,会设置Transport的最高水位。同样,在这种场景下,该对象也是作为回调参数使用的。二、asyncio.Protocol如果Transport是对socket的抽象,那么Protocol就是对协议的抽象。它提供了如何使用Transport的方式。用户使用的Protocol直接继承自BaseProtocol,并提供了六个Unimplemented函数需要用户去实现——connection_made:当连接建立时会执行该函数,该函数包含一个Transport类型的参数。connection_lost:当连接丢失或者关闭时会执行该函数,该函数包含一个Exception类型的参数。pause_writing:当Transport对象写入的数据高于之前设置的高水位时被调用,一般会暂停数据的写入。resume_writing:当Transport对象写入的数据低于之前设置的低水位时被调用,一般用于恢复数据写入。data_received:当有数据被接受时回调,该函数包含一个二进制对象data,用来表示接受的数据。eof_received:当被Transport对象被调用write_eof时被调用。在websockets中,server端的connection_made实现截图如图所示。在该函数中,websockets将用户实现的handler封装成task对象,并和websocket的server绑定。而在client端中实现如第一节截图所示,只是在reader中注册该Transport对象。websockets的connection_lost函数实现方式如下。主要操作即更新状态、关闭pings、更新对应的waiter状态,以及维护reader对象。在其他函数的实现中,websockets也主要用到了reader对象完成数据流的暂停和恢复,以及数据的写入。从上面代码实现可以看出,websockets通过reader代理完成数据流的操作。这个reader是一个asyncio.StreamReader对象。这个对象具体如何使用将在下一篇介绍。附录:进阶版本:python使用websockets库serve:在server端使用,等待客户端的连接。如果连接成功,返回一个websocket。connect:在client端使用,用于建立连接。send:发送数据recv:接收数据close:关闭连接服务端#!/usr/bin/python3#主要功能:创建1个基本的websocketserver,符合asyncio开发要求importasyncioimportwebsocketsfromdatetimeimportdatetimeasyncdefhandler(websocket):data=awaitwebsocket.recv()reply=f"Datareceivedas\"{data}\".time:{datetime.now()}"print(reply)awaitwebsocket.send(reply)print("Sendreply")asyncdefmain():asyncwithwebsockets.serve(handler,"localhost",9999):awaitasyncio.Future()#runforeverif__name__=="__main__":asyncio.run(main())123456789101112131415161718192021客户端importasyncioimportwebsocketsimporttimeasyncdefws_client(url):foriinrange(1,40):asyncwithwebsockets.connect(url)aswebsocket:awaitwebsocket.send("Hello,IamPyPy.")response=awaitwebsocket.recv()print(response)time.sleep(1)asyncio.run(ws_client('ws://localhost:9999'))1234567891011121314服务端importasyncioimportwebsocketsIP_ADDR="127.0.0.1"IP_PORT="9090"#握手,通过接收Hi,发送"success"来进行双方的握手。asyncdefserverHands(websocket):whileTrue:recv_text=awaitwebsocket.recv()print("recv_text="+recv_text)ifrecv_text=="Hi":print("connectedsuccess")awaitwebsocket.send("success")returnTrueelse:awaitwebsocket.send("connectedfail")#接收从客户端发来的消息并处理,再返给客户端successasyncdefserverRecv(websocket):whileTrue:recv_text=awaitwebsocket.recv()print("recv:",recv_text)awaitwebsocket.send("success,getmess:"+recv_text)#握手并且接收数据asyncdefserverRun(websocket,path):print(path)awaitserverHands(websocket)awaitserverRecv(websocket)#mainfunctionif__name__=='__main__':print("======server======")server=websockets.serve(serverRun,IP_ADDR,IP_PORT)asyncio.get_event_loop().run_until_complete(server)asyncio.get_event_loop().run_forever()1234567891011121314151617181920212223242526272829303132333435363738394041客户端importasyncioimportwebsocketsIP_ADDR="127.0.0.1"IP_PORT="9090"asyncdefclientHands(websocket):whileTrue:#通过发送hello握手awaitwebsocket.send("Hi")response_str=awaitwebsocket.recv()#接收"success"来进行双方的握手if"success"inresponse_str:print("握手成功")returnTrue#向服务器端发送消息asyncdefclientSend(websocket):whileTrue:input_text=input("inputtext:")ifinput_text=="exit":print(f'"exit",bye!')awaitwebsocket.close(reason="exit")returnFalseawaitwebsocket.send(input_text)recv_text=awaitwebsocket.recv()print(f"{recv_text}")#进行websocket连接asyncdefclientRun():ipaddress=IP_ADDR+":"+IP_PORTasyncwithwebsockets.connect("ws://"+ipaddress)aswebsocket:awaitclientHands(websocket)awaitclientSend(websocket)#mainfunctionif__name__=='__main__':print("======client======")asyncio.get_event_loop().run_until_complete(clientRun())1234567891011121314151617181920212223242526272829303132333435363738394041424344服务端#-*-coding:utf8-*-importjsonimportsocketimportasyncioimportloggingimportwebsocketsimportmultiprocessingIP='127.0.0.1'PORT_CHAT=9090USERS={}#提供聊天的后台asyncdefServerWs(websocket,path):logging.basicConfig(format='%(asctime)s-%(pathname)s[line:%(lineno)d]-%(levelname)s:%(message)s',filename="chat.log",level=logging.INFO)#握手awaitwebsocket.send(json.dumps({"type":"handshake"}))asyncformessageinwebsocket:data=json.loads(message)message=''#用户发信息ifdata["type"]=='send':name='404'fork,vinUSERS.items():ifv==websocket:name=kdata["from"]=nameiflen(USERS)!=0:#asyncio.waitdoesn'tacceptanemptylistmessage=json.dumps({"type":"user","content":data["content"],"from":name})#用户注册elifdata["type"]=='register':try:USERS[data["uuid"]]=websocketiflen(USERS)!=0:#asyncio.waitdoesn'tacceptanemptylistmessage=json.dumps({"type":"login","content":data["content"],"user_list":list(USERS.keys())})exceptExceptionasexp:print(exp)#用户注销elifdata["type"]=='unregister':delUSERS[data["uuid"]]iflen(USERS)!=0:#asyncio.waitdoesn'tacceptanemptylistmessage=json.dumps({"type":"logout","content":data["content"],"user_list":list(USERS.keys())})#打印日志logging.info(data)#群发awaitasyncio.wait([user.send(message)foruserinUSERS.values()])defserver_run():print("server")start_server=websockets.serve(ServerWs,'0.0.0.0',PORT_CHAT)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()if__name__=="__main__":frommultiprocessingimportProcessmultiprocessing.freeze_support()server=Process(target=server_run,daemon=False)server.start()12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667服务端importasyncioimportwebsocketsimporttimeimportjsonimportthreading#功能模块classOutputHandler():asyncdefrun(self,message,send_ms,websocket):#用户发信息awaitsend_ms(message,websocket)#单发消息#awaitsend_ms(message,websocket)#群发消息#awaits('hi起来')#存储所有的客户端Clients={}#服务端classWS_Server():def__init__(self):self.ip="127.0.0.1"self.port=9090#回调函数(发消息给客户端)asyncdefcallback_send(self,msg,websocket=None):awaitself.sendMsg(msg,websocket)#发送消息asyncdefsendMsg(self,msg,websocket):print('sendMsg:',msg)#websocket不为空,单发,为空,群发消息ifwebsocket!=None:awaitwebsocket.send(msg)else:#群发消息awaitself.broadcastMsg(msg)#避免被卡线程awaitasyncio.sleep(0.2)#群发消息asyncdefbroadcastMsg(self,msg):foruserinClients:awaituser.send(msg)#针对不同的信息进行请求,可以考虑json文本asyncdefrunCaseX(self,jsonMsg,websocket):print('runCase')op=OutputHandler()#参数:消息、方法、socketawaitop.run(jsonMsg,self.callback_send,websocket)#连接一个客户端,起一个循环监听asyncdefecho(self,websocket,path):#添加到客户端列表#Clients.append(websocket)#握手awaitwebsocket.send(json.dumps({"type":"handshake"}))#循环监听whileTrue:#接受信息try:#接受文本recv_text=awaitwebsocket.recv()message="Getmessage:{}".format(recv_text)#返回客户端信息awaitwebsocket.send(message)#转jsondata=json.loads(recv_text)#用户发信息ifdata["type"]=='send':name='404'fork,vinClients.items():ifv==websocket:name=kdata["from"]=nameiflen(Clients)!=0:#asyncio.waitdoesn'tacceptanemptylistmessage=json.dumps({"type":"send","content":data["content"],"from":name})awaitself.runCaseX(jsonMsg=message,websocket=websocket)#用户注册elifdata["type"]=='register':try:Clients[data["uuid"]]=websocketiflen(Clients)!=0:#asyncio.waitdoesn'tacceptanemptylistmessage=json.dumps({"type":"register","content":data["content"],"user_list":list(Clients.keys())})awaitself.runCaseX(jsonMsg=message,websocket=websocket)exceptExceptionasexp:print(exp)#用户注销elifdata["type"]=='unregister':delClients[data["uuid"]]#对message进行解析,跳进不同功能区#awaitself.runCaseX(jsonMsg=data,websocket=websocket)#链接断开exceptwebsockets.ConnectionClosed:print("ConnectionClosed...",path)#delClientsbreak#无效状态exceptwebsockets.InvalidState:print("InvalidState...")#delClientsbreak#报错exceptExceptionase:print("ws连接报错",e)#delClientsbreak#启动服务器asyncdefrunServer(self):asyncwithwebsockets.serve(self.echo,self.ip,self.port):awaitasyncio.Future()#runforever #多协程模式,防止阻塞主线程无法做其他事情defWebSocketServer(self):asyncio.run(self.runServer())#多线程启动defstartServer(self):#多线程启动,否则会堵塞thread=threading.Thread(target=self.WebSocketServer)thread.start()#thread.join()if__name__=='__main__':print("server")s=WS_Server()s.startServer()123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
|
|