diff --git a/skt.py b/skt.py index 2fd59fb..0355e8d 100644 --- a/skt.py +++ b/skt.py @@ -47,7 +47,7 @@ user_sched = {} async def close(username, device): # 倒计时600秒 - await asyncio.sleep(35) + await asyncio.sleep(5) print(username + "结束推流") user_sched.pop(username) close_stream(username, device) @@ -55,8 +55,9 @@ async def close(username, device): async def task_start(username, device): print(username + "创建结束推流定时任务") - task = asyncio.create_task(close(username, device)) + task = asyncio.ensure_future(close(username, device)) user_sched[username] = task + print("已经完成") async def start(username, device): @@ -72,17 +73,17 @@ async def start(username, device): if t is not None: print(username + "取消定时任务") t.cancel() - return BaseResponse(code=200, msg="success") + return BaseResponse(code=200, msg="success").json() except Exception as e: - return BaseResponse(code=500, msg=str(e)) + return BaseResponse(code=500, msg=str(e)).json() async def stop(username, device): try: await task_start(username, device) - return BaseResponse(code=200, msg="success") + return BaseResponse(code=200, msg="success").json() except Exception as e: - return BaseResponse(code=500, msg=str(e)) + return BaseResponse(code=500, msg=str(e)).json() def push_stream(username, device): @@ -99,47 +100,42 @@ def close_stream(username, device): # 定义函数A async def function_A(client, data): - try: - data = json.loads(data) - except: - pass - print(f"{client}调用了函数A,接收到的数据:", data) + data = json.loads(data) username = data['username'] device = data['device'] - # return await start(username, device) - - -async def get(): - return BaseResponse(code=200, msg="success") + print("A", username, device) + return await start(username, device) # 定义函数B async def function_B(client, data): - try: - data = json.loads(data) - except: - pass - print(data) + data = json.loads(data) username = data['username'] device = data['device'] + print("B", username, device) print(f"{client}的连接已断开") - # return await stop(username, device) + await stop(username, device) + + +user_msg = {} # 创建WebSocket连接的处理函数 async def handler(websocket, path): + msg = "" try: # 在循环中等待客户端的消息 async for message in websocket: + user_msg[websocket] = message # 当接收到消息时调用函数A response = await function_A(websocket, message) await websocket.send(response) + # 本地情况 + # await function_B(websocket, message) except Exception as e: - print(e) - await function_B(websocket, message) - finally: - pass - + msg = user_msg[websocket] + print("Exception=", msg) + await function_B(websocket, msg) # 启动WebSocket服务器 diff --git a/test2.py b/test2.py index d2c5cf2..99fbfa0 100644 --- a/test2.py +++ b/test2.py @@ -2,13 +2,22 @@ import asyncio import json import paho.mqtt.client as mqtt +import uvicorn +from fastapi import FastAPI from pydantic import BaseModel + broker = 'mqtt.lihaink.cn' port = 1883 + + class BaseResponse(BaseModel): code: int = 200 msg: str = "success" + +app = FastAPI() + + class MQTTClient: def __init__(self, broker, port, topic, username, password): self.broker = broker @@ -38,9 +47,10 @@ class MQTTClient: user_sched = {} + async def close(username, device): # 倒计时600秒 - await asyncio.sleep(35) + await asyncio.sleep(5) print(username + "结束推流") user_sched.pop(username) close_stream(username, device) @@ -48,10 +58,11 @@ async def close(username, device): async def task_start(username, device): print(username + "创建结束推流定时任务") - task = asyncio.create_task(close(username, device)) + task = asyncio.ensure_future(close(username, device)) user_sched[username] = task +@app.get("/video/start/{username}/{device}") async def start(username, device): try: # 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务 @@ -65,17 +76,18 @@ async def start(username, device): if t is not None: print(username + "取消定时任务") t.cancel() - # return BaseResponse(code=200, msg="success") + return BaseResponse(code=200, msg="success").json() except Exception as e: - return BaseResponse(code=500, msg=str(e)) + return BaseResponse(code=500, msg=str(e)).json() +@app.get("/video/stop/{username}/{device}") async def stop(username, device): try: await task_start(username, device) - # return BaseResponse(code=200, msg="success") + return BaseResponse(code=200, msg="success").json() except Exception as e: - return BaseResponse(code=500, msg=str(e)) + return BaseResponse(code=500, msg=str(e)).json() def push_stream(username, device): @@ -88,3 +100,7 @@ def close_stream(username, device): MQTT = MQTTClient(broker, port, device, username, username) MQTT.start() MQTT.close() + + +if __name__ == '__main__': + uvicorn.run(app, host="127.0.0.1", port=8001)