diff --git a/ap.py b/ap.py deleted file mode 100644 index 9c1b940..0000000 --- a/ap.py +++ /dev/null @@ -1,28 +0,0 @@ - -from fastapi import FastAPI -import asyncio - -app = FastAPI() -user_sched = {} - -async def my_async_task(): - # 这里是异步任务的代码 - # 倒计时3秒 - await asyncio.sleep(10) - print("close") - -async def task_start(username, device): - task = asyncio.create_task(my_async_task()) - user_sched[username] = task -@app.get("/video/{username}/{device}") -async def startup(username, device): - if username not in user_sched: - await task_start(username, device) - else: - t = user_sched['1'] - t.cancel() - await task_start() - -if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8004) diff --git a/test.py b/test.py deleted file mode 100644 index 5c381b3..0000000 --- a/test.py +++ /dev/null @@ -1,112 +0,0 @@ -import asyncio -import json - -import uvicorn -from fastapi import FastAPI -import paho.mqtt.client as mqtt -from pydantic import BaseModel -from starlette.middleware.cors import CORSMiddleware - - -class BaseResponse(BaseModel): - code: int = 200 - msg: str = "success" - - -app = FastAPI() - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -class MQTTClient: - def __init__(self, broker, port, topic, username, password): - self.broker = broker - self.port = port - self.topic = topic - self.username = username - self.password = password - # 千万不要指定client_id 不然死翘翘 - self.client = mqtt.Client() - self.client.username_pw_set(self.username, self.password) - - def push(self): - self.client.publish(self.topic, payload=json.dumps({"msg": "open_led"}, ensure_ascii=False), - qos=0) - self.client.publish(self.topic, payload=json.dumps({"msg": "push_stream"}, ensure_ascii=False), - qos=0) - - def close(self): - self.client.publish(self.topic, payload=json.dumps({"msg": "close_stream"}, ensure_ascii=False), - qos=0) - self.client.publish(self.topic, payload=json.dumps({"msg": "close_led"}, ensure_ascii=False), - qos=0) - - def start(self): - self.client.connect(self.broker, self.port) - - -user_sched = {} - -async def close(username, device): - # 倒计时600秒 - await asyncio.sleep(35) - print(username + "结束推流") - user_sched.pop(username) - close_stream(username, device) - - -async def task_start(username, device): - print(username + "创建结束推流定时任务") - task = asyncio.create_task(close(username, device)) - user_sched[username] = task - - -@app.get("/start/video/{username}/{device}") -async def start(username, device): - try: - # 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务 - if username not in user_sched: - print(username + "开始推流") - push_stream(username, device) - # 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务 - else: - t = user_sched[username] - # 取消任务 - if t is not None: - print(username + "取消定时任务") - t.cancel() - return BaseResponse(code=200, msg="success") - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -@app.get("/stop/video/{username}/{device}") -async def stop(username, device): - try: - await task_start(username, device) - return BaseResponse(code=200, msg="success") - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -def push_stream(username, device): - MQTT = MQTTClient(broker, port, device, username, username) - MQTT.start() - MQTT.push() - - -def close_stream(username, device): - MQTT = MQTTClient(broker, port, device, username, username) - MQTT.start() - MQTT.close() - - -if __name__ == '__main__': - broker = 'mqtt.lihaink.cn' - port = 1883 - uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/test2.py b/test2.py deleted file mode 100644 index 99fbfa0..0000000 --- a/test2.py +++ /dev/null @@ -1,106 +0,0 @@ -import asyncio -import json - -import paho.mqtt.client as mqtt -import uvicorn -from fastapi import FastAPI -from pydantic import BaseModel - -broker = 'mqtt.lihaink.cn' -port = 1883 - - -class BaseResponse(BaseModel): - code: int = 200 - msg: str = "success" - - -app = FastAPI() - - -class MQTTClient: - def __init__(self, broker, port, topic, username, password): - self.broker = broker - self.port = port - self.topic = topic - self.username = username - self.password = password - # 千万不要指定client_id 不然死翘翘 - self.client = mqtt.Client() - self.client.username_pw_set(self.username, self.password) - - def push(self): - self.client.publish(self.topic, payload=json.dumps({"msg": "open_led"}, ensure_ascii=False), - qos=0) - self.client.publish(self.topic, payload=json.dumps({"msg": "push_stream"}, ensure_ascii=False), - qos=0) - - def close(self): - self.client.publish(self.topic, payload=json.dumps({"msg": "close_stream"}, ensure_ascii=False), - qos=0) - self.client.publish(self.topic, payload=json.dumps({"msg": "close_led"}, ensure_ascii=False), - qos=0) - - def start(self): - self.client.connect(self.broker, self.port) - - -user_sched = {} - - -async def close(username, device): - # 倒计时600秒 - await asyncio.sleep(5) - print(username + "结束推流") - user_sched.pop(username) - close_stream(username, device) - - -async def task_start(username, device): - print(username + "创建结束推流定时任务") - task = asyncio.ensure_future(close(username, device)) - user_sched[username] = task - - -@app.get("/video/start/{username}/{device}") -async def start(username, device): - try: - # 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务 - if username not in user_sched: - print(username + "开始推流") - push_stream(username, device) - # 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务 - else: - t = user_sched[username] - # 取消任务 - if t is not None: - print(username + "取消定时任务") - t.cancel() - return BaseResponse(code=200, msg="success").json() - except Exception as e: - return BaseResponse(code=500, msg=str(e)).json() - - -@app.get("/video/stop/{username}/{device}") -async def stop(username, device): - try: - await task_start(username, device) - return BaseResponse(code=200, msg="success").json() - except Exception as e: - return BaseResponse(code=500, msg=str(e)).json() - - -def push_stream(username, device): - MQTT = MQTTClient(broker, port, device, username, username) - MQTT.start() - MQTT.push() - - -def close_stream(username, device): - MQTT = MQTTClient(broker, port, device, username, username) - MQTT.start() - MQTT.close() - - -if __name__ == '__main__': - uvicorn.run(app, host="127.0.0.1", port=8001) diff --git a/skt.py b/video_task.py similarity index 94% rename from skt.py rename to video_task.py index f538c00..ce967bf 100644 --- a/skt.py +++ b/video_task.py @@ -1,4 +1,3 @@ -import asyncio import websockets import asyncio import json @@ -47,7 +46,7 @@ user_sched = {} async def close(username, device): # 倒计时600秒 - await asyncio.sleep(5) + await asyncio.sleep(600) print(username + "结束推流") user_sched.pop(username) close_stream(username, device) @@ -55,9 +54,8 @@ async def close(username, device): async def task_start(username, device): print(username + "创建结束推流定时任务") - task = asyncio.ensure_future(close(username, device)) + task = asyncio.create_task(close(username, device)) user_sched[username] = task - print("已经完成") async def start(username, device): @@ -103,7 +101,7 @@ async def function_A(client, data): data = json.loads(data) username = data['username'] device = data['device'] - print("A", username, device) + # print("A", username, device) return await start(username, device) @@ -112,7 +110,7 @@ async def function_B(client, data): data = json.loads(data) username = data['username'] device = data['device'] - print("B", username, device) + # print("B", username, device) print(f"{client}的连接已断开") await stop(username, device) @@ -140,4 +138,4 @@ async def handler(websocket, path): start_server = websockets.serve(handler, "0.0.0.0", 8765) asyncio.get_event_loop().run_until_complete(start_server) -asyncio.get_event_loop().run_forever() +asyncio.get_event_loop().run_forever() \ No newline at end of file