Python WebSocket长连接心跳与短连接

 

5 实例

协议说明容易让人有点迷糊,websocket,engine.io,socket.io,各自协议是如何工作的,看看实例可能会比较清晰,为了方便测试,我写了个Dockerfile,安装了docker的童鞋可以拉取代码执行
bin/start.sh 即可启动拥有完整的
nginx+uwsgi+gevent+flask_socketio测试环境的容器开始测试,浏览器打开http://127.0.0.1即可测试。async_mode用的是gevent_uwsgi,完整代码见
这里。

对于不支持websocket的低版本浏览器,socket.io会退化为长轮询的方式,通过定期的发送GET,
POST请求来拉取数据。没有数据时,会将请求数据的GET请求hang住,直到服务端有数据产生或者客户端的POST请求将GET请求释放,释放之后会紧接着再次发送一个GET请求,除此之外,协议解析和处理流程与websocket方式基本一致。实例只针对使用websocket的进行分析

为了观察socket.io客户端的调用流程,可以设置localStorage.debug = '*';,测试的前段代码片段如下(完整代码见仓库):

 <script type="text/javascript" charset="utf-8">
    var socket = io.connect('/', {
        "reconnectionDelayMax": 10000,
        "reconnectionAttempts": 10
    });
    socket.on('connect', function() {
        $('#log').append('<br>' + $('<div/>').text('connected').html());
    })

    $(document).ready(function() {

        socket.on('server_response', function(msg) {
            $('#log').append('<br>' + $('<div/>').text('Received from server: ' + ': ' + msg.data).html());
        });

        $('form#emit').submit(function(event) {
            socket.emit('client_event', {data: $('#emit_data').val()});
            return false;
        });
    });

 </script>

测试代码比较简单,引入socket.io的js库文件,然后在连接成功后在页面显示“connected”,在输入框输入文字,可以通过连接发送至服务器,然后服务器将浏览器发送的字符串加上server标识回显回来。

python3知识点

jquery.min.js

图片 1

(5)on_error:这个对象在遇到错误时调用,有两个参数,第一个是该类本身,第二个是异常对象。

关闭连接

客户端要主动关闭连接,在JS中调用 socket.close()
即可,此时发送的数据包为
41,其中4代表的是engine.io的消息类型message,而数据1则是指的socket.io的消息类型disconnect,关闭流程见上一章的说明。

HTML代码:python3知识点

微信QQ

#chatcontent{

/*显示内容使用的*/

width:500px;

height:200px;

background-color:pink;

overflow-y:scroll;

overflow-x:scroll;

}

发送

ws=newWebSocket(‘ws://192.168.1.27:8009/chat’)

//服务器给浏览器推送消息的时候回调

ws.onmessage=function(p1) {

$(‘#chatcontent’).append(‘

‘+p1.data+’

‘)

}

functionsend() {

varcontent=$(‘#msg_container’).val()

ws.send(content)

$(‘#msg_container’).val(”)

}

下载地址1:

import websocket
from threading import Thread
import time
import sys


class MyApp(websocket.WebSocketApp):
    def on_message(self, message):
        print(message)

    def on_error(self, error):
        print(error)

    def on_close(self):
        print("### closed ###")

    def on_open(self):
        def run(*args):
            for i in range(3):
                # send the message, then wait
                # so thread doesn't exit and socket
                # isn't closed
                self.send("Hello %d" % i)
                time.sleep(1)

            time.sleep(1)
            self.close()
            print("Thread terminating...")

        Thread(target=run).start()


if __name__ == "__main__":
    websocket.enableTrace(True)
    if len(sys.argv) < 2:
        host = "ws://echo.websocket.org/"
    else:
        host = sys.argv[1]
    ws = MyApp(host)
    ws.run_forever()

在项目中用到socket.io做实时推送,遂花了点时间看了socket.io实现,做个简单分析,如有错漏,欢迎指正。

web服务器代码:

#coding=utf-8

importtornado.websocket

importtornado.web

importtornado.ioloop

importdatetime

classIndexHandler(tornado.web.RequestHandler):

defget(self, *args, **kwargs):

self.render(‘templates/index.html’)

classWebHandler(tornado.websocket.WebSocketHandler):

users =set()#存放在线用户

defopen(self, *args, **kwargs):

self.users.add(self)#把建立连接后的用户添加到用户容器中

foruserinself.users:#向在线的用户发送进入消息

user.write_message(“[%s]-[%s]-进入聊天室”%
(self.request.remote_ip,

datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)))

defon_close(self):

self.users.remove(self)# 用户关闭连接后从容器中移除用户

foruserinself.users:

user.write_message(“[%s]-[%s]-离开聊天室”%
(self.request.remote_ip,

datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)))

defon_message(self, message):

foruserinself.users:#向在线用户发送聊天消息

user.write_message(“[%s]-[%s]-说:%s”% (self.request.remote_ip,

datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”), message))

