纵有疾风起
人生不言弃

你真的会websocket吗

Websocket

WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
WebSocket通信协议于2011年被IETF定为标准RFC 6455,并被RFC7936所补充规范。
 
WebSocket协议支持(在受控环境中运行不受信任的代码的)客户端与(选择加入该代码的通信的)远程主机之间进行全双工通信。用于此的安全模型是Web浏览器常用的基于原始的安全模式。 协议包括一个开放的握手以及随后的TCP层上的消息帧。 该技术的目标是为基于浏览器的、需要和服务器进行双向通信的(服务器不能依赖于打开多个HTTP连接(例如,使用XMLHttpRequest或<iframe>和长轮询))应用程序提供一种通信机制。
 
这个协议目前仍是草案,只有最新的一些浏览器可以支持它。但是,它的好处是显而易见的,随着支持它的浏览器越来越多,我们将看到它越来越流行。(和以往的Web开发一样,必须谨慎地坚持依赖可用的新功能并能在必要时回滚到旧技术的务实策略。)
 

Django用法

在1.9版本之后,Django实现了对Channels的支持,他所使用的是WebSocket通信,解决了实时通信的问题,而且在使用WebSocket进行通信的同时依旧能够支持HTTP通信。

1.1目录结构

在此结构中必须有硬性要求,具体如下:

新的目录如下:|-- channels_example|    |--channels_example|        |-- __init__.py|        |-- settings.py|        |-- urls.py|        |-- wsgi.py|        |-- routing.py   #必须|        |-- consumer.py  #必须|        |-- asgi.py|    |-- manage.py

1.2配置settings.py文件

1.2.1将其添加到APP列表里

INSTALLED_APPS = [    'django.contrib.admin',    'django.contrib.auth',    'django.contrib.contenttypes',    'django.contrib.sessions',    'django.contrib.messages',    'django.contrib.staticfiles',    'channels',]

1.2.2然后,添加新的参数CHANNEL_LAYERS,如下:

CHANNEL_LAYERS = {    "default": {        "BACKEND": "asgiref.inmemory.ChannelLayer",        "ROUTING": "channels_example.routing.channel_routing",    },}

需要注意的是 ROUTING 参数,他是用来指定WebSocket表单的位置,当有WebSocket请求访问时,就会根据这个路径找到相应表单,调用相应的函数进行处理。
channels_example.routing 就是我们刚才建好的routing,py文件,里面的channel_routing我们下面会进行填充。

1.3填写路由映射地址

