"""WebSocket front-end that starts and stops device video streaming.

Clients connect over WebSocket and send JSON messages carrying ``username``,
``device`` and a scene name.  The first active scene of a user triggers an
MQTT "push stream" command to the device; when the last scene goes away a
delayed task sends the matching "close stream" command.
"""
import time
import requests
import websockets
import asyncio
import json
import paho.mqtt.client as mqtt
from pydantic import BaseModel

# --- Configuration -----------------------------------------------------------
# Defined at module level (not only under ``__main__``) so the functions below
# can resolve these names when the module is imported as well as executed.
broker = 'mqtt.lihaink.cn'
port = 1883
# NOTE(review): the API secret is hard-coded in this URL; consider loading it
# from configuration or the environment instead of keeping it in source.
rtsp_MediaInfo = "http://rtsp.lihaink.cn/index/api/getMediaInfo?secret=YwDtp2llj80HA19JhMXL4Po99nsMAyNT&schema=rtsp&vhost=__defaultVhost__&app=live&stream="
# Key of the incoming JSON message that names the scene ("app"/"web"/"screen").
SCENE_NAME = "scene"
# Seconds to wait after the last scene disconnects before closing the stream.
CLOSE_DELAY_SECONDS = 30
# Maximum number of one-second polls while waiting for the stream to appear.
MAX_POLL_ATTEMPTS = 50
# Timeout for each media-info HTTP request so a stalled server cannot hang us.
HTTP_TIMEOUT_SECONDS = 5

# --- Shared runtime state ----------------------------------------------------
# username -> asyncio.Task of the pending delayed "close stream" job.
user_sched = {}
# websocket connection -> last successfully parsed raw message from it.
user_msg = {}
# Unused in this file; kept for backward compatibility.
user_scene = []


class BaseResponse(BaseModel):
    """Response envelope serialized to JSON and sent back to the client."""

    code: int = 200
    msg: str = "success"


class MQTTClient:
    """Thin wrapper around a paho MQTT client bound to one topic."""

    def __init__(self, broker, port, topic, username, password):
        """Store connection settings and create the underlying client.

        Args:
            broker: MQTT broker host name.
            port: MQTT broker port.
            topic: Topic every command is published to.
            username: MQTT user name.
            password: MQTT password.
        """
        self.broker = broker
        self.port = port
        self.topic = topic
        self.username = username
        self.password = password
        # Never pass an explicit client_id here; the original author warns
        # that doing so breaks things (the reason is not visible in this file).
        self.client = mqtt.Client()
        self.client.username_pw_set(self.username, self.password)

    def _publish(self, command):
        """Publish ``{"msg": command}`` as JSON to the topic with QoS 0."""
        payload = json.dumps({"msg": command}, ensure_ascii=False)
        self.client.publish(self.topic, payload=payload, qos=0)

    def push(self):
        """Publish the commands that switch the LED on and start streaming."""
        self._publish("open_led")
        self._publish("push_stream")

    def close(self):
        """Publish the commands that stop streaming and switch the LED off."""
        self._publish("close_stream")
        self._publish("close_led")

    def start(self):
        """Connect to the broker.

        NOTE(review): no network loop is started and the client is never
        disconnected in this file - confirm that is intentional.
        """
        self.client.connect(self.broker, self.port)


async def close(username, device):
    """Wait ``CLOSE_DELAY_SECONDS`` and then stop the stream of ``device``.

    Runs as a background task created by ``task_start``; cancelling the task
    during the wait prevents the stream from being closed.
    """
    await asyncio.sleep(CLOSE_DELAY_SECONDS)
    print(username + "结束推流")
    # Use a default so a missing entry cannot prevent the stream from closing.
    user_sched.pop(username, None)
    close_stream(username, device)


async def task_start(username, device):
    """Schedule the delayed close job for ``username`` and remember it."""
    print(username + "创建结束推流定时任务")
    # Cancel a still-pending job so two close jobs never race for one user.
    previous = user_sched.get(username)
    if previous is not None and not previous.done():
        previous.cancel()
    user_sched[username] = asyncio.create_task(close(username, device))


