lot_manager/video_task.py

256 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uvicorn
from fastapi import FastAPI, WebSocket
import requests
import asyncio
import json
import paho.mqtt.client as mqtt
from pydantic import BaseModel
# ASGI application instance; the websocket route in this file is registered on it.
app = FastAPI()
class BaseResponse(BaseModel):
    """Status envelope that the handlers serialize with ``.json()`` and send to clients."""

    # Status code carried in the payload; this file uses 200 for success and 500 for failures.
    code: int = 200
    # Human-readable status text accompanying the code.
    msg: str = "success"
class MQTTClient:
    """Small wrapper around a paho-mqtt client that publishes control messages to one topic.

    Every control message is a JSON object of the form ``{"msg": <command>}``
    published with QoS 0.
    """

    def __init__(self, broker, port, topic, username, password):
        """Store the connection settings and create the underlying client (no connection yet)."""
        self.broker = broker
        self.port = port
        self.topic = topic
        self.username = username
        self.password = password
        # Original author's warning: never pass an explicit client_id here, it breaks things.
        self.client = mqtt.Client()
        self.client.username_pw_set(self.username, self.password)

    def _send(self, command):
        """Publish a single ``{"msg": command}`` payload to the configured topic."""
        body = json.dumps({"msg": command}, ensure_ascii=False)
        self.client.publish(self.topic, payload=body, qos=0)

    def push(self):
        """Publish ``open_led`` followed by ``push_stream``."""
        for command in ("open_led", "push_stream"):
            self._send(command)

    def close(self):
        """Publish ``close_stream`` followed by ``close_led``."""
        for command in ("close_stream", "close_led"):
            self._send(command)

    def start(self):
        """Connect the underlying client to the configured broker and port."""
        self.client.connect(self.broker, self.port)
async def close(username, device, delay=30):
    """Wait ``delay`` seconds, then stop the stream for ``username`` on ``device``.

    This coroutine is meant to run as a background task. If the task is
    cancelled while it is sleeping, nothing after the sleep runs and the
    stream is left untouched.

    Args:
        username: Key under which the task is registered in ``user_sched``.
        device: Device identifier forwarded to ``close_stream``.
        delay: Grace period in seconds before the stream is stopped.
            Defaults to 30, the value that was previously hard-coded (an
            older comment incorrectly described it as 600 seconds).

    Raises:
        KeyError: If ``username`` is no longer registered in ``user_sched``
            when the delay elapses; the stream is then not closed.
    """
    await asyncio.sleep(delay)
    print(username + "结束推流")
    # Unregister first so a later start() sees no pending task for this user.
    user_sched.pop(username)
    close_stream(username, device)
async def task_start(username, device):
    """Schedule the delayed stream shutdown for ``username`` and register it in ``user_sched``.

    If a shutdown task is already registered for this user it is cancelled
    before the new one is stored. Previously the old task was silently
    overwritten: it kept running, removed the registry entry when it fired,
    and the newer task then failed with ``KeyError``.

    Args:
        username: Key used in ``user_sched``.
        device: Device identifier passed on to ``close``.
    """
    print(username + "创建结束推流定时任务")
    previous = user_sched.get(username)
    if previous is not None:
        # cancel() is a no-op on a task that is already done or cancelled.
        previous.cancel()
    user_sched[username] = asyncio.create_task(close(username, device))
async def start(username, device):
    """Start pushing the stream for ``username`` on ``device``, or keep a running one alive.

    Behaviour depends on whether a delayed-stop task is registered for the
    user in ``user_sched``:

    * Task registered: cancel it so the stream keeps running.
    * No task registered: query the media server. If the stream is already
      published (``code == 0``) report that; otherwise ask the device to push
      via MQTT and poll the media server once per second, at most 50 times,
      until the stream shows up.

    Args:
        username: User requesting the stream; also the key in ``user_sched``.
        device: Device/stream identifier appended to the media-info URL.

    Returns:
        The JSON string produced by ``BaseResponse(...).json()``. ``code`` is
        200 on success and 500 on every failure, including unexpected
        exceptions (their text becomes ``msg``).
    """
    try:
        if username in user_sched:
            # A delayed stop is pending (the user left and came back): cancel it.
            # The cancelled task stays registered until task_start() replaces it.
            pending = user_sched[username]
            if pending is None:
                # Fixed: this response used to be built but never returned.
                return BaseResponse(code=500, msg="Internal Error").json()
            print(username + "取消定时任务")
            pending.cancel()
            return BaseResponse(code=200, msg="success").json()

        # NOTE(review): requests.get is a blocking call without a timeout, so the
        # event loop is stalled while the media server responds.
        res = requests.get(rtsp_MediaInfo + device)
        if res.status_code != 200:
            return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
        res = res.json()
        if 'code' in res and res['code'] == 0:
            # The stream is already being published; do not push a second time.
            return BaseResponse(code=200, msg=username + ",Do not repeat push stream").json()

        get_times = 50
        print(username + "开始推流")
        push_stream(username, device)

        # Poll until the media server reports the stream (code == 0).
        while True:
            res = requests.get(rtsp_MediaInfo + device)
            if res.status_code != 200:
                # Fixed: this failure was reported with code 200, unlike the
                # identical failures elsewhere in this function.
                return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
            res = res.json()
            if 'code' in res and res['code'] == 0:
                break
            if 'code' not in res:
                # A reply without a code field is treated as a server error.
                return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
            get_times -= 1
            if get_times == 0:
                # Give up after 50 unsuccessful polls.
                return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
            await asyncio.sleep(1)
        return BaseResponse(code=200, msg="success").json()
    except Exception as e:
        return BaseResponse(code=500, msg=str(e)).json()