defcheck_origin(self, origin):

return True# 允许WebSocket的跨域请求

importos

BASE_DIR = os.path.dirname(__file__)

settings = {

‘static_path’:os.path.join(BASE_DIR,’static’),

“websocket_ping_interval”:1,

“websocket_ping_timeout”:10

}

app = tornado.web.Application([(r’/’,IndexHandler),

(r’/chat’,WebHandler)],

**settings)

app.listen(8009)

tornado.ioloop.IOLoop.instance().start()


 

关闭连接(只分析websocket)

websocket可能异常关闭的情况很多。比如客户端发了ping后等待pong超时关闭,服务端接收到ping跟上一个ping之间超过了pingTimeout;用的uwsgi的话,uwsgi发送ping,如果在websockets-pong-tolerance(默认3秒)内接收不到pong回应,也会关闭连接;还有如果nginx的proxy_read_timeout配置的比pingInterval小等。

只要不是客户端主动关闭连接,socket.io就会在连接出错后不断重试以建立连接。重试间隔和重试次数由reconnectionDelayMax(默认5秒)reconnectionAttempts(默认一直重连)设定。下面讨论客户端正常关闭的情况,各种异常关闭情况请具体情况具体分析。

客户端主动关闭

假定客户端调用socket.close()主动关闭websocket连接,则会先发送一个消息41(4:engine.io的message,1:socket.io的disconnect)再关闭连接。如前面提到,engine.io套接字接收到消息后会交给socket.io服务器注册的
_handle_eio_message()处理。最终是调用的socket.io的_handle_disconnect(),该函数工作包括调用socketio.on("disconnect")注册的函数,将该客户端从加入的房间中移除,清理环境变量等。

uwsgi而接收到客户端关闭websocket连接消息后会关闭服务端到客户端的连接。engine.io服务器的websocket数据接收例程ws.wait()因为连接关闭报IOError,触发服务端循环收发数据过程停止,并从维护的sockets集合中移除这个关闭的sid。然后调用engine.io套接字的close(wait=True, abort=True)方法,由于是客户端主动关闭,这里就不会再给客户端发送一个CLOSE消息。而
engine.io服务器的close方法一样会触发socket.io之前注册的disconnect事件处理函数,由于前面已经调用_handle_disconnect()处理了关闭连接事件,所以这里_handle_eio_disconnect()不需要再做其他操作(这个操作不是多余的,其作用见后一节)。

浏览器关闭

直接关闭浏览器发送的是websocket的标准CLOSE消息,opcode为8。socket.io服务端处理方式基本一致,由于这种情况下并没有发送socket.io的关闭消息41,socket.io的关闭操作需要等到engine.io触发的_handle_eio_disconnect()中处理,这就是前一节中为什么engine.io服务器后面还要多调用一次
_handle_eio_disconnect()的原因所在。

示例1:

服务端发送消息到客户端

服务端发送消息通过
flask_socketio提供的emit方法实现,如前一节分析的,最终还是通过的engine.io包装成engine.io的消息格式后发出。

42["server_response",{"data":"TEST"}]

参考链接:

(7)on_cont_message:这个对象在接收到连续帧数据时被调用,有三个参数,分别是:类本身,从服务器接受的字符串(utf-8),连续标志。

6 总结

本文示例中,为了便于分析,只用了默认的namespace和room,而在实际项目中可以根据业务需要使用namespace,room等高级特性。

nginx+uwsgi使用socket.io时,当用到websocket时,注意nginx的超时配置proxy_read_timeout和uwsgi的websocket超时配置websocket-ping-freq和websockets-pong-tolerance,配置不当会导致socke.io因为websocket的ping/pong超时而不断重连。

构建生产者对象时,可通过compression_type
参数指定由对应生产者生产的消息数据的压缩方式,或者在producer.properties配置中配置compression.type参数。

 

建立连接

在chrome中打开页面可以看到发了3个请求,分别是:

1 http://127.0.0.1/socket.io/?EIO=3&transport=polling&t=MAkXxBR
2 http://127.0.0.1/socket.io/? EIO=3&transport=polling&t=MAkXxEz&sid=9c54f9c1759c4dbab8f3ce20c1fe43a4
3 ws://127.0.0.1/socket.io/?EIO=3&transport=websocket&sid=9c54f9c1759c4dbab8f3ce20c1fe43a4

请求默认路径是/socket.io,注意命名空间并不会在路径中,而是在参数中传递。第1个请求是polling,EIO是engine.io协议的版本号,t是一个随机字符串,第一个请求时还还没有生成sid。服务端接收到消息后会调用engine.io/server.py_handle_connect()建立连接。

返回的结果是

## Response Headers: Content-Type: application/octet-stream ##
�ÿ0{"pingInterval":25000,"pingTimeout":60000,"upgrades":["websocket"],"sid":"9c54f9c1759c4dbab8f3ce20c1fe43a4"}�ÿ40

可以看到,这里返回的是字节流的payload,content-type为”application/octet-stream”。这个payload其实包含两个packet,第一个packet是engine.io的OPEN消息,类型为0,它的内容为pingInterval,pingTimeout,sid等;第二个packet类型是4(message),而它的数据内容是0,表示socket.io的CONNECT。而其中的看起来乱码的部分实则是前面提到的payload编码中的长度的编码x00x01x00x09xffx00x02xff

  • 第2个请求是轮询请求,如果websocket建立并测试成功(使用内容为probe的ping/pong帧)后,会暂停轮询请求。可以看到轮询请求一直hang住到websocket建立并测试成功后才返回,响应结果是�ÿ6,前面乱码部分是payload长度编码x00x01xff,后面的数字6是engine.io的noop消息。

  • 第3个请求是websocket握手请求,握手成功后,可以在chrome的Frames里面看到websocket的数据帧交互流程,可以看到如前面分析,确实是先发的探测帧,然后是Upgrade帧,接着就是定期的ping/pong帧了。

    2probe
    3probe
    5
    2
    3
    ...
    

1.测试环境

    ws = websocket.WebSocketApp("ws://echo.websocket.org/",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

1 概述

socket.io是一个基于WebSocket的CS的实时通信库,它底层基于engine.io。engine.io使用WebSocket和xhr-polling(或jsonp)封装了一套自己的协议,在不支持WebSocket的低版本浏览器中(支持websocket的浏览器版本见这里)使用了长轮询(long
polling)来代替。socket.io在engine.io的基础上增加了namespace,room,自动重连等特性。

本文接下来会先简单介绍websocket协议,然后在此基础上讲解下engine.io和socket.io协议以及源码分析,后续再通过例子说明socket.io的工作流程。

2.代码实践

 长连接,参数介绍:

服务端消息接收流程

对接收消息的则统一通过engine.io套接字的receive()函数处理:

  • 对于轮询,一旦收到了polling的POST请求,则会调用receive往该socket的消息队列里面发送消息,从而释放之前hang住的GET请求。
  • 对于websocket:
    • 收到了ping,则会马上响应一个pong。
    • 接收到了upgrade消息,则马上发送一个noop消息。
    • 接收到了message,则调用socket.io注册到engine.io的_handle_eio_message方法来处理socket.io自己定义的各种消息。

查找并设置listener,配置监听端口,格式:listeners

listener_name://host_name:port,供kafka客户端连接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

API及常用参数说明:

class
kafka.KafkaProducer(**configs)

bootstrap_servers–’host[:port]’字符串,或者由’host[:port]’组成的字符串,形如[‘10.202.24.5:9096’,
‘10.202.24.6:9096’,
‘10.202.24.7:9096’]),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为
localhost,
port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)

