diff --git a/video_task.py b/video_task.py index 96ba845..4fde51f 100644 --- a/video_task.py +++ b/video_task.py @@ -1,3 +1,6 @@ +import time + +import requests import websockets import asyncio import json @@ -40,7 +43,7 @@ class MQTTClient: async def close(username, device): # 倒计时600秒 - await asyncio.sleep(30) + await asyncio.sleep(3) print(username + "结束推流") user_sched.pop(username) close_stream(username, device) @@ -57,7 +60,36 @@ async def start(username, device): # 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务 if username not in user_sched: print(username + "开始推流") + res = requests.get(rtsp_MediaInfo + device) + if res.status_code != 200: + return BaseResponse(code=500, msg="ZLMediakit Server Error!").json() + # 转换为json + res = res.json() + if 'code' in res and res['code'] == 0: + return BaseResponse(code=200, msg=username + ",Do not repeat the push stream").json() + # 开始推流 + get_times = 50 + # 启动推流程序 push_stream(username, device) + # 查看请求 ,ZLMediakit是否已经生成了mp4文件,生成了就给前端返回 + while True: + res = requests.get(rtsp_MediaInfo + device) + if res.status_code != 200: + return BaseResponse(code=200, msg="ZLMediakit Server Error!").json() + # 转换为json + res = res.json() + if 'code' in res and res['code'] == 0: + # code == 0 说明已经启动成功 + break + # 如果都没有code,那就出错了 + if 'code' not in res: + return BaseResponse(code=500, msg="ZLMediakit Server Error!").json() + get_times -= 1 + if get_times == 0: + # 请求次数达到50次就退了吧 + break + await asyncio.sleep(1) + return BaseResponse(code=200, msg="success").json() # 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务 else: t = user_sched[username] @@ -65,7 +97,9 @@ async def start(username, device): if t is not None: print(username + "取消定时任务") t.cancel() - return BaseResponse(code=200, msg="success").json() + return BaseResponse(code=200, msg="success").json() + else: + BaseResponse(code=500, msg="Internal Error").json() except Exception as e: return BaseResponse(code=500, msg=str(e)).json() @@ -95,7 +129,6 @@ async def function_A(client, data): data = json.loads(data) username = data['username'] device = data['device'] - # print("A", username, device) return await start(username, device) @@ -104,7 +137,6 @@ async def function_B(client, data): data = json.loads(data) username = data['username'] device = data['device'] - # print("B", username, device) print(f"{client}的连接已断开") await stop(username, device) @@ -183,11 +215,8 @@ async def handler(websocket, path): continue # 如果该用户的场景没有开启,那么进行开启 the_user_scene.turn_on() - await websocket.send( - json.dumps({"code": 200, "msg": the_user_scene.user + "开启场景" + the_user_scene.name}, - ensure_ascii=False)) - # 当接收到消息时调用函数A response = await function_A(websocket, message) + # 当接收到消息时调用函数A await websocket.send(response) # 没有消息了,则首先关闭该用户的场景 if the_user_scene is not None: @@ -198,8 +227,7 @@ async def handler(websocket, path): print("关闭场景" + the_user_scene.name) await function_B(websocket, user_msg[websocket]) except Exception as e: - print(e) - pass + print("异常", e) # # 没有消息了,则首先关闭该用户的场景 if the_user_scene is not None: the_user_scene.turn_off() @@ -213,6 +241,7 @@ async def handler(websocket, path): if __name__ == '__main__': broker = 'mqtt.lihaink.cn' port = 1883 + rtsp_MediaInfo = "http://rtsp.lihaink.cn/index/api/getMediaInfo?secret=YwDtp2llj80HA19JhMXL4Po99nsMAyNT&schema=rtsp&vhost=__defaultVhost__&app=live&stream=" SCENE_NAME = "scene" user_sched = {} user_msg = {}