def _query_media_info(device):
    """Blocking HTTP GET of the media-info endpoint for ``device``."""
    return requests.get(rtsp_MediaInfo + device, timeout=HTTP_TIMEOUT_SECONDS)


async def _get_media_info(device):
    """Fetch media info in the default executor so the event loop stays free."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _query_media_info, device)


async def start(username, device):
    """Start streaming for ``device`` or cancel a pending close job.

    If no close job is registered for ``username`` the device is told to push
    its stream and the media server is polled (up to ``MAX_POLL_ATTEMPTS``
    times, one second apart) until it reports the stream.  If a close job is
    registered, the job is cancelled instead.

    Returns:
        A JSON string produced from ``BaseResponse``; any exception is
        converted into a ``code=500`` response rather than raised.
    """
    try:
        if username in user_sched:
            # A close job exists (the user left recently): cancel it so the
            # stream keeps running.
            pending = user_sched[username]
            if pending is None:
                # The original built this response without returning it.
                return BaseResponse(code=500, msg="Internal Error").json()
            print(username + "取消定时任务")
            pending.cancel()
            return BaseResponse(code=200, msg="success").json()

        print(username + "开始推流")
        res = await _get_media_info(device)
        if res.status_code != 200:
            return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
        res = res.json()
        # code == 0 means the media server already has this stream.
        if 'code' in res and res['code'] == 0:
            return BaseResponse(code=200, msg=username + ",Do not repeat the push stream").json()

        # Ask the device to start pushing, then wait for the stream to appear.
        push_stream(username, device)
        get_times = MAX_POLL_ATTEMPTS
        while True:
            res = await _get_media_info(device)
            if res.status_code != 200:
                # Report the failure as an error (the original used code=200).
                return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
            res = res.json()
            if 'code' in res and res['code'] == 0:
                # The stream is up.
                break
            if 'code' not in res:
                # A reply without "code" is treated as a server error.
                return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
            get_times -= 1
            if get_times == 0:
                # Give up after the maximum number of polls.
                # NOTE(review): success is still returned in this case.
                break
            await asyncio.sleep(1)
        return BaseResponse(code=200, msg="success").json()
    except Exception as e:
        return BaseResponse(code=500, msg=str(e)).json()


async def stop(username, device):
    """Schedule the delayed stream shutdown and return a JSON status string."""
    try:
        await task_start(username, device)
        return BaseResponse(code=200, msg="success").json()
    except Exception as e:
        return BaseResponse(code=500, msg=str(e)).json()


def push_stream(username, device):
    """Connect to MQTT and publish the start commands on topic ``device``.

    ``username`` is used as both the MQTT user name and password.
    """
    client = MQTTClient(broker, port, device, username, username)
    client.start()
    client.push()


def close_stream(username, device):
    """Connect to MQTT and publish the stop commands on topic ``device``.

    ``username`` is used as both the MQTT user name and password.
    """
    client = MQTTClient(broker, port, device, username, username)
    client.start()
    client.close()


async def function_A(client, data):
    """Parse a raw JSON message and start streaming; returns the JSON reply."""
    data = json.loads(data)
    username = data['username']
    device = data['device']
    return await start(username, device)


async def function_B(client, data):
    """Parse a raw JSON message and schedule the stream shutdown."""
    data = json.loads(data)
    username = data['username']
    device = data['device']
    print(f"{client}的连接已断开")
    await stop(username, device)


class Scene:
    """On/off state of one client scene belonging to one user."""

    def __init__(self, user):
        self.user = user
        self.active = False

    def turn_on(self):
        """Mark the scene as active."""
        self.active = True

    def turn_off(self):
        """Mark the scene as inactive."""
        self.active = False


class AppScene(Scene):
    """Scene named "app"."""

    def __init__(self, user):
        super().__init__(user)
        self.name = "app"


class WebScene(Scene):
    """Scene named "web"."""

    def __init__(self, user):
        super().__init__(user)
        self.name = "web"


class ScreenScene(Scene):
    """Scene named "screen"."""

    def __init__(self, user):
        super().__init__(user)
        self.name = "screen"


class SceneManager:
    """Registry mapping each user to their app/web/screen scenes."""

    def __init__(self):
        self.users = {}

    def add_user(self, user):
        """Create the three scenes for ``user`` unless they already exist."""
        if user not in self.users:
            print("创建user" + user)
            self.users[user] = {
                "app": AppScene(user),
                "web": WebScene(user),
                "screen": ScreenScene(user),
            }

    def get_scene(self, user, scene_type):
        """Return the scene of ``user`` named ``scene_type`` or ``None``.

        Raises:
            KeyError: if ``user`` was never added.
        """
        return self.users[user].get(scene_type)


scene_manager = SceneManager()


async def _release_scenes(websocket, current, app_scene, web_scene, screen_scene):
    """Turn off the connection's scene and stop streaming if none remain."""
    if current is not None:
        current.turn_off()
    # Only decide about the stream when all three scenes were looked up.
    if app_scene is None or web_scene is None or screen_scene is None:
        return
    if app_scene.active or web_scene.active or screen_scene.active:
        return
    print("关闭场景" + current.name)
    await function_B(websocket, user_msg[websocket])