key_serializer
–用于转换用户提供的key值为字节,必须返回字节数据。
如果为None,则等同调用f。 默认值: None.

value_serializer –
用于转换用户提供的value消息值为字节,必须返回字节数据。
如果为None,则等同调用f。 默认值: None.

send(topic,
value=None, key=None, headers=None, partition=None,
timestamp_ms=None)

topic –
设置消息将要发布到的主题,即消息所属主题

value –
消息内容,必须为字节数据,或者通过value_serializer序列化后的字节数据。如果为None,则key必填,消息等同于“删除”。(
If value is None, key is required and message acts as a
‘delete’)

partition –
指定分区。如果未设置,则使用配置的partitioner

key –
和消息对应的key,可用于决定消息发送到哪个分区。如果平partition为None,则相同key的消息会被发布到相同分区(但是如果key为None,则随机选取分区)(If
partition is None (and producer’s partitioner config is left as
default), then messages with the same key will be delivered to the same
partition (but if key is None, partition is chosen randomly)).
必须为字节数据或者通过配置的key_serializer序列化后的字节数据.

headers –
设置消息header,header-value键值对表示的list。list项为元组:格式
(str_header,bytes_value)

timestamp_ms –毫秒数
(从1970 1月1日 UTC算起) ,作为消息时间戳。默认为当前时间

函数返回FutureRecordMetadata类型的RecordMetadata数据

flush(timeout=None)

发送所有可以立即获取的缓冲消息(即时linger_ms大于0),线程block直到这些记录发送完成。当一个线程等待flush调用完成而block时,其它线程可以继续发送消息。

注意:flush调用不保证记录发送成功

metrics(raw=False)

获取生产者性能指标。

参考API:

注:生产者代码是线程安全的,支持多线程,而消费者则不然

(11)subprotocols:一组可用的子协议,默认为空。

参考资料

  • https://tools.ietf.org/html/rfc6455
  • https://www.nginx.com/blog/websocket-nginx/
  • https://security.stackexchange.com/questions/36930/how-does-websocket-frame-masking-protect-against-cache-poisoning
  • https://github.com/suexcxine/blog/blob/master/source/_posts/websocket.md
  • https://github.com/abbshr/abbshr.github.io/issues/47
  • https://socket.io/docs/logging-and-debugging/
  • http://uwsgi-docs.readthedocs.io/en/latest/WebSockets.html
  • https://flask-socketio.readthedocs.io/en/latest/

下载地址2:

python
websocket

客户端发送消息给服务端

如果要发送消息给服务器,在浏览器输入框输入test,点击echo按钮,可以看到websocket发送的帧的内容如下,其中4是engine.io的message类型标识,2是socket.io的EVENT类型标识,而后面则是事件名称和数据,数据可以是字符串,字典,列表等类型。

42["client_event",{"data":"test"}]

下载地址:

长连接关键方法:ws.run_forever(ping_interval=60,ping_timeout=5)

服务端接收消息流程

而服务端接收消息并返回一个新的event为”server_response”,数据为”TEST”,代码如下,其中socketio是flask_socketio模块的SocketIO对象,它提供了装饰器方法
on将自定义的client_event和处理函数test_client_event注册到sockerio服务器的handlers中。

当接收到 client_event 消息时,会通过sockerio/server.py中的
_handle_eio_message()方法处理消息,对于socket.io的EVENT类型的消息最终会通过_trigger_event()方法处理,该方法也就是从handlers中拿到client_event对应的处理函数并调用之。

