纵有疾风起
人生不言弃

Django配置websocket请求接口

1.settings.py

INSTALLED_APPS = [    '...',    'channels',    '...',]ASGI_APPLICATION = 'server.routing.application'CHANNEL_LAYERS = {    'default': {        'BACKEND': 'channels_redis.core.RedisChannelLayer',        'CONFIG': {            "hosts": [(os.getenv('REDIS_SERVER_HOST', '127.0.0.1'), int(os.getenv('REDIS_SERVER_PORT', '6379')))],        },    },}

2.routing.py(settin.py同级)

# -*- coding: utf-8 -*-from channels.routing import ProtocolTypeRouter, URLRouterfrom device.consumers import QueryAuthMiddleware  # websocket中间件import device.routing  # 路由application = ProtocolTypeRouter({    'websocket': QueryAuthMiddleware(        URLRouter(            device.routing.websocket_urlpatterns        )    )})
class QueryAuthMiddleware:    """    WebSocket 认证中间件    """    def __init__(self, inner):        # Store the ASGI application we were passed        self.inner = inner    def __call__(self, scope):        query = parse.parse_qs(scope["query_string"].decode())        token = query.get('token', [None])[0]        device_name = None        try:            login_info = json.loads(base64.b64decode(token).decode())            device_name = login_info['name']            logger.info('Device {} connect from {} port {}'.format(device_name, scope['client'][0], scope['client'][1]))            # 验证时间            gen_time = datetime.strptime(login_info['time'], '%Y%m%d%H%M%S')            now_time = datetime.now()            if abs(gen_time - now_time).total_seconds() > 5 * 60:                raise ValueError('Device {} time validate fail: token time: {}, server time: {}'.format(                    device_name, gen_time.strftime('%Y-%m-%d %H:%M:%S'), now_time.strftime('%Y-%m-%d %H:%M:%S')))            # 验证密码            device = DeviceInfoModel.objects.get(name=device_name)  # 获取到设备            correct = device.check_password(login_info['password'])  # 检查密码            if not correct:                raise ValueError('Device {} password validate fail'.format(device_name))        except DeviceInfoModel.DoesNotExist:            logger.info('Device {} can not find in DB'.format(device_name))            device = None        except Exception as e:            logger.error('Connect error {}'.format(e))            device = None        finally:            # Middleware 中必须手动关闭数据库连接            # http://channels.readthedocs.io/en/latest/topics/authentication.html#custom-authentication            close_old_connections()        return self.inner(dict(scope, user=device))

此时,启动方式也应该稍作调整(项目根目录下新建asgi.py文件)

"""ASGI entrypoint. Configures Django and then runs the applicationdefined in the ASGI_APPLICATION setting."""import osimport djangofrom channels.routing import get_default_applicationos.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")django.setup()application = get_default_application()

 客户端示例:

import threadingimport timeimport jsonimport tarfileimport base64import hashlibimport osimport picklefrom io import BytesIOfrom datetime import datetimeimport websocketdef on_message(ws, message):    print('on_message')    msg = json.loads(message)    print(msg)    # print(msg['result'])    # print(pickle.loads(base64.b64decode(msg['content']['2-5b4747af5f627dbcb5eaefeb'])))def on_error(ws, error):    print(error)def on_close(ws):    print("### closed ###")def on_open(ws):    def run(*args):        # for i in range(1):        #     tarobj = BytesIO()        #     tar = tarfile.open(fileobj=tarobj, mode='w:gz')        #     dir_list = filter(lambda name: os.path.isdir(name) and not name.startswith('.'), os.listdir(os.path.abspath(os.path.dirname('__file__'))))        #     for dir in dir_list:        #         tar.add(dir)        #     tar.close()        #     tarobj.seek(0)        with open('D:\\software\\websocket\websocket\\face\\18032811042400\\0.jpg', 'rb') as f:            base64_data = base64.b64encode(f.read())            s = base64_data.decode()        file_obj = {            'timestamp': datetime.now().timestamp(),            'image': s        }        info = {            'command': 'TEST',            'content':  file_obj,            # 'content':  ['30-5b691d365f627dee73c4b58d-common', '71-5b695edf5f627d762d5d651b'],        }        ws.send(json.dumps(info))        time.sleep(100)        ws.close()        print("thread terminating...")    threading.Thread(target=run).start()if __name__ == "__main__":        websocket.enableTrace(True)    login_info = {        'name': '0000000000',        'password': hashlib.md5('0000000000'.encode('utf8')).hexdigest(),        'time': datetime.now().strftime('%Y%m%d%H%M%S')    }    b64_token = base64.b64encode(json.dumps(login_info).encode()).decode()    url = "ws://xx.xx.xx.xx:8000/push/device/?token={}".format(b64_token)    print(login_info)    print('Connect {}'.format(url))    ws = websocket.WebSocketApp(url,                              on_message=on_message,                              on_error=on_error,                              on_close=on_close)    ws.on_open = on_open    ws.run_forever()

 

 

参考:https://channels.readthedocs.io/en/latest/deploying.html

 

文章转载于:https://www.cnblogs.com/52-qq/p/11451435.html

原著是一个有趣的人,若有侵权,请通知删除

未经允许不得转载:起风网 » Django配置websocket请求接口
分享到: 生成海报

评论 抢沙发

评论前必须登录!

立即登录