async def handler(websocket, path=None):
    """Serve one WebSocket connection.

    Each message activates the requested scene for the user.  Streaming is
    started only when no scene of that user was active before.  When the
    connection ends (normally or with an error) the scene is turned off and,
    if no scene of the user remains active, the delayed shutdown is scheduled.

    Args:
        websocket: The connection object supplied by ``websockets``.
        path: Request path; optional so the handler works whether or not the
            installed ``websockets`` version passes it.
    """
    the_user_scene = None
    user_app_scene = None
    user_web_scene = None
    user_screen_scene = None
    try:
        async for message in websocket:
            data = json.loads(message)
            username = data['username']
            scene_name = data[SCENE_NAME]
            # Remember the message only once it parsed, so the cleanup below
            # never has to re-parse a malformed payload.
            user_msg[websocket] = message
            scene_manager.add_user(username)
            the_user_scene = scene_manager.get_scene(username, scene_name)
            if the_user_scene.active:
                await websocket.send(
                    json.dumps({"code": 200,
                                "msg": the_user_scene.user + "的" + the_user_scene.name + "该场景已经开启了,不要再请求我了!"},
                               ensure_ascii=False))
                continue
            user_app_scene = scene_manager.get_scene(username, "app")
            user_web_scene = scene_manager.get_scene(username, "web")
            user_screen_scene = scene_manager.get_scene(username, "screen")
            if user_app_scene.active or user_web_scene.active or user_screen_scene.active:
                # Another scene already keeps the stream alive; just join it.
                the_user_scene.turn_on()
                await websocket.send(
                    json.dumps({"code": 200, "msg": "该用户已经有其他场景开启了,不需要再推流了!"}, ensure_ascii=False))
                continue
            # First active scene of this user: activate it and start the stream.
            the_user_scene.turn_on()
            response = await function_A(websocket, message)
            await websocket.send(response)
    except Exception as e:
        print("异常", e)

    try:
        await _release_scenes(websocket, the_user_scene, user_app_scene,
                              user_web_scene, user_screen_scene)
    finally:
        # Drop the per-connection entry so closed sockets do not accumulate.
        user_msg.pop(websocket, None)


async def _serve_forever():
    """Run the WebSocket server on 0.0.0.0:8765 until the process exits."""
    async with websockets.serve(handler, "0.0.0.0", 8765):
        await asyncio.Future()  # never completes; keeps the server running


if __name__ == '__main__':
    # Start the WebSocket server.
    asyncio.run(_serve_forever())