"""WebSocket server that tracks which per-user scenes are currently active.

Each client message is a JSON object carrying a ``username`` and a ``scene``
("app", "web" or "screen").  The server records that scene as active for the
user, and marks it inactive again when the connection ends.
"""
import asyncio
import json
import threading

import websockets

from video_task import function_A, function_B


class Scene:
    """One user's scene, with a simple on/off ``active`` flag."""

    def __init__(self, user):
        self.user = user
        self.active = False

    def turn_on(self):
        """Mark the scene as active."""
        self.active = True

    def turn_off(self):
        """Mark the scene as inactive."""
        self.active = False


class AppScene(Scene):
    """Scene whose ``name`` is "app"."""

    def __init__(self, user):
        super().__init__(user)
        self.name = "app"


class WebScene(Scene):
    """Scene whose ``name`` is "web"."""

    def __init__(self, user):
        super().__init__(user)
        self.name = "web"


class ScreenScene(Scene):
    """Scene whose ``name`` is "screen"."""

    def __init__(self, user):
        super().__init__(user)
        self.name = "screen"


class SceneManager:
    """Registry mapping each user to their app / web / screen scenes."""

    def __init__(self):
        # user -> {"app": Scene | None, "web": Scene | None, "screen": Scene | None}
        self.users = {}

    def add_user(self, user):
        """Register ``user`` with empty scene slots; no-op if already known."""
        # TODO: the original author flagged this spot for locking.
        # NOTE(review): there is no await in here, so it is presumably safe
        # when only called from the event loop - confirm before using threads.
        if user not in self.users:
            self.users[user] = {"app": None, "web": None, "screen": None}

    def get_scene(self, user, scene_type):
        """Return the user's scene of ``scene_type``, creating it on first use.

        The created scene is stored in the registry so that later calls return
        the same object and its ``active`` flag persists between lookups.

        Returns:
            The scene, or ``None`` when the user is not registered or
            ``scene_type`` is not a known type.
        """
        if user not in self.users:
            print(f"用户 {user} 不存在。")
            return None
        scenes = self.users[user]
        scene = scenes.get(scene_type)
        if scene is None:
            scene = self.create_scene(user, scene_type)
            # Only cache real scenes; an unknown type must not add a new slot.
            if scene is not None:
                scenes[scene_type] = scene
        return scene

    def create_scene(self, user, scene_type):
        """Build a new scene object for ``scene_type``.

        Returns:
            A new ``AppScene`` / ``WebScene`` / ``ScreenScene``, or ``None``
            (after printing a message) when ``scene_type`` is unknown.
        """
        if scene_type == "app":
            return AppScene(user)
        elif scene_type == "web":
            return WebScene(user)
        elif scene_type == "screen":
            return ScreenScene(user)
        else:
            print(f"未知的场景类型: {scene_type}")
            return None


# NOTE(review): broker / port are not referenced by the code in this file;
# presumably MQTT connection settings used elsewhere - confirm.
broker = 'mqtt.lihaink.cn'
port = 1883

APP = "app"
SCREEN = "screen"
WEB = "web"
SCENE_NAME = "scene"  # JSON key that carries the requested scene type

user_sched = {}
user_msg = {}  # websocket connection -> last raw message received on it
user_scene = []
scene_manager = SceneManager()


async def handler(websocket, path=None):
    """Serve one WebSocket connection, tracking the scene it activates.

    Every incoming message must be a JSON object with a ``username`` key and a
    ``scene`` key.  Messages that cannot be parsed, or that name an unknown
    scene type, are reported and skipped without dropping the connection.

    When the connection ends, whether normally or through an error, the most
    recently requested scene is marked inactive.  If none of that user's
    scenes remain active, the shutdown message is printed.

    Args:
        websocket: The connection object supplied by ``websockets``.
        path: Request path passed by older ``websockets`` versions; unused.
            It defaults to ``None`` so the handler also works with versions
            that call it with the connection only.
    """
    the_user_scene = None
    user_app_scene = None
    user_web_scene = None
    user_screen_scene = None
    try:
        # Wait for client messages until the connection closes.
        async for message in websocket:
            user_msg[websocket] = message
            try:
                data = json.loads(message)
                username = data['username']
                scene_name = data[SCENE_NAME]
            except (ValueError, KeyError, TypeError) as e:
                # Malformed payload: report it and keep the connection open.
                print(e)
                continue

            scene_manager.add_user(username)
            requested_scene = scene_manager.get_scene(username, scene_name)
            if requested_scene is None:
                # Unknown scene type; create_scene has already reported it.
                continue
            the_user_scene = requested_scene

            if the_user_scene.active is True:
                print("该用户的场景已经开启,那么不在开启了")
                continue

            user_app_scene = scene_manager.get_scene(username, "app")
            user_web_scene = scene_manager.get_scene(username, "web")
            user_screen_scene = scene_manager.get_scene(username, "screen")

            if (user_app_scene.active is True
                    or user_web_scene.active is True
                    or user_screen_scene.active is True):
                # Another scene of this user is already active, so only the
                # flag needs updating.
                the_user_scene.active = True
                print("已经有其他场景开启了,不需要再推流了!")
                continue

            # No scene of this user was active yet: activate this one.
            the_user_scene.active = True
            # TODO: call function_A(websocket, message) here and send its
            # response back to the client.
    except Exception as e:
        print(e)
    finally:
        # Runs on every exit path so a failed connection cannot leave a scene
        # stuck in the active state.
        user_msg.pop(websocket, None)  # drop the per-connection entry

        if the_user_scene is not None:
            the_user_scene.active = False

            # Only when none of the user's scenes is active any more.
            user_scenes = (user_app_scene, user_web_scene, user_screen_scene)
            if all(scene is not None for scene in user_scenes):
                if not any(scene.active for scene in user_scenes):
                    print("关闭场景" + the_user_scene.name)
                    # TODO: notify the client and call function_B with the
                    # connection's last message (capture it before the pop
                    # above when wiring this up).


async def main():
    """Start the WebSocket server on 0.0.0.0:8765 and serve until cancelled."""
    async with websockets.serve(handler, "0.0.0.0", 8765):
        await asyncio.Future()  # never completes: keeps the server running


if __name__ == '__main__':
    # Start the WebSocket server.
    asyncio.run(main())