From db1f6fd6a93dbc4ff3dd5175aff0cbf1180f72cc Mon Sep 17 00:00:00 2001 From: xyj <10908227994@qq.com> Date: Sat, 6 Jan 2024 17:46:53 +0800 Subject: [PATCH] update --- t.py | 128 -------------------------------------------------- video_task.py | 126 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 84 insertions(+), 170 deletions(-) delete mode 100644 t.py diff --git a/t.py b/t.py deleted file mode 100644 index c38b58d..0000000 --- a/t.py +++ /dev/null @@ -1,128 +0,0 @@ -import asyncio -import json - -import websockets -import threading - -from video_task import function_A, function_B - - -class Scene: - def __init__(self, user): - self.user = user - self.active = False - - def turn_on(self): - self.active = True - - def turn_off(self): - self.active = False - - -class AppScene(Scene): - def __init__(self, user): - super().__init__(user) - self.name = "app" - - -class WebScene(Scene): - def __init__(self, user): - super().__init__(user) - self.name = "web" - - -class ScreenScene(Scene): - def __init__(self, user): - super().__init__(user) - self.name = "screen" - - -class SceneManager: - def __init__(self): - self.users = {} - - def add_user(self, user): - # 上锁 - if user not in self.users: - self.users[user] = {"app": None, "web": None, "screen": None} - - def get_scene(self, user, scene_type): - if user not in self.users: - print(f"用户 {user} 不存在。") - return None - scene = self.users[user].get(scene_type) - if scene is None: - scene = self.create_scene(user, scene_type) - return scene - - def create_scene(self, user, scene_type): - if scene_type == "app": - return AppScene(user) - elif scene_type == "web": - return WebScene(user) - elif scene_type == "screen": - return ScreenScene(user) - else: - print(f"未知的场景类型: {scene_type}") - - -broker = 'mqtt.lihaink.cn' -port = 1883 -APP = "app" -SCREEN = "screen" -WEB = "web" -SCENE_NAME = "scene" -user_sched = {} -user_msg = {} -user_scene = [] -scene_manager = SceneManager() - - -async def handler(websocket, path): - the_user_scene = None - user_app_scene = None - user_web_scene = None - user_screen_scene = None - try: - # 在循环中等待客户端的消息 - async for message in websocket: - user_msg[websocket] = message - data = json.loads(message) - username = data['username'] - scene_name = data[SCENE_NAME] - scene_manager.add_user(username) - the_user_scene = scene_manager.get_scene(username, scene_name) - if the_user_scene.active is True: - print("该用户的场景已经开启,那么不在开启了") - continue - user_app_scene = scene_manager.get_scene(username, "app") - user_web_scene = scene_manager.get_scene(username, "web") - user_screen_scene = scene_manager.get_scene(username, "screen") - if user_app_scene.active is True or user_web_scene.active is True or user_screen_scene.active is True: - the_user_scene.active = True - print("已经有其他场景开启了,不需要再推流了!") - continue - # 如果该用户的场景没有开启,那么进行开启 - the_user_scene.active = True - # 当接收到消息时调用函数A - # response = await function_A(websocket, message) - # await websocket.send(response) - # 没有消息了,则首先关闭该用户的场景 - if the_user_scene is not None: - the_user_scene.active = False - # 查看该用户的所有场景是否在线,只有都不在线就关闭推流 - if user_app_scene is not None and user_app_scene is not None and user_app_scene is not None: - if user_app_scene.active is False and user_web_scene.active is False and user_screen_scene.active is False: - print("关闭场景" + the_user_scene) - # await websocket.send(json.dumps({"code": 200, "msg": "开始关闭场景" + the_user_scene}, ensure_ascii=False)) - # await function_B(websocket, user_msg[websocket]) - except Exception as e: - print(e) - - -if __name__ == '__main__': - # 启动WebSocket服务器 - start_server = websockets.serve(handler, "0.0.0.0", 8765) - - asyncio.get_event_loop().run_until_complete(start_server) - asyncio.get_event_loop().run_forever() diff --git a/video_task.py b/video_task.py index 8ad4a2c..96ba845 100644 --- a/video_task.py +++ b/video_task.py @@ -5,16 +5,6 @@ import json import paho.mqtt.client as mqtt from pydantic import BaseModel -broker = 'mqtt.lihaink.cn' -port = 1883 -APP = "app" -SCREEN = "screen" -WEB = "web" -SCENE_NAME = "scene" -user_sched = {} -user_msg = {} -user_scene = [] - class BaseResponse(BaseModel): code: int = 200 @@ -119,63 +109,115 @@ async def function_B(client, data): await stop(username, device) -# 创建WebSocket连接的处理函数 +class Scene: + def __init__(self, user): + self.user = user + self.active = False + + def turn_on(self): + self.active = True + + def turn_off(self): + self.active = False + + +class AppScene(Scene): + def __init__(self, user): + super().__init__(user) + self.name = "app" + + +class WebScene(Scene): + def __init__(self, user): + super().__init__(user) + self.name = "web" + + +class ScreenScene(Scene): + def __init__(self, user): + super().__init__(user) + self.name = "screen" + + +class SceneManager: + def __init__(self): + self.users = {} + + def add_user(self, user): + # 上锁 + if user not in self.users: + print("创建user" + user) + self.users[user] = {"app": AppScene(user), "web": WebScene(user), "screen": ScreenScene(user)} + + def get_scene(self, user, scene_type): + return self.users[user].get(scene_type) + + async def handler(websocket, path): the_user_scene = None - username = None + user_app_scene = None + user_web_scene = None + user_screen_scene = None try: # 在循环中等待客户端的消息 async for message in websocket: user_msg[websocket] = message data = json.loads(message) - if ('username' not in data - or 'device' not in data - or 'scene' not in data): - await websocket.send(json.dumps({"code": 200, "msg": "请正确传参"}, ensure_ascii=False)) - continue - if (data['username'] is None - or data['device'] is None - or data['scene'] is None): - await websocket.send(json.dumps({"code": 200, "msg": "请传参数,不要为空"}, ensure_ascii=False)) - continue username = data['username'] - the_user_scene = username + data[SCENE_NAME] - # 如果该用户的场景已经开启,那么不在开启了 - if user_scene[the_user_scene] is True: + scene_name = data[SCENE_NAME] + scene_manager.add_user(username) + the_user_scene = scene_manager.get_scene(username, scene_name) + if the_user_scene.active is True: await websocket.send( - json.dumps({"code": 200, "msg": the_user_scene + "该场景已经开启了,不要再请求我了!"}, + json.dumps({"code": 200, + "msg": the_user_scene.user + "的" + the_user_scene.name + "该场景已经开启了,不要再请求我了!"}, ensure_ascii=False)) continue - if user_scene[username + APP] is True or user_scene[username + WEB] is True or user_scene[ - username + SCREEN] is True: - user_scene[the_user_scene] = True + user_app_scene = scene_manager.get_scene(username, "app") + user_web_scene = scene_manager.get_scene(username, "web") + user_screen_scene = scene_manager.get_scene(username, "screen") + if user_app_scene.active is True or user_web_scene.active is True or user_screen_scene.active is True: + the_user_scene.turn_on() await websocket.send( - json.dumps({"code": 200, "msg": "已经有其他场景开启了,不需要再推流了!"}, ensure_ascii=False)) + json.dumps({"code": 200, "msg": "该用户已经有其他场景开启了,不需要再推流了!"}, ensure_ascii=False)) continue # 如果该用户的场景没有开启,那么进行开启 - user_scene[the_user_scene] = True + the_user_scene.turn_on() + await websocket.send( + json.dumps({"code": 200, "msg": the_user_scene.user + "开启场景" + the_user_scene.name}, + ensure_ascii=False)) # 当接收到消息时调用函数A response = await function_A(websocket, message) await websocket.send(response) # 没有消息了,则首先关闭该用户的场景 if the_user_scene is not None: - user_scene[the_user_scene] = False + the_user_scene.turn_off() # 查看该用户的所有场景是否在线,只有都不在线就关闭推流 - if user_scene[username + APP] is False and user_scene[username + WEB] is False and user_scene[ - username + SCREEN] is False: - await websocket.send(json.dumps({"code": 200, "msg": "开始关闭场景" + the_user_scene}, ensure_ascii=False)) - await function_B(websocket, user_msg[websocket]) + if user_app_scene is not None and user_app_scene is not None and user_app_scene is not None: + if user_app_scene.active is False and user_web_scene.active is False and user_screen_scene.active is False: + print("关闭场景" + the_user_scene.name) + await function_B(websocket, user_msg[websocket]) except Exception as e: - if username is not None: - if the_user_scene is not None: - user_scene[the_user_scene] = False - # 查看所有场景是否在线,只有都不在线就关闭推流 - if user_scene[username + APP] is False and user_scene[username + WEB] is False and user_scene[ - username + SCREEN] is False: + print(e) + pass + # # 没有消息了,则首先关闭该用户的场景 + if the_user_scene is not None: + the_user_scene.turn_off() + # 查看该用户的所有场景是否在线,只有都不在线就关闭推流 + if user_app_scene is not None and user_app_scene is not None and user_app_scene is not None: + if user_app_scene.active is False and user_web_scene.active is False and user_screen_scene.active is False: + print("关闭场景" + the_user_scene.name) await function_B(websocket, user_msg[websocket]) if __name__ == '__main__': + broker = 'mqtt.lihaink.cn' + port = 1883 + SCENE_NAME = "scene" + user_sched = {} + user_msg = {} + user_scene = [] + scene_manager = SceneManager() # 启动WebSocket服务器 start_server = websockets.serve(handler, "0.0.0.0", 8765)