from flask_socketio import SocketIO, emit
socketio = SocketIO(...)

@socketio.on("client_event")
def test_client_event(msg):
    emit("server_response", {"data": msg["data"].upper()})

生产者

#-
encoding:utf-8 -*-*

__author__ =
‘shouke’

from kafka
import KafkaProducer

import
json

producer =
KafkaProducer(bootstrap_servers=[‘127.0.0.1:9092’])

for i in
range:

producer.send(‘MY_TOPIC1’,
value=b’lai zi shouke de msg’, key=None, headers=None,
partition=None, timestamp_ms=None)

#
Block直到单条消息发送完或者超时

future =
producer.send(‘MY_TOPIC1’, value=b’another
msg’
,key=b’othermsg’)

result =
future.get(timeout=60)

print

#
Block直到所有阻塞的消息发送到网络

# 注意:
该操作不保证传输或者消息发送成功,仅在配置了linger_ms的情况下有用。(It
is really only useful if you configure internal batching using
linger_ms

#
序列化json数据

producer =
KafkaProducer(bootstrap_servers=‘127.0.0.1:9092’,
value_serializer=lambda v: json.dumps.encode(‘utf-8’))

producer.send(‘MY_TOPIC1’,
{‘shouke’:‘kafka’})

#
序列化字符串key

producer =
KafkaProducer(bootstrap_servers=‘127.0.0.1:9092’,
key_serializer=str.encode)

producer.send(‘MY_TOPIC1’,
b’shouke’, key=‘strKey’)

producer =
KafkaProducer(bootstrap_servers=‘127.0.0.1:9092’,compression_type=‘gzip’)

for i in
range:

producer.send(‘MY_TOPIC1’,
(‘msg %d’ % i).encode(‘utf-8’))

#
消息记录携带header

producer.send(‘MY_TOPIC1’,
value=b’c29tZSB2YWx1ZQ==’, headers=[(‘content-encoding’,
b’base64′),])

#
获取性能数据(注意,实践发现分区较多的情况下,该操作比较耗时

metrics =
producer.metrics()

print

producer.flush()

实践中遇到错误:
kafka.errors.NoBrokersAvailable:
NoBrokersAvailable,解决方案如下:

进入到配置目录,编辑server.properties文件,

——

2 WebSocket协议

我们知道,在HTTP 协议开发的时候,并不是为了双向通信程序准备的,起初的
web 应用程序只需要 “请求-响应”
就够了。由于历史原因,在创建拥有双向通信机制的 web
应用程序时,就只能利用 HTTP 轮询的方式,由此产生了 “短轮询” 和
“长轮询”(注意区分短连接和长连接)。

短轮询通过客户端定期轮询来询问服务端是否有新的信息产生,缺点也是显而易见,轮询间隔大了则信息不够实时,轮询间隔过小又会消耗过多的流量,增加服务器的负担。长轮询是对短轮询的优化,需要服务端做相应的修改来支持。客户端向服务端发送请求时,如果此时服务端没有新的信息产生,并不立刻返回,而是Hang住一段时间等有新的信息或者超时再返回,客户端收到服务器的应答后继续轮询。可以看到长轮询比短轮询可以减少大量无用的请求,并且客户端接收取新消息也会实时不少。

虽然长轮询比短轮询优化了不少,但是每次请求还是都要带上HTTP请求头部,而且在长轮询的连接结束之后,服务器端积累的新消息要等到下次客户端连接时才能传递。更好的方式是只用一个TCP连接来实现客户端和服务端的双向通信,WebSocket协议正是为此而生。WebSocket是基于TCP的一个独立的协议,它与HTTP协议的唯一关系就是它的握手请求可以作为一个Upgrade request经由HTTP服务器解析,且与HTTP使用一样的端口。WebSocket默认对普通请求使用80端口,协议为ws://,对TLS加密请求使用443端口,协议为wss://

握手是通过一个HTTP Upgrade request开始的,一个请求和响应头部示例如下(去掉了无关的头部)。WebSocket握手请求头部与HTTP请求头部是兼容的(见RFC2616)。

## Request Headers ##
Connection: Upgrade
Host: socket.io.demo.com
Origin: http://socket.io.demo.com
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: mupA9l2rXciZKoMNQ9LphA==
Sec-WebSocket-Version: 13
Upgrade: websocket

## Response Headers ##
101 Web Socket Protocol Handshake
Connection: upgrade
Sec-WebSocket-Accept: s4VAqh7eedG0a11ziQlwTzJUY3s=
Sec-WebSocket-Origin: http://socket.io.demo.com
Server: nginx/1.6.2
Upgrade: WebSocket
  • Upgrade
    是HTTP/1.1中规定的用于转换当前连接的应用层协议的头部,表示客户端希望用现有的连接转换到新的应用层协议WebSocket协议。

  • Origin
    用于防止跨站攻击,浏览器一般会使用这个来标识原始域,对于非浏览器的客户端应用可以根据需要使用。

  • 请求头中的 Sec-WebSocket-Version
    是WebSocket版本号,Sec-WebSocket-Key
    是用于握手的密钥。Sec-WebSocket-Extensions 和 Sec-WebSocket-Protocol
    是可选项,暂不讨论。

  • 响应头中的 Sec-WebSocket-Accept 是将请求头中的 Sec-WebSocket-Key
    的值加上一个固定魔数258EAFA5-E914-47DA-95CA-C5AB0DC85B11经SHA1+base64编码后得到。计算过程的python代码示例(uwsgi中的实现见
    core/websockets.c的 uwsgi_websocket_handshake函数):

    magic_number = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    key = 'mupA9l2rXciZKoMNQ9LphA=='
    accept = base64.b64encode(hashlib.sha1(key + magic_number).digest())
    assert(accept == 's4VAqh7eedG0a11ziQlwTzJUY3s=')
    
  • 客户端会检查响应头中的status code 和 Sec-WebSocket-Accept
    值是否是期待的值,如果发现Accept的值不正确或者状态码不是101,则不会建立WebSocket连接,也不会发送WebSocket数据帧。

WebSocket协议使用帧(Frame)收发数据,帧格式如下。基于安全考量,客户端发送给服务端的帧必须通过4字节的掩码(Masking-key)加密,服务端收到消息后,用掩码对数据帧的Payload
Data进行异或运算解码得到数据(详见uwsgi的 core/websockets.c
中的uwsgi_websockets_parse函数),如果服务端收到未经掩码加密的数据帧,则应该马上关闭该WebSocket。而服务端发给客户端的数据则不需要掩码加密,客户端如果收到了服务端的掩码加密的数据,则也必须关闭它。

 0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-------+-+-------------+-------------------------------+
     |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
     |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
     |N|V|V|V|       |S|             |   (if payload len==126/127)   |
     | |1|2|3|       |K|             |                               |
     +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
     |     Extended payload length continued, if payload len == 127  |
     + - - - - - - - - - - - - - - - +-------------------------------+
     |                               |Masking-key, if MASK set to 1  |
     +-------------------------------+-------------------------------+
     | Masking-key (continued)       |          Payload Data         |
     +-------------------------------- - - - - - - - - - - - - - - - +
     :                     Payload Data continued ...                :
     +---------------------------------------------------------------+

帧分为控制帧和数据帧,控制帧不能分片,数据帧可以分片。主要字段说明如下:

  • FIN:
    没有分片的帧的FIN为1,分片帧的第一个分片的FIN为0,最后一个分片FIN为1。
  • opcode: 帧类型编号,其中控制帧:0x8 (Close), 0x9 (Ping), and 0xA
    (Pong),数据帧主要有:0x1 (Text), 0x2 (Binary)。
  • MASK:客户端发给服务端的帧MASK为1,Masking-key为加密掩码。服务端发往客户端的MASK为0,Masking-key为空。
  • Payload len和Payload Data分别是帧的数据长度和数据内容。

By: 授客
QQ:1033553122

发表评论

电子邮件地址不会被公开。 必填项已用*标注