async def stop(username, device):
    """Schedule the delayed stream shutdown and report the outcome as a JSON string.

    Returns:
        ``BaseResponse`` JSON with code 200 when the task was scheduled, or
        code 500 carrying the exception text when anything went wrong.
    """
    try:
        await task_start(username, device)
        outcome = BaseResponse(code=200, msg="success").json()
    except Exception as exc:
        outcome = BaseResponse(code=500, msg=str(exc)).json()
    return outcome
def push_stream(username, device):
    """Connect to the broker and publish the "start streaming" commands for ``device``.

    The device identifier is used as the MQTT topic, and the username is used
    as both the MQTT username and password.
    """
    # NOTE(review): the connection opened here is never explicitly disconnected.
    publisher = MQTTClient(broker, port, topic=device, username=username, password=username)
    publisher.start()
    publisher.push()
def close_stream(username, device):
    """Connect to the broker and publish the "stop streaming" commands for ``device``.

    The device identifier is used as the MQTT topic, and the username is used
    as both the MQTT username and password.
    """
    # NOTE(review): the connection opened here is never explicitly disconnected.
    publisher = MQTTClient(broker, port, topic=device, username=username, password=username)
    publisher.start()
    publisher.close()
# Handler A: start (or keep alive) the stream described by a raw client message.
async def function_A(data):
    """Parse the JSON message ``data`` and delegate to ``start``.

    The message must contain the keys ``username`` and ``device``.

    Returns:
        Whatever ``start`` returns (a JSON status string).
    """
    payload = json.loads(data)
    return await start(payload['username'], payload['device'])
# Handler B: schedule the delayed stream shutdown described by a raw client message.
async def function_B(data):
    """Parse the JSON message ``data`` and delegate to ``stop``.

    The message must contain the keys ``username`` and ``device``.

    Returns:
        The JSON status string produced by ``stop``. It used to be discarded;
        returning it mirrors ``function_A`` and lets callers inspect failures.
        Callers that ignore the return value are unaffected.
    """
    payload = json.loads(data)
    return await stop(payload['username'], payload['device'])
class Scene:
    """A per-user viewing context that is either active or inactive."""

    def __init__(self, user):
        """Create an inactive scene owned by ``user``."""
        self.user, self.active = user, False

    def turn_on(self):
        """Mark the scene as active."""
        self.active = True

    def turn_off(self):
        """Mark the scene as inactive."""
        self.active = False
class AppScene(Scene):
    """Scene whose ``name`` is ``"app"``."""

    def __init__(self, user):
        """Create an inactive "app" scene for ``user``."""
        super().__init__(user)
        self.name = "app"
class WebScene(Scene):
    """Scene whose ``name`` is ``"web"``."""

    def __init__(self, user):
        """Create an inactive "web" scene for ``user``."""
        super().__init__(user)
        self.name = "web"
class ScreenScene(Scene):
    """Scene whose ``name`` is ``"screen"``."""

    def __init__(self, user):
        """Create an inactive "screen" scene for ``user``."""
        super().__init__(user)
        self.name = "screen"
class SceneManager:
    """Registry mapping each user to their three scenes ("app", "web", "screen")."""

    def __init__(self):
        """Start with no registered users."""
        self.users = {}

    def add_user(self, user):
        """Register ``user`` with a fresh set of inactive scenes; no-op if already known."""
        # NOTE: the original comment here said "take a lock", but no lock is acquired.
        if user in self.users:
            return
        scene_types = (("app", AppScene), ("web", WebScene), ("screen", ScreenScene))
        self.users[user] = {key: scene_cls(user) for key, scene_cls in scene_types}

    def get_scene(self, user, scene_type):
        """Return the scene of ``scene_type`` for ``user``.

        Returns ``None`` for an unknown scene type; raises ``KeyError`` for a
        user that was never added.
        """
        scenes = self.users[user]
        return scenes.get(scene_type)
async def on_connect(ws: WebSocket):
    """Send the code-201 greeting telling the client the socket is connected and waiting."""
    greeting = {"code": 201, "msg": "socket已连接等待接收消息"}
    await ws.send_text(json.dumps(greeting, ensure_ascii=False))