from channels.routing import routeimport consumerschannel_routing = [    route('websocket.connect', consumers.ws_connect),            route('websocket.disconnect', consumers.ws_disconnect),            # route('websocket.receive', consumers.ws_message),            route('websocket.receive', consumers.ws_message_uuid),        ]

1.4路由映射到相对应的函数

from django.http import HttpResponsefrom channels.handler import AsgiHandler#message.reply_channel    一个客户端通道的对象#message.reply_channel.send(chunk)  用来唯一返回这个客户端#一个管道大概会持续30sdef ws_connect(message):    auth = True    if not auth:        reply = json.dumps({'error': error})        message.reply_channel.send({'text': reply, 'close': True})    else:        reply = "{}"        message.reply_channel.send({'text': reply})        print(">>> %s connected" % str(message))def ws_disconnect(message):    print("<<< %s disconnected" % str(message))    # with message_queue.mutex:    #     message_queue.queue.clear()    while not message_queue.empty():        try:            message_queue.get(False)        except Empty:            continue        message_queue.task_done()def ws_message_uuid(message):    task = Task.create(message)    if task:        message_queue.put(task)

 tornado用法

1.1Tornado的WebSocket模块

Tornado在websocket模块中提供了一个WebSocketHandler类。这个类提供了和已连接的客户端通信的WebSocket事件和方法的钩子。当一个新的WebSocket连接打开时,open方法被调用,而on_messageon_close方法分别在连接接收到新的消息和客户端关闭时被调用。

此外,WebSocketHandler类还提供了write_message方法用于向客户端发送消息,close方法用于关闭连接。

class EchoHandler(tornado.websocket.WebSocketHandler):    def open(self):        self.write_message('connected!')    def on_message(self, message):        self.write_message(message)

正如你在我们的EchoHandler实现中所看到的,open方法只是使用WebSocketHandler基类提供的write_message方法向客户端发送字符串”connected!”。每次处理程序从客户端接收到一个新的消息时调用on_message方法,我们的实现中将客户端提供的消息原样返回给客户端。这就是全部!让我们通过一个完整的例子看看实现这个协议是如何简单的吧。

WebSocketHandler.open()

当一个WebSocket连接建立后被调用。

WebSocketHandler.on_message(message)

当客户端发送消息message过来时被调用,注意此方法必须被重写。

WebSocketHandler.on_close()

当WebSocket连接关闭后被调用。

WebSocketHandler.write_message(message, binary=False)

向客户端发送消息messagea,message可以是字符串或字典(字典会被转为json字符串)。若binary为False,则message以utf8编码发送;二进制模式(binary=True)时,可发送任何字节码。

WebSocketHandler.close()

关闭WebSocket连接。

WebSocketHandler.check_origin(origin)

判断源origin,对于符合条件(返回判断结果为True)的请求源origin允许其连接,否则返回403。可以重写此方法来解决WebSocket的跨域请求(如始终return True)。

1.2实例–工作websocket实际应用

#coding=utf-8import uuidimport osfrom works.actions import workimport hashlibimport jsonimport Queuefrom threading import Threadimport numpy as npimport cv2import base64import jwtimport tornado.genfrom handlers.base_handler import BaseWebSocketfrom config import MEDIA_ROOTimport timemessage_queue = Queue.PriorityQueue()def work_loop():    while True:        task = message_queue.get()        iuuid = task.uuid        offset_top = task.offset_top        image_data = task.image_data        channel = task.channel        zoom = task.zoom        rType = task.rType        responseType = task.responseType        print(">>> len: %d | current offset: %d" % (message_queue.qsize(), offset_top))        filename = str(uuid.uuid1()) + '.jpg'        filepath = os.path.join(MEDIA_ROOT, filename)        with open(filepath, 'wb') as f:            f.write(image_data.decode("base64"))        if zoom != 1.0:            im = cv2.imread(filepath)            if im is None:                continue            osize = im.shape[1], im.shape[0]            size = int(im.shape[1] * zoom), int(im.shape[0] * zoom)            im = cv2.resize(im, size)            cv2.imwrite(filepath, im)        try:            reply = work(filepath, use_crop=False, result=rType,responseType=responseType)        except Exception as e:            print("!!!!!! %s -> %s caused error" % (iuuid, filename))            print(e)            cmd = u"cp %s %s" % (filepath, os.path.join(MEDIA_ROOT, 'rb_' + filename))            os.system(cmd.encode('utf-8'))            continue        if responseType == 'url':            # rtn_url = 'http://101.236.17.104:3389/upload/' + 'rb_' + filename            rtn_url = 'http://192.168.0.254:8000/upload/' + 'rb_' + filename            reply = {'url': rtn_url, 'uuid': iuuid}        reply['uuid'] = iuuid        channel.write_message({'text': json.dumps(reply)})        print '%s end time:' % channel, time.time()class BrowserWebSocket(BaseWebSocket):    '''浏览器websocket服务器'''    def open(self):        '''新的WebSocket连接打开时被调用'''        # message = {}        # remote_ip = self.request.remote_ip        # message['query_string']=self.get_argument('query_string')        # message['remote_ip']=remote_ip        # auth, error = verify_auth_token(message)        auth = True        error = 'error'        if not auth:            reply = json.dumps({'error': error})            self.write_message({'text': reply, 'close': True})        else:            reply = "{}"            self.write_message({'text': reply})            print(">>> %s connected" % self.request.remote_ip)    def on_message(self, message):        '''连接收到新消息时被调用'''        print '%s start time:'%self,time.time()        task = Task.create(message,self)        if task:            message_queue.put(task)    @tornado.gen.coroutine    def on_messages(self, message):        '''连接收到新消息时被调用'''        task = Task.create(message,self)        if task:            message_queue.put(task)    def on_close(self):        '''客户端关闭时被调用'''        print("<<< %s disconnected" % str(self.request.remote_ip))        # with message_queue.mutex:        #     message_queue.queue.clear()        while not message_queue.empty():            try:                message_queue.get(False)            except Queue.Empty:                continue            message_queue.task_done()    def check_origin(self, origin):        '''允许WebSocket的跨域请求'''        return Trueclass Task(object):    def __init__(self, uuid, offset_top, image_data, channel, zoom, rType, responseType, *args):        self.uuid = uuid        self.offset_top = int(float(offset_top))        self.image_data = image_data        self.channel = channel        self.zoom = zoom        self.rType = rType        self.responseType = responseType    @classmethod    def create(clz, message,sel):        # data = message.get('text')        data = message        try:            params = json.loads(data[:150])            image_data = data[150:]            image_data = image_data.replace(" ", "+")            params['image_data'] = image_data            params['channel'] = sel            # add Type            if params.get('responseType') is None:                params['responseType'] = 'url'            # request type            if params.get('rType') is None:                params['rType'] = 'rl'            task = Task(**params)        except ValueError as e:            task = None            print(">>>message data error!")            print(e)        return task    def __cmp__(self, other):        return cmp(self.offset_top, other.offset_top)def verify_auth_token(message):    '''token 验证'''    token = message.get('query_string')    secret_key = 'aoiakai'    try:        payload = jwt.decode(token, secret_key, algorithms=['HS256'])        if payload.get('ip') != message.get('remote_ip'):            return False, 'ip mismatch'    except jwt.ExpiredSignatureError as e:        print(e)        return False, 'token expired'    except Exception as e:        print(e)        return False, 'enter correct token'    return True, ''work_thread = Thread(target=work_loop)work_thread.daemon = Truework_thread.start()

 

文章转载于:https://www.cnblogs.com/aylin/p/8831135.html

原著是一个有趣的人,若有侵权,请通知删除

未经允许不得转载:起风网 » 你真的会websocket吗
分享到: 生成海报

评论 抢沙发

评论前必须登录!

立即登录