lot_manager/video_task.py

255 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import requests
import websockets
import asyncio
import json
import paho.mqtt.client as mqtt
from pydantic import BaseModel
class BaseResponse(BaseModel):
    """Reply envelope serialized (via ``.json()``) and returned to callers."""

    # Status code of the reply; 200 unless the caller overrides it.
    code: int = 200
    # Status text of the reply; "success" unless the caller overrides it.
    msg: str = "success"
class MQTTClient:
    """Small wrapper around a paho MQTT client bound to a single topic.

    The wrapper publishes fixed JSON commands (``{"msg": <command>}``) on
    ``topic`` with QoS 0.
    """

    def __init__(self, broker, port, topic, username, password):
        """Store the connection settings and create the credentialed client."""
        self.broker, self.port = broker, port
        self.topic = topic
        self.username, self.password = username, password
        # Do NOT pass an explicit client_id here: the original author warns
        # that doing so breaks the client.
        self.client = mqtt.Client()
        self.client.username_pw_set(username, password)

    def _publish_command(self, command):
        """Publish ``{"msg": command}`` as JSON on this client's topic."""
        body = json.dumps({"msg": command}, ensure_ascii=False)
        self.client.publish(self.topic, payload=body, qos=0)

    def push(self):
        """Publish ``open_led`` followed by ``push_stream``."""
        for command in ("open_led", "push_stream"):
            self._publish_command(command)

    def close(self):
        """Publish ``close_stream`` followed by ``close_led``."""
        for command in ("close_stream", "close_led"):
            self._publish_command(command)

    def start(self):
        """Connect the underlying client to the configured broker and port."""
        self.client.connect(self.broker, self.port)
async def close(username, device):
    """Wait for the grace period, then stop the stream for ``username``.

    After the delay the user's entry is removed from ``user_sched`` and
    ``close_stream`` is called for the device. Cancelling the task during
    the delay prevents both steps.
    """
    # NOTE(review): the original comment described a 600-second countdown,
    # but the code waits 3 seconds - confirm which value is intended.
    grace_seconds = 3
    await asyncio.sleep(grace_seconds)
    print(username + "结束推流")
    del user_sched[username]
    close_stream(username, device)
async def task_start(username, device):
    """Schedule the delayed stream shutdown for ``username``.

    The created task runs ``close(username, device)`` and is registered in
    ``user_sched`` under ``username`` so that a later ``start`` can cancel it.
    """
    print(username + "创建结束推流定时任务")
    # If a shutdown is already pending for this user, cancel it before
    # replacing it; otherwise the orphaned task would still fire and then
    # fail on the missing ``user_sched`` entry.
    previous = user_sched.get(username)
    if previous is not None and not previous.done():
        previous.cancel()
    user_sched[username] = asyncio.create_task(close(username, device))
def _fetch_media_info(device):
    """Query the media server for ``device``; return parsed JSON or None.

    None is returned when the HTTP status is not 200.
    """
    # The timeout keeps a stalled media server from blocking this call (and
    # with it the event loop) forever; a timeout raises and is reported by
    # the caller as a 500 response.
    reply = requests.get(rtsp_MediaInfo + device, timeout=5)
    if reply.status_code != 200:
        return None
    return reply.json()


async def start(username, device):
    """Start pushing the stream for ``device`` or cancel a pending shutdown.

    If a shutdown task is registered for ``username`` in ``user_sched`` it is
    cancelled and nothing else happens. Otherwise the media server is asked
    whether the stream already exists; if not, the push command is sent and
    the media server is polled (up to 50 times, one second apart) until it
    reports the stream.

    Returns:
        A JSON string produced by ``BaseResponse(...).json()``. Every failure,
        including any raised exception, is reported with code 500.
    """
    try:
        if username in user_sched:
            # A delayed shutdown is pending (created when the user left):
            # cancel it so the stream keeps running.
            pending = user_sched[username]
            if pending is None:
                return BaseResponse(code=500, msg="Internal Error").json()
            print(username + "取消定时任务")
            pending.cancel()
            return BaseResponse(code=200, msg="success").json()

        # No pending shutdown: start pushing directly.
        print(username + "开始推流")
        info = _fetch_media_info(device)
        if info is None:
            return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
        if 'code' in info and info['code'] == 0:
            # code == 0 means the stream is already known to the media server.
            return BaseResponse(code=200, msg=username + ",Do not repeat the push stream").json()

        # Ask the device to start pushing.
        push_stream(username, device)

        # Poll the media server until it reports the stream, then answer the
        # front end.
        get_times = 50
        while True:
            info = _fetch_media_info(device)
            if info is None:
                return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
            # A reply without any 'code' field is treated as an error.
            if 'code' not in info:
                return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
            if info['code'] == 0:
                # code == 0 means the stream started successfully.
                break
            get_times -= 1
            if get_times == 0:
                # Give up waiting after 50 requests; success is still
                # reported, as in the original behaviour.
                break
            await asyncio.sleep(1)
        return BaseResponse(code=200, msg="success").json()
    except Exception as e:
        return BaseResponse(code=500, msg=str(e)).json()