async def on_disconnect(the_user_scene, user_app_scene, user_web_scene, user_screen_scene, message):
    """Deactivate the disconnecting scene and stop the stream once no scene is active.

    Args:
        the_user_scene: Scene bound to the closing connection, or ``None`` if
            none was resolved; it is turned off when present.
        user_app_scene: The user's "app" scene, or ``None``.
        user_web_scene: The user's "web" scene, or ``None``.
        user_screen_scene: The user's "screen" scene, or ``None``.
        message: Raw JSON message forwarded to ``function_B`` when the stream
            has to be stopped.

    The stream is only stopped when all three scenes are known and all of
    them are inactive.
    """
    if the_user_scene is not None:
        the_user_scene.turn_off()

    scenes = (user_app_scene, user_web_scene, user_screen_scene)
    # Fixed: the original tested user_app_scene three times, so a missing web or
    # screen scene slipped through and raised AttributeError below.
    if any(scene is None for scene in scenes):
        return

    if all(scene.active is False for scene in scenes):
        if the_user_scene is not None:
            print("关闭场景" + the_user_scene.name)
        await function_B(message)
@app.websocket("/")
async def wsk(ws: WebSocket):
    """Websocket endpoint: activate a user's scene and start the stream when needed.

    Each incoming text message is a JSON object containing ``username``,
    ``device`` and the scene name under the ``SCENE_NAME`` key. For every
    message the handler acknowledges receipt (code 202) and then:

    * replies with code 200 and skips if the requested scene is already active;
    * marks the scene active and skips if another scene of the same user is
      already active (the stream is already running);
    * otherwise marks the scene active, starts the stream via ``function_A``
      and sends its response.

    Any exception (including the client disconnecting) ends the loop and
    triggers ``on_disconnect`` for cleanup.
    """
    the_user_scene = None
    user_app_scene = None
    user_web_scene = None
    user_screen_scene = None
    username = ""
    await ws.accept()
    await on_connect(ws)
    try:
        while True:
            message = await ws.receive_text()
            await ws.send_text(json.dumps({"code": 202, "msg": "socket已收到消息等待处理"}, ensure_ascii=False))
            # Remember the last message of this connection; it is replayed on disconnect.
            user_msg[ws] = message
            data = json.loads(message)
            username = data['username']
            scene_name = data[SCENE_NAME]
            scene_manager.add_user(username)
            # NOTE(review): an unknown scene name yields None here and the attribute
            # access below raises, which ends the connection via the except branch.
            the_user_scene = scene_manager.get_scene(username, scene_name)
            if the_user_scene.active is True:
                await ws.send_text(
                    json.dumps({"code": 200,
                                "msg": the_user_scene.user + "" + the_user_scene.name + "该场景已经开启了,不要再请求我了,请等待一会!"},
                               ensure_ascii=False))
                continue
            user_app_scene = scene_manager.get_scene(username, "app")
            user_web_scene = scene_manager.get_scene(username, "web")
            user_screen_scene = scene_manager.get_scene(username, "screen")
            if user_app_scene.active is True or user_web_scene.active is True or user_screen_scene.active is True:
                # Another scene already keeps the stream alive; just join it.
                the_user_scene.turn_on()
                await ws.send_text(
                    json.dumps({"code": 200, "msg": "该用户已经有其他场景开启了,请等待一会!不需要再推流了!"},
                               ensure_ascii=False))
                continue
            # No scene of this user is active yet: activate this one and start the stream.
            print(username + "开启场景" + the_user_scene.name)
            the_user_scene.turn_on()
            response = await function_A(message)
            await ws.send_text(response)
    except Exception as e:
        # Fixed: the scene may still be None (e.g. disconnect before the first valid
        # message); dereferencing it here used to raise and skip the cleanup.
        scene_label = the_user_scene.name if the_user_scene is not None else ""
        print(f"{username}{scene_label}场景的连接已断开,异常消息或代码:{e}")
        # Fixed: pop with a default so a connection that never sent a message does
        # not raise KeyError, and so entries do not pile up in user_msg.
        last_message = user_msg.pop(ws, None)
        await on_disconnect(the_user_scene, user_app_scene, user_web_scene, user_screen_scene, last_message)
# Runtime configuration and shared state.
# These used to be assigned only inside the ``__main__`` guard, so importing this
# module (for example ``uvicorn video_task:app``) left every handler with
# NameErrors. They are now defined at import time; running the file as a script
# behaves exactly as before.
broker = 'mqtt.lihaink.cn'
port = 1883
# NOTE(security): the media-server API secret is embedded in this URL and therefore
# committed with the source; consider loading it from configuration instead.
rtsp_MediaInfo = "http://rtsp.lihaink.cn/index/api/getMediaInfo?secret=YwDtp2llj80HA19JhMXL4Po99nsMAyNT&schema=rtsp&vhost=__defaultVhost__&app=live&stream="
# Key of the scene name inside incoming websocket messages.
SCENE_NAME = "scene"
# username -> pending delayed-stop asyncio task.
user_sched = {}
# websocket -> last raw message received on that connection.
user_msg = {}
# Not referenced anywhere in the visible code; kept for compatibility.
user_scene = []
scene_manager = SceneManager()

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8765)