Websocket
Django用法
在1.9版本之后,Django实现了对Channels的支持,他所使用的是WebSocket通信,解决了实时通信的问题,而且在使用WebSocket进行通信的同时依旧能够支持HTTP通信。
1.1目录结构
在此结构中必须有硬性要求,具体如下:
新的目录如下:|-- channels_example| |--channels_example| |-- __init__.py| |-- settings.py| |-- urls.py| |-- wsgi.py| |-- routing.py #必须| |-- consumer.py #必须| |-- asgi.py| |-- manage.py
1.2配置settings.py文件
1.2.1将其添加到APP列表里
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'channels',]
1.2.2然后,添加新的参数CHANNEL_LAYERS,如下:
CHANNEL_LAYERS = { "default": { "BACKEND": "asgiref.inmemory.ChannelLayer", "ROUTING": "channels_example.routing.channel_routing", },}
需要注意的是 ROUTING 参数,他是用来指定WebSocket表单的位置,当有WebSocket请求访问时,就会根据这个路径找到相应表单,调用相应的函数进行处理。
channels_example.routing 就是我们刚才建好的routing,py文件,里面的channel_routing我们下面会进行填充。
1.3填写路由映射地址
from channels.routing import routeimport consumerschannel_routing = [ route('websocket.connect', consumers.ws_connect), route('websocket.disconnect', consumers.ws_disconnect), # route('websocket.receive', consumers.ws_message), route('websocket.receive', consumers.ws_message_uuid), ]
1.4路由映射到相对应的函数
from django.http import HttpResponsefrom channels.handler import AsgiHandler#message.reply_channel 一个客户端通道的对象#message.reply_channel.send(chunk) 用来唯一返回这个客户端#一个管道大概会持续30sdef ws_connect(message): auth = True if not auth: reply = json.dumps({'error': error}) message.reply_channel.send({'text': reply, 'close': True}) else: reply = "{}" message.reply_channel.send({'text': reply}) print(">>> %s connected" % str(message))def ws_disconnect(message): print("<<< %s disconnected" % str(message)) # with message_queue.mutex: # message_queue.queue.clear() while not message_queue.empty(): try: message_queue.get(False) except Empty: continue message_queue.task_done()def ws_message_uuid(message): task = Task.create(message) if task: message_queue.put(task)
tornado用法
1.1Tornado的WebSocket模块
Tornado在websocket模块中提供了一个WebSocketHandler类。这个类提供了和已连接的客户端通信的WebSocket事件和方法的钩子。当一个新的WebSocket连接打开时,open方法被调用,而on_message和on_close方法分别在连接接收到新的消息和客户端关闭时被调用。
此外,WebSocketHandler类还提供了write_message方法用于向客户端发送消息,close方法用于关闭连接。
class EchoHandler(tornado.websocket.WebSocketHandler): def open(self): self.write_message('connected!') def on_message(self, message): self.write_message(message)
正如你在我们的EchoHandler实现中所看到的,open方法只是使用WebSocketHandler基类提供的write_message方法向客户端发送字符串"connected!"。每次处理程序从客户端接收到一个新的消息时调用on_message方法,我们的实现中将客户端提供的消息原样返回给客户端。这就是全部!让我们通过一个完整的例子看看实现这个协议是如何简单的吧。
WebSocketHandler.open()
当一个WebSocket连接建立后被调用。
WebSocketHandler.on_message(message)
当客户端发送消息message过来时被调用,注意此方法必须被重写。
WebSocketHandler.on_close()
当WebSocket连接关闭后被调用。
WebSocketHandler.write_message(message, binary=False)
向客户端发送消息messagea,message可以是字符串或字典(字典会被转为json字符串)。若binary为False,则message以utf8编码发送;二进制模式(binary=True)时,可发送任何字节码。
WebSocketHandler.close()
关闭WebSocket连接。
WebSocketHandler.check_origin(origin)
判断源origin,对于符合条件(返回判断结果为True)的请求源origin允许其连接,否则返回403。可以重写此方法来解决WebSocket的跨域请求(如始终return True)。
1.2实例--工作websocket实际应用
#coding=utf-8import uuidimport osfrom works.actions import workimport hashlibimport jsonimport Queuefrom threading import Threadimport numpy as npimport cv2import base64import jwtimport tornado.genfrom handlers.base_handler import BaseWebSocketfrom config import MEDIA_ROOTimport timemessage_queue = Queue.PriorityQueue()def work_loop(): while True: task = message_queue.get() iuuid = task.uuid offset_top = task.offset_top image_data = task.image_data channel = task.channel zoom = task.zoom rType = task.rType responseType = task.responseType print(">>> len: %d | current offset: %d" % (message_queue.qsize(), offset_top)) filename = str(uuid.uuid1()) + '.jpg' filepath = os.path.join(MEDIA_ROOT, filename) with open(filepath, 'wb') as f: f.write(image_data.decode("base64")) if zoom != 1.0: im = cv2.imread(filepath) if im is None: continue osize = im.shape[1], im.shape[0] size = int(im.shape[1] * zoom), int(im.shape[0] * zoom) im = cv2.resize(im, size) cv2.imwrite(filepath, im) try: reply = work(filepath, use_crop=False, result=rType,responseType=responseType) except Exception as e: print("!!!!!! %s -> %s caused error" % (iuuid, filename)) print(e) cmd = u"cp %s %s" % (filepath, os.path.join(MEDIA_ROOT, 'rb_' + filename)) os.system(cmd.encode('utf-8')) continue if responseType == 'url': # rtn_url = 'http://101.236.17.104:3389/upload/' + 'rb_' + filename rtn_url = 'http://192.168.0.254:8000/upload/' + 'rb_' + filename reply = {'url': rtn_url, 'uuid': iuuid} reply['uuid'] = iuuid channel.write_message({'text': json.dumps(reply)}) print '%s end time:' % channel, time.time()class BrowserWebSocket(BaseWebSocket): '''浏览器websocket服务器''' def open(self): '''新的WebSocket连接打开时被调用''' # message = {} # remote_ip = self.request.remote_ip # message['query_string']=self.get_argument('query_string') # message['remote_ip']=remote_ip # auth, error = verify_auth_token(message) auth = True error = 'error' if not auth: reply = json.dumps({'error': error}) self.write_message({'text': reply, 'close': True}) else: reply = "{}" self.write_message({'text': reply}) print(">>> %s connected" % self.request.remote_ip) def on_message(self, message): '''连接收到新消息时被调用''' print '%s start time:'%self,time.time() task = Task.create(message,self) if task: message_queue.put(task) @tornado.gen.coroutine def on_messages(self, message): '''连接收到新消息时被调用''' task = Task.create(message,self) if task: message_queue.put(task) def on_close(self): '''客户端关闭时被调用''' print("<<< %s disconnected" % str(self.request.remote_ip)) # with message_queue.mutex: # message_queue.queue.clear() while not message_queue.empty(): try: message_queue.get(False) except Queue.Empty: continue message_queue.task_done() def check_origin(self, origin): '''允许WebSocket的跨域请求''' return Trueclass Task(object): def __init__(self, uuid, offset_top, image_data, channel, zoom, rType, responseType, *args): self.uuid = uuid self.offset_top = int(float(offset_top)) self.image_data = image_data self.channel = channel self.zoom = zoom self.rType = rType self.responseType = responseType @classmethod def create(clz, message,sel): # data = message.get('text') data = message try: params = json.loads(data[:150]) image_data = data[150:] image_data = image_data.replace(" ", "+") params['image_data'] = image_data params['channel'] = sel # add Type if params.get('responseType') is None: params['responseType'] = 'url' # request type if params.get('rType') is None: params['rType'] = 'rl' task = Task(**params) except ValueError as e: task = None print(">>>message data error!") print(e) return task def __cmp__(self, other): return cmp(self.offset_top, other.offset_top)def verify_auth_token(message): '''token 验证''' token = message.get('query_string') secret_key = 'aoiakai' try: payload = jwt.decode(token, secret_key, algorithms=['HS256']) if payload.get('ip') != message.get('remote_ip'): return False, 'ip mismatch' except jwt.ExpiredSignatureError as e: print(e) return False, 'token expired' except Exception as e: print(e) return False, 'enter correct token' return True, ''work_thread = Thread(target=work_loop)work_thread.daemon = Truework_thread.start()
还没有人抢沙发呢~