From 954a8d714bc1d803a84f13554d1048a7bf92f631 Mon Sep 17 00:00:00 2001 From: xyj <10908227994@qq.com> Date: Sat, 6 Jan 2024 16:29:19 +0800 Subject: [PATCH] update --- conf/example/bash/cron_set_for_32G.sh | 2 +- conf/example/bash/cron_set_for_64G.sh | 2 +- t.py | 128 ++++++++++++++++++++++++++ video_task.py | 69 ++++++++------ 4 files changed, 171 insertions(+), 30 deletions(-) create mode 100644 t.py diff --git a/conf/example/bash/cron_set_for_32G.sh b/conf/example/bash/cron_set_for_32G.sh index 82a83b5..645d6ef 100644 --- a/conf/example/bash/cron_set_for_32G.sh +++ b/conf/example/bash/cron_set_for_32G.sh @@ -2,7 +2,7 @@ set -e sudo crontab -r sudo crontab -l > cron.cron -echo "*/2 * * * * /usr/bin/bash /home/pi/lot_manager/conf/example/bash/start_dataupload_internal_one_hour.sh" >> cron.cron +echo "*/1 * * * * /usr/bin/bash /home/pi/lot_manager/conf/example/bash/start_dataupload_internal_one_hour.sh" >> cron.cron echo "0 0 * * * /usr/bin/python /home/pi/lot_manager/delete_lot_data_3_days.py" >> cron.cron echo "*/15 * * * * /usr/bin/python /home/pi/lot_manager/delete_than20G.py" >> cron.cron sudo crontab cron.cron diff --git a/conf/example/bash/cron_set_for_64G.sh b/conf/example/bash/cron_set_for_64G.sh index a0ed1b5..86da0cb 100644 --- a/conf/example/bash/cron_set_for_64G.sh +++ b/conf/example/bash/cron_set_for_64G.sh @@ -2,7 +2,7 @@ set -e sudo crontab -r sudo crontab -l > cron.cron -echo "*/2 * * * * /usr/bin/bash /home/pi/lot_manager/conf/example/bash/start_dataupload_internal_one_hour.sh" >> cron.cron +echo "*/1 * * * * /usr/bin/bash /home/pi/lot_manager/conf/example/bash/start_dataupload_internal_one_hour.sh" >> cron.cron echo "0 0 * * * /usr/bin/python /home/pi/lot_manager/delete_lot_data_3_days.py" >> cron.cron # 64G的才加 echo "0 * * * * /usr/bin/bash /home/pi/lot_manager/bash/cron_delete_mp4.sh" >> cron.cron diff --git a/t.py b/t.py new file mode 100644 index 0000000..c38b58d --- /dev/null +++ b/t.py @@ -0,0 +1,128 @@ +import asyncio +import json + +import websockets +import threading + +from video_task import function_A, function_B + + +class Scene: + def __init__(self, user): + self.user = user + self.active = False + + def turn_on(self): + self.active = True + + def turn_off(self): + self.active = False + + +class AppScene(Scene): + def __init__(self, user): + super().__init__(user) + self.name = "app" + + +class WebScene(Scene): + def __init__(self, user): + super().__init__(user) + self.name = "web" + + +class ScreenScene(Scene): + def __init__(self, user): + super().__init__(user) + self.name = "screen" + + +class SceneManager: + def __init__(self): + self.users = {} + + def add_user(self, user): + # 上锁 + if user not in self.users: + self.users[user] = {"app": None, "web": None, "screen": None} + + def get_scene(self, user, scene_type): + if user not in self.users: + print(f"用户 {user} 不存在。") + return None + scene = self.users[user].get(scene_type) + if scene is None: + scene = self.create_scene(user, scene_type) + return scene + + def create_scene(self, user, scene_type): + if scene_type == "app": + return AppScene(user) + elif scene_type == "web": + return WebScene(user) + elif scene_type == "screen": + return ScreenScene(user) + else: + print(f"未知的场景类型: {scene_type}") + + +broker = 'mqtt.lihaink.cn' +port = 1883 +APP = "app" +SCREEN = "screen" +WEB = "web" +SCENE_NAME = "scene" +user_sched = {} +user_msg = {} +user_scene = [] +scene_manager = SceneManager() + + +async def handler(websocket, path): + the_user_scene = None + user_app_scene = None + user_web_scene = None + user_screen_scene = None + try: + # 在循环中等待客户端的消息 + async for message in websocket: + user_msg[websocket] = message + data = json.loads(message) + username = data['username'] + scene_name = data[SCENE_NAME] + scene_manager.add_user(username) + the_user_scene = scene_manager.get_scene(username, scene_name) + if the_user_scene.active is True: + print("该用户的场景已经开启,那么不在开启了") + continue + user_app_scene = scene_manager.get_scene(username, "app") + user_web_scene = scene_manager.get_scene(username, "web") + user_screen_scene = scene_manager.get_scene(username, "screen") + if user_app_scene.active is True or user_web_scene.active is True or user_screen_scene.active is True: + the_user_scene.active = True + print("已经有其他场景开启了,不需要再推流了!") + continue + # 如果该用户的场景没有开启,那么进行开启 + the_user_scene.active = True + # 当接收到消息时调用函数A + # response = await function_A(websocket, message) + # await websocket.send(response) + # 没有消息了,则首先关闭该用户的场景 + if the_user_scene is not None: + the_user_scene.active = False + # 查看该用户的所有场景是否在线,只有都不在线就关闭推流 + if user_app_scene is not None and user_app_scene is not None and user_app_scene is not None: + if user_app_scene.active is False and user_web_scene.active is False and user_screen_scene.active is False: + print("关闭场景" + the_user_scene) + # await websocket.send(json.dumps({"code": 200, "msg": "开始关闭场景" + the_user_scene}, ensure_ascii=False)) + # await function_B(websocket, user_msg[websocket]) + except Exception as e: + print(e) + + +if __name__ == '__main__': + # 启动WebSocket服务器 + start_server = websockets.serve(handler, "0.0.0.0", 8765) + + asyncio.get_event_loop().run_until_complete(start_server) + asyncio.get_event_loop().run_forever() diff --git a/video_task.py b/video_task.py index 2714fec..8ad4a2c 100644 --- a/video_task.py +++ b/video_task.py @@ -13,11 +13,7 @@ WEB = "web" SCENE_NAME = "scene" user_sched = {} user_msg = {} -user_scene = { - SCREEN: False, - APP: False, - WEB: False -} +user_scene = [] class BaseResponse(BaseModel): @@ -125,41 +121,58 @@ async def function_B(client, data): # 创建WebSocket连接的处理函数 async def handler(websocket, path): - scene = None + the_user_scene = None + username = None try: # 在循环中等待客户端的消息 async for message in websocket: user_msg[websocket] = message data = json.loads(message) - scene = data[SCENE_NAME] - # 如果该场景已经开启,那么不在开启了 - if user_scene[scene] is True: - await websocket.send(json.dumps({"code": 200, "msg": scene + "该场景已经开启了,不要再请求我了!"}, ensure_ascii=False)) + if ('username' not in data + or 'device' not in data + or 'scene' not in data): + await websocket.send(json.dumps({"code": 200, "msg": "请正确传参"}, ensure_ascii=False)) continue - if user_scene[APP] is True or user_scene[WEB] is True or user_scene[SCREEN] is True: - user_scene[scene] = True - await websocket.send(json.dumps({"code": 200, "msg": "已经有其他场景开启了,不需要再推流了!"}, ensure_ascii=False)) + if (data['username'] is None + or data['device'] is None + or data['scene'] is None): + await websocket.send(json.dumps({"code": 200, "msg": "请传参数,不要为空"}, ensure_ascii=False)) continue - # 如果该场景没有开启,那么进行开启 - user_scene[scene] = True + username = data['username'] + the_user_scene = username + data[SCENE_NAME] + # 如果该用户的场景已经开启,那么不在开启了 + if user_scene[the_user_scene] is True: + await websocket.send( + json.dumps({"code": 200, "msg": the_user_scene + "该场景已经开启了,不要再请求我了!"}, + ensure_ascii=False)) + continue + if user_scene[username + APP] is True or user_scene[username + WEB] is True or user_scene[ + username + SCREEN] is True: + user_scene[the_user_scene] = True + await websocket.send( + json.dumps({"code": 200, "msg": "已经有其他场景开启了,不需要再推流了!"}, ensure_ascii=False)) + continue + # 如果该用户的场景没有开启,那么进行开启 + user_scene[the_user_scene] = True # 当接收到消息时调用函数A response = await function_A(websocket, message) await websocket.send(response) - # 没有消息了,则首先关闭该场景 - if scene is not None: - user_scene[scene] = False - # 查看所有场景是否在线,只有都不在线就关闭推流 - if user_scene[APP] is False and user_scene[WEB] is False and user_scene[SCREEN] is False: - await websocket.send(json.dumps({"code": 200, "msg": "开始关闭场景" + scene}, ensure_ascii=False)) + # 没有消息了,则首先关闭该用户的场景 + if the_user_scene is not None: + user_scene[the_user_scene] = False + # 查看该用户的所有场景是否在线,只有都不在线就关闭推流 + if user_scene[username + APP] is False and user_scene[username + WEB] is False and user_scene[ + username + SCREEN] is False: + await websocket.send(json.dumps({"code": 200, "msg": "开始关闭场景" + the_user_scene}, ensure_ascii=False)) await function_B(websocket, user_msg[websocket]) except Exception as e: - data = json.loads(user_msg[websocket]) - scene = data[SCENE_NAME] - if scene is not None: - user_scene[scene] = False - # 查看所有场景是否在线,只有都不在线就关闭推流 - if user_scene[APP] is False and user_scene[WEB] is False and user_scene[SCREEN] is False: - await function_B(websocket, user_msg[websocket]) + if username is not None: + if the_user_scene is not None: + user_scene[the_user_scene] = False + # 查看所有场景是否在线,只有都不在线就关闭推流 + if user_scene[username + APP] is False and user_scene[username + WEB] is False and user_scene[ + username + SCREEN] is False: + await function_B(websocket, user_msg[websocket]) if __name__ == '__main__':