This commit is contained in:
xyj 2024-01-11 13:09:45 +08:00
parent 5c2112f047
commit dba3bb9811
1 changed files with 44 additions and 42 deletions

View File

@ -1,13 +1,15 @@
import time import uvicorn
from fastapi import FastAPI, WebSocket
import requests import requests
import websockets
import asyncio import asyncio
import json import json
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from pydantic import BaseModel from pydantic import BaseModel
app = FastAPI()
class BaseResponse(BaseModel): class BaseResponse(BaseModel):
code: int = 200 code: int = 200
@ -59,17 +61,17 @@ async def start(username, device):
try: try:
# 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务 # 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务
if username not in user_sched: if username not in user_sched:
print(username + "开始推流")
res = requests.get(rtsp_MediaInfo + device) res = requests.get(rtsp_MediaInfo + device)
if res.status_code != 200: if res.status_code != 200:
return BaseResponse(code=500, msg="ZLMediakit Server Error!").json() return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
# 转换为json # 转换为json
res = res.json() res = res.json()
if 'code' in res and res['code'] == 0: if 'code' in res and res['code'] == 0:
return BaseResponse(code=200, msg=username + ",Do not repeat the push stream").json() return BaseResponse(code=200, msg=username + ",Do not repeat push stream").json()
# 开始推流 # 开始推流
get_times = 50 get_times = 50
# 启动推流程序 # 启动推流程序
print(username + "开始推流")
push_stream(username, device) push_stream(username, device)
# 查看请求 ZLMediakit是否已经生成了mp4文件,生成了就给前端返回 # 查看请求 ZLMediakit是否已经生成了mp4文件,生成了就给前端返回
while True: while True:
@ -87,7 +89,7 @@ async def start(username, device):
get_times -= 1 get_times -= 1
if get_times == 0: if get_times == 0:
# 请求次数达到50次就退了吧 # 请求次数达到50次就退了吧
break return BaseResponse(code=500, msg="ZLMediakit Server Error!").json()
await asyncio.sleep(1) await asyncio.sleep(1)
return BaseResponse(code=200, msg="success").json() return BaseResponse(code=200, msg="success").json()
# 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务 # 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务
@ -125,7 +127,7 @@ def close_stream(username, device):
# 定义函数A # 定义函数A
async def function_A(client, data): async def function_A(data):
data = json.loads(data) data = json.loads(data)
username = data['username'] username = data['username']
device = data['device'] device = data['device']
@ -133,11 +135,10 @@ async def function_A(client, data):
# 定义函数B # 定义函数B
async def function_B(client, data): async def function_B(data):
data = json.loads(data) data = json.loads(data)
username = data['username'] username = data['username']
device = data['device'] device = data['device']
print(f"{client}的连接已断开")
await stop(username, device) await stop(username, device)
@ -178,31 +179,50 @@ class SceneManager:
def add_user(self, user): def add_user(self, user):
# 上锁 # 上锁
if user not in self.users: if user not in self.users:
print("创建user" + user)
self.users[user] = {"app": AppScene(user), "web": WebScene(user), "screen": ScreenScene(user)} self.users[user] = {"app": AppScene(user), "web": WebScene(user), "screen": ScreenScene(user)}
def get_scene(self, user, scene_type): def get_scene(self, user, scene_type):
return self.users[user].get(scene_type) return self.users[user].get(scene_type)
async def handler(websocket, path): async def on_connect(ws: WebSocket):
await ws.send_text(json.dumps({"code": 201, "msg": "socket已连接等待接收消息"}, ensure_ascii=False))
async def on_disconnect(the_user_scene, user_app_scene, user_web_scene, user_screen_scene, message):
if the_user_scene is not None:
the_user_scene.turn_off()
# 查看该用户的所有场景是否在线,只有都不在线就关闭推流
if user_app_scene is not None and user_app_scene is not None and user_app_scene is not None:
if user_app_scene.active is False and user_web_scene.active is False and user_screen_scene.active is False:
print("关闭场景" + the_user_scene.name)
await function_B(message)
print("socket已关闭")
@app.websocket("/")
async def wsk(ws: WebSocket):
the_user_scene = None the_user_scene = None
user_app_scene = None user_app_scene = None
user_web_scene = None user_web_scene = None
user_screen_scene = None user_screen_scene = None
username = ""
await ws.accept()
await on_connect(ws)
try: try:
# 在循环中等待客户端的消息 while True:
async for message in websocket: message = await ws.receive_text()
user_msg[websocket] = message await ws.send_text(json.dumps({"code": 202, "msg": "socket已收到消息等待处理"}, ensure_ascii=False))
user_msg[ws] = message
data = json.loads(message) data = json.loads(message)
username = data['username'] username = data['username']
scene_name = data[SCENE_NAME] scene_name = data[SCENE_NAME]
scene_manager.add_user(username) scene_manager.add_user(username)
the_user_scene = scene_manager.get_scene(username, scene_name) the_user_scene = scene_manager.get_scene(username, scene_name)
if the_user_scene.active is True: if the_user_scene.active is True:
await websocket.send( await ws.send_text(
json.dumps({"code": 200, json.dumps({"code": 200,
"msg": the_user_scene.user + "" + the_user_scene.name + "该场景已经开启了,不要再请求我了"}, "msg": the_user_scene.user + "" + the_user_scene.name + "该场景已经开启了,不要再请求我了,请等待一会"},
ensure_ascii=False)) ensure_ascii=False))
continue continue
user_app_scene = scene_manager.get_scene(username, "app") user_app_scene = scene_manager.get_scene(username, "app")
@ -210,33 +230,19 @@ async def handler(websocket, path):
user_screen_scene = scene_manager.get_scene(username, "screen") user_screen_scene = scene_manager.get_scene(username, "screen")
if user_app_scene.active is True or user_web_scene.active is True or user_screen_scene.active is True: if user_app_scene.active is True or user_web_scene.active is True or user_screen_scene.active is True:
the_user_scene.turn_on() the_user_scene.turn_on()
await websocket.send( await ws.send_text(
json.dumps({"code": 200, "msg": "该用户已经有其他场景开启了,不需要再推流了!"}, ensure_ascii=False)) json.dumps({"code": 200, "msg": "该用户已经有其他场景开启了,请等待一会!不需要再推流了!"},
ensure_ascii=False))
continue continue
# 如果该用户的场景没有开启,那么进行开启 # 如果该用户的场景没有开启,那么进行开启
print(username + "开启场景" + the_user_scene.name)
the_user_scene.turn_on() the_user_scene.turn_on()
response = await function_A(websocket, message) response = await function_A(message)
# 当接收到消息时调用函数A # 当接收到消息时调用函数A
await websocket.send(response) await ws.send_text(response)
# 没有消息了,则首先关闭该用户的场景
if the_user_scene is not None:
the_user_scene.turn_off()
# 查看该用户的所有场景是否在线,只有都不在线就关闭推流
if user_app_scene is not None and user_app_scene is not None and user_app_scene is not None:
if user_app_scene.active is False and user_web_scene.active is False and user_screen_scene.active is False:
print("关闭场景" + the_user_scene.name)
await function_B(websocket, user_msg[websocket])
except Exception as e: except Exception as e:
print("异常", e) print(f"{username}{the_user_scene.name}场景的连接已断开,异常消息或代码:{e}")
# # 没有消息了,则首先关闭该用户的场景 await on_disconnect(the_user_scene, user_app_scene, user_web_scene, user_screen_scene, user_msg[ws])
if the_user_scene is not None:
the_user_scene.turn_off()
# 查看该用户的所有场景是否在线,只有都不在线就关闭推流
if user_app_scene is not None and user_app_scene is not None and user_app_scene is not None:
if user_app_scene.active is False and user_web_scene.active is False and user_screen_scene.active is False:
print("关闭场景" + the_user_scene.name)
await function_B(websocket, user_msg[websocket])
if __name__ == '__main__': if __name__ == '__main__':
broker = 'mqtt.lihaink.cn' broker = 'mqtt.lihaink.cn'
@ -247,8 +253,4 @@ if __name__ == '__main__':
user_msg = {} user_msg = {}
user_scene = [] user_scene = []
scene_manager = SceneManager() scene_manager = SceneManager()
# 启动WebSocket服务器 uvicorn.run(app)
start_server = websockets.serve(handler, "0.0.0.0", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()