async def stop(username, device):
    """Schedule the delayed stream shutdown and report the outcome.

    Returns a JSON string: code 200 on success, or code 500 carrying the
    exception text when scheduling fails.
    """
    try:
        await task_start(username, device)
        ok_reply = BaseResponse(code=200, msg="success")
        return ok_reply.json()
    except Exception as exc:
        error_reply = BaseResponse(code=500, msg=str(exc))
        return error_reply.json()
def push_stream(username, device):
    """Connect to the broker and publish the start-streaming commands.

    The device name is used as the MQTT topic, and ``username`` is passed as
    both the MQTT username and password.
    """
    publisher = MQTTClient(
        broker=broker,
        port=port,
        topic=device,
        username=username,
        password=username,
    )
    publisher.start()
    publisher.push()
def close_stream(username, device):
    """Connect to the broker and publish the stop-streaming commands.

    The device name is used as the MQTT topic, and ``username`` is passed as
    both the MQTT username and password.
    """
    publisher = MQTTClient(
        broker=broker,
        port=port,
        topic=device,
        username=username,
        password=username,
    )
    publisher.start()
    publisher.close()
# Function A: handle an incoming client message.
async def function_A(client, data):
    """Parse the JSON message ``data`` and start the stream it describes.

    ``data`` must decode to an object with ``username`` and ``device`` keys.
    ``client`` is accepted but not used. Returns whatever ``start`` returns.
    """
    payload = json.loads(data)
    username, device = payload['username'], payload['device']
    return await start(username, device)
# Function B: handle a client disconnect.
async def function_B(client, data):
    """Parse the JSON message ``data`` and schedule the stream shutdown.

    ``data`` must decode to an object with ``username`` and ``device`` keys.
    ``client`` is only used in the log line. Nothing is returned.
    """
    payload = json.loads(data)
    username, device = payload['username'], payload['device']
    print(f"{client}的连接已断开")
    await stop(username, device)
class Scene:
    """On/off state of one viewing scene belonging to one user."""

    def __init__(self, user):
        """Create an inactive scene owned by ``user``."""
        self.user, self.active = user, False

    def turn_on(self):
        """Mark the scene as active."""
        self.active = True

    def turn_off(self):
        """Mark the scene as inactive."""
        self.active = False
class AppScene(Scene):
    """Scene whose ``name`` is ``"app"``."""

    def __init__(self, user):
        """Create an inactive "app" scene for ``user``."""
        super().__init__(user)
        self.name = "app"
class WebScene(Scene):
    """Scene whose ``name`` is ``"web"``."""

    def __init__(self, user):
        """Create an inactive "web" scene for ``user``."""
        super().__init__(user)
        self.name = "web"
class ScreenScene(Scene):
    """Scene whose ``name`` is ``"screen"``."""

    def __init__(self, user):
        """Create an inactive "screen" scene for ``user``."""
        super().__init__(user)
        self.name = "screen"
class SceneManager:
    """Registry mapping each user to their app, web and screen scenes."""

    def __init__(self):
        """Start with no registered users."""
        self.users = {}

    def add_user(self, user):
        """Register ``user`` with fresh scenes; do nothing if already known."""
        # The original comment here read "lock", but no lock is taken.
        if user in self.users:
            return
        print("创建user" + user)
        fresh_scenes = (AppScene(user), WebScene(user), ScreenScene(user))
        self.users[user] = {scene.name: scene for scene in fresh_scenes}

    def get_scene(self, user, scene_type):
        """Return the user's scene named ``scene_type``, or None if unknown.

        Raises KeyError when ``user`` has not been registered.
        """
        scenes_of_user = self.users[user]
        return scenes_of_user.get(scene_type)
