"""WebSocket front-end that starts/stops a device video stream over MQTT.

Clients connect over WebSocket and send JSON messages carrying a scene name,
a username and a device id.  The first active scene triggers a "push stream"
command to the device via MQTT; when the last scene goes away a delayed
"close stream" command is scheduled, and cancelled again if a client comes
back before the delay expires.
"""
import asyncio
import json

import paho.mqtt.client as mqtt
import websockets
from pydantic import BaseModel

broker = 'mqtt.lihaink.cn'
port = 1883

APP = "app"
SCREEN = "screen"
WEB = "web"
SCENE_NAME = "scene"

# Seconds to wait after the last scene disconnects before closing the stream.
CLOSE_DELAY_SECONDS = 30

# username -> asyncio.Task of the pending (or cancelled) delayed close.
user_sched = {}
# websocket connection -> last valid raw message received on it.
user_msg = {}
# scene name -> whether some connection currently has that scene open.
# NOTE(review): this state is global, not per user/device; confirm that a
# single device per server process is the intended deployment.
user_scene = {
    SCREEN: False,
    APP: False,
    WEB: False
}


class BaseResponse(BaseModel):
    """JSON response envelope returned to WebSocket clients."""

    code: int = 200
    msg: str = "success"


class MQTTClient:
    """Thin wrapper around a paho MQTT client bound to one topic."""

    def __init__(self, broker, port, topic, username, password):
        self.broker = broker
        self.port = port
        self.topic = topic
        self.username = username
        self.password = password
        # NOTE: the original author warns never to pass a client_id here
        # because doing so breaks the client (reason not stated in the code).
        self.client = mqtt.Client()
        self.client.username_pw_set(self.username, self.password)

    def _publish(self, command):
        """Publish ``{"msg": command}`` as JSON to the topic with QoS 0."""
        self.client.publish(
            self.topic,
            payload=json.dumps({"msg": command}, ensure_ascii=False),
            qos=0,
        )

    def push(self):
        """Ask the device to turn its LED on and start pushing the stream."""
        self._publish("open_led")
        self._publish("push_stream")

    def close(self):
        """Ask the device to stop the stream and turn its LED off."""
        self._publish("close_stream")
        self._publish("close_led")

    def start(self):
        """Connect to the broker.

        NOTE(review): no network loop is started and the connection is never
        explicitly disconnected; confirm this is acceptable for the broker.
        """
        self.client.connect(self.broker, self.port)


async def close(username, device, delay=CLOSE_DELAY_SECONDS):
    """Wait ``delay`` seconds, then close the stream for ``username``.

    Runs as a background task created by ``task_start``; cancelling the task
    during the sleep aborts the close.
    """
    await asyncio.sleep(delay)
    print(username + "结束推流")
    # Default avoids a KeyError if the entry was already removed.
    user_sched.pop(username, None)
    close_stream(username, device)


async def task_start(username, device):
    """Schedule the delayed stream close for ``username``.

    Any still-pending close task for the same user is cancelled first so
    that only one delayed close can ever fire.
    """
    print(username + "创建结束推流定时任务")
    previous = user_sched.get(username)
    if previous is not None and not previous.done():
        previous.cancel()
    user_sched[username] = asyncio.create_task(close(username, device))


async def start(username, device):
    """Start streaming, or cancel a pending delayed close.

    Returns a JSON string: code 200 on success, code 500 with the error text
    if anything raised.
    """
    try:
        if username not in user_sched:
            # No delayed close is registered: push the stream right away.
            print(username + "开始推流")
            push_stream(username, device)
        else:
            # A delayed close is registered: cancel it so the stream stays up.
            pending = user_sched[username]
            if pending is not None:
                print(username + "取消定时任务")
                pending.cancel()
        return BaseResponse(code=200, msg="success").json()
    except Exception as e:
        return BaseResponse(code=500, msg=str(e)).json()


async def stop(username, device):
    """Schedule the delayed close; returns a JSON status string."""
    try:
        await task_start(username, device)
        return BaseResponse(code=200, msg="success").json()
    except Exception as e:
        return BaseResponse(code=500, msg=str(e)).json()


def push_stream(username, device):
    """Connect to MQTT as ``username`` and publish the start commands."""
    client = MQTTClient(broker, port, device, username, username)
    client.start()
    client.push()


def close_stream(username, device):
    """Connect to MQTT as ``username`` and publish the stop commands."""
    client = MQTTClient(broker, port, device, username, username)
    client.start()
    client.close()


async def function_A(client, data):
    """Handle an "open" request: parse the raw message and start streaming."""
    data = json.loads(data)
    username = data['username']
    device = data['device']
    return await start(username, device)


async def function_B(client, data):
    """Handle a disconnect: parse the raw message and schedule the close."""
    data = json.loads(data)
    username = data['username']
    device = data['device']
    print(f"{client}的连接已断开")
    await stop(username, device)


def _parse_request(message):
    """Return the decoded request dict, or None if the message is unusable.

    A usable request is a JSON object whose scene is one of the known scene
    names and whose ``username`` and ``device`` are strings.
    """
    try:
        data = json.loads(message)
    except (TypeError, ValueError):
        return None
    if not isinstance(data, dict):
        return None
    scene = data.get(SCENE_NAME)
    if not isinstance(scene, str) or scene not in user_scene:
        return None
    if not isinstance(data.get('username'), str):
        return None
    if not isinstance(data.get('device'), str):
        return None
    return data


def _any_scene_active():
    """Return True if at least one of the known scenes is currently open."""
    return any(user_scene[name] for name in (APP, WEB, SCREEN))


async def handler(websocket, path=None):
    """Serve one WebSocket connection.

    Each valid message opens the requested scene; the stream is only pushed
    when no scene was open before.  When the connection ends, every scene
    this connection requested is released, and if no scene remains open the
    delayed stream close is scheduled.

    ``path`` is optional so the handler works both with websockets releases
    that pass ``(websocket, path)`` and with those that pass only the
    connection.

    The original sent a "closing scene" notice after the receive loop ended;
    the socket is already closed at that point, so that send could only
    raise.  It has been removed and cleanup now runs in ``finally``.
    """
    opened_scenes = set()
    try:
        # Wait for client messages until the connection closes.
        async for message in websocket:
            request = _parse_request(message)
            if request is None:
                await websocket.send(json.dumps(
                    {"code": 400, "msg": "invalid request"},
                    ensure_ascii=False))
                continue

            # Only remember messages that cleanup can safely re-parse.
            user_msg[websocket] = message
            scene = request[SCENE_NAME]
            opened_scenes.add(scene)

            # This scene is already open: nothing to do.
            if user_scene[scene]:
                await websocket.send(json.dumps(
                    {"code": 200, "msg": scene + "该场景已经开启了,不要再请求我了!"},
                    ensure_ascii=False))
                continue

            # Another scene already keeps the stream up: just mark this one.
            if _any_scene_active():
                user_scene[scene] = True
                await websocket.send(json.dumps(
                    {"code": 200, "msg": "已经有其他场景开启了,不需要再推流了!"},
                    ensure_ascii=False))
                continue

            # First scene to open: mark it and start the stream.
            user_scene[scene] = True
            response = await function_A(websocket, message)
            await websocket.send(response)
    except websockets.ConnectionClosed:
        # Abnormal closure (or a send on a closed socket); cleanup below.
        pass
    finally:
        last_message = user_msg.pop(websocket, None)
        for scene in opened_scenes:
            user_scene[scene] = False
        # Only close the stream once every scene is offline.
        if last_message is not None and not _any_scene_active():
            await function_B(websocket, last_message)


async def main():
    """Run the WebSocket server on 0.0.0.0:8765 until interrupted."""
    async with websockets.serve(handler, "0.0.0.0", 8765):
        await asyncio.Future()  # never completes: serve forever


if __name__ == '__main__':
    # Start the WebSocket server.
    asyncio.run(main())