async def _release_scenes(websocket, current_scene, app_scene, web_scene, screen_scene):
    """Turn off the connection's scene and schedule shutdown if none is left."""
    # The connection is gone, so its scene is no longer being watched.
    if current_scene is not None:
        current_scene.turn_off()
    scenes = (app_scene, web_scene, screen_scene)
    # The three per-user scenes are only looked up once a request got far
    # enough to possibly start a stream; without them there is nothing to stop.
    if any(scene is None for scene in scenes):
        return
    # Only stop pushing when none of the user's scenes is still active.
    if any(scene.active for scene in scenes):
        return
    if current_scene is not None:
        print("关闭场景" + current_scene.name)
    last_message = user_msg.get(websocket)
    if last_message is not None:
        await function_B(websocket, last_message)


async def handler(websocket, path=None):
    """Serve one websocket connection.

    Each message is a JSON object carrying ``username`` and the scene name
    (under the ``SCENE_NAME`` key); the raw message is later forwarded to
    ``function_A`` / ``function_B``. The first active scene of a user starts
    the stream; further scenes are only marked active. When the connection
    ends (normally or through an exception) the last requested scene is
    turned off, and the stream shutdown is scheduled once no scene of the
    user is active.

    Args:
        websocket: The connection to read requests from and reply on.
        path: Unused. Optional so the handler works both with websockets
            versions that pass ``(websocket, path)`` and with versions that
            pass only the connection.
    """
    the_user_scene = None
    user_app_scene = None
    user_web_scene = None
    user_screen_scene = None
    try:
        # Wait for client messages until the connection closes.
        async for message in websocket:
            user_msg[websocket] = message
            data = json.loads(message)
            username = data['username']
            scene_name = data[SCENE_NAME]
            scene_manager.add_user(username)
            requested_scene = scene_manager.get_scene(username, scene_name)
            if requested_scene is None:
                # Fail before overwriting the_user_scene so the scene this
                # connection turned on earlier is still released below.
                raise ValueError("unknown scene: " + repr(scene_name))
            the_user_scene = requested_scene
            if the_user_scene.active is True:
                await websocket.send(
                    json.dumps({"code": 200,
                                "msg": the_user_scene.user + "" + the_user_scene.name + "该场景已经开启了,不要再请求我了!"},
                               ensure_ascii=False))
                continue
            user_app_scene = scene_manager.get_scene(username, "app")
            user_web_scene = scene_manager.get_scene(username, "web")
            user_screen_scene = scene_manager.get_scene(username, "screen")
            if user_app_scene.active is True or user_web_scene.active is True or user_screen_scene.active is True:
                # Another scene of this user is active, so the stream is
                # presumably already being pushed: just mark this one active.
                the_user_scene.turn_on()
                await websocket.send(
                    json.dumps({"code": 200, "msg": "该用户已经有其他场景开启了,不需要再推流了!"}, ensure_ascii=False))
                continue
            # No scene of this user is active yet: activate it and start pushing.
            the_user_scene.turn_on()
            response = await function_A(websocket, message)
            await websocket.send(response)
    except Exception as e:
        print("异常", e)
    # NOTE(review): a connection that only received the "already active"
    # reply still turns that scene off here even though another connection
    # may be using it - confirm whether per-connection ownership is needed.
    try:
        await _release_scenes(websocket, the_user_scene,
                              user_app_scene, user_web_scene, user_screen_scene)
    finally:
        # Drop the stored message so closed connections do not accumulate.
        user_msg.pop(websocket, None)
if __name__ == '__main__':
    # MQTT broker used to send commands to devices.
    broker = 'mqtt.lihaink.cn'
    port = 1883
    # Media-server getMediaInfo endpoint; the stream (device) name is appended.
    # NOTE(security): the API secret is hard-coded in this URL - consider
    # loading it from configuration or the environment instead.
    rtsp_MediaInfo = "http://rtsp.lihaink.cn/index/api/getMediaInfo?secret=YwDtp2llj80HA19JhMXL4Po99nsMAyNT&schema=rtsp&vhost=__defaultVhost__&app=live&stream="
    # Key of the scene name inside each websocket message.
    SCENE_NAME = "scene"
    # username -> pending delayed-shutdown task.
    user_sched = {}
    # websocket -> last raw message received on that connection.
    user_msg = {}
    user_scene = []
    scene_manager = SceneManager()

    async def _serve_forever():
        """Run the WebSocket server on 0.0.0.0:8765 until interrupted."""
        async with websockets.serve(handler, "0.0.0.0", 8765):
            # A future that is never resolved keeps the server running.
            await asyncio.Future()

    # Start the WebSocket server inside a running event loop.
    asyncio.run(_serve_forever())