diff --git a/requirements.txt b/requirements.txt index 3c8abca..a368907 100755 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,6 @@ fastapi~=0.95.1 paho-mqtt~=1.6.1 requests~=2.31.0 pyserial~=3.5 -board~=1.0 \ No newline at end of file +board~=1.0 +websockets~=11.0.3 +starlette~=0.26.1 \ No newline at end of file diff --git a/skt.py b/skt.py new file mode 100644 index 0000000..3c7bc65 --- /dev/null +++ b/skt.py @@ -0,0 +1,157 @@ +import asyncio +import websockets +import asyncio +import json + +import paho.mqtt.client as mqtt +from pydantic import BaseModel + +broker = 'mqtt.lihaink.cn' +port = 1883 + + +class BaseResponse(BaseModel): + code: int = 200 + msg: str = "success" + + +class MQTTClient: + def __init__(self, broker, port, topic, username, password): + self.broker = broker + self.port = port + self.topic = topic + self.username = username + self.password = password + # 千万不要指定client_id 不然死翘翘 + self.client = mqtt.Client() + self.client.username_pw_set(self.username, self.password) + + def push(self): + self.client.publish(self.topic, payload=json.dumps({"msg": "open_led"}, ensure_ascii=False), + qos=0) + self.client.publish(self.topic, payload=json.dumps({"msg": "push_stream"}, ensure_ascii=False), + qos=0) + + def close(self): + self.client.publish(self.topic, payload=json.dumps({"msg": "close_stream"}, ensure_ascii=False), + qos=0) + self.client.publish(self.topic, payload=json.dumps({"msg": "close_led"}, ensure_ascii=False), + qos=0) + + def start(self): + self.client.connect(self.broker, self.port) + + +user_sched = {} + + +async def close(username, device): + # 倒计时600秒 + await asyncio.sleep(35) + print(username + "结束推流") + user_sched.pop(username) + close_stream(username, device) + + +async def task_start(username, device): + print(username + "创建结束推流定时任务") + task = asyncio.create_task(close(username, device)) + user_sched[username] = task + + +async def start(username, device): + try: + # 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务 + if username not in user_sched: + print(username + "开始推流") + push_stream(username, device) + # 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务 + else: + t = user_sched[username] + # 取消任务 + if t is not None: + print(username + "取消定时任务") + t.cancel() + return BaseResponse(code=200, msg="success") + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +async def stop(username, device): + try: + await task_start(username, device) + return BaseResponse(code=200, msg="success") + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +def push_stream(username, device): + MQTT = MQTTClient(broker, port, device, username, username) + MQTT.start() + MQTT.push() + + +def close_stream(username, device): + MQTT = MQTTClient(broker, port, device, username, username) + MQTT.start() + MQTT.close() + + +# 存储所有连接的WebSocket客户端 +connected_clients = set() + + +# 定义函数A +async def function_A(client, data): + print(type(data)) + try: + data = json.loads(data) + except: + pass + print(f"{client}调用了函数A,接收到的数据:", data) + username = data['username'] + device = data['device'] + # return await start(username, device) + return await get() + +async def get(): + return BaseResponse(code=200, msg="success") + +# 定义函数B +async def function_B(client, data): + try: + data = json.loads(data) + except: + pass + print(data) + username = data['username'] + device = data['device'] + print(f"{client}的连接已断开") + # return await stop(username, device) + return await get() + +# 创建WebSocket连接的处理函数 +async def handler(websocket, path): + # 当新的客户端连接时,添加到集合中 + connected_clients.add(websocket) + try: + # 在循环中等待客户端的消息 + async for message in websocket: + # 当接收到消息时调用函数A + response = await function_A(websocket, message) + await websocket.send(response) + except Exception as e: + print(e) + await function_B(websocket, message) + finally: + # 确保在连接断开时从集合中移除客户端 + # print(connected_clients) + connected_clients.remove(websocket) + # await function_B(websocket, message) + + +# 启动WebSocket服务器 +start_server = websockets.serve(handler, "0.0.0.0", 8765) + +asyncio.get_event_loop().run_until_complete(start_server) +asyncio.get_event_loop().run_forever() diff --git a/test.py b/test.py index 72433c8..5c381b3 100644 --- a/test.py +++ b/test.py @@ -5,6 +5,7 @@ import uvicorn from fastapi import FastAPI import paho.mqtt.client as mqtt from pydantic import BaseModel +from starlette.middleware.cors import CORSMiddleware class BaseResponse(BaseModel): @@ -14,6 +15,13 @@ class BaseResponse(BaseModel): app = FastAPI() +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) class MQTTClient: def __init__(self, broker, port, topic, username, password): @@ -44,17 +52,16 @@ class MQTTClient: user_sched = {} - async def close(username, device): # 倒计时600秒 - await asyncio.sleep(600) - # print(username + "结束推流") + await asyncio.sleep(35) + print(username + "结束推流") user_sched.pop(username) close_stream(username, device) async def task_start(username, device): - # print(username + "创建结束推流定时任务") + print(username + "创建结束推流定时任务") task = asyncio.create_task(close(username, device)) user_sched[username] = task @@ -64,14 +71,14 @@ async def start(username, device): try: # 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务 if username not in user_sched: - # print(username + "开始推流") + print(username + "开始推流") push_stream(username, device) # 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务 else: t = user_sched[username] # 取消任务 if t is not None: - # print(username + "取消定时任务") + print(username + "取消定时任务") t.cancel() return BaseResponse(code=200, msg="success") except Exception as e: @@ -102,4 +109,4 @@ def close_stream(username, device): if __name__ == '__main__': broker = 'mqtt.lihaink.cn' port = 1883 - uvicorn.run(app, host="127.0.0.1", port=8001) + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/test2.py b/test2.py new file mode 100644 index 0000000..d2c5cf2 --- /dev/null +++ b/test2.py @@ -0,0 +1,90 @@ +import asyncio +import json + +import paho.mqtt.client as mqtt +from pydantic import BaseModel +broker = 'mqtt.lihaink.cn' +port = 1883 +class BaseResponse(BaseModel): + code: int = 200 + msg: str = "success" + +class MQTTClient: + def __init__(self, broker, port, topic, username, password): + self.broker = broker + self.port = port + self.topic = topic + self.username = username + self.password = password + # 千万不要指定client_id 不然死翘翘 + self.client = mqtt.Client() + self.client.username_pw_set(self.username, self.password) + + def push(self): + self.client.publish(self.topic, payload=json.dumps({"msg": "open_led"}, ensure_ascii=False), + qos=0) + self.client.publish(self.topic, payload=json.dumps({"msg": "push_stream"}, ensure_ascii=False), + qos=0) + + def close(self): + self.client.publish(self.topic, payload=json.dumps({"msg": "close_stream"}, ensure_ascii=False), + qos=0) + self.client.publish(self.topic, payload=json.dumps({"msg": "close_led"}, ensure_ascii=False), + qos=0) + + def start(self): + self.client.connect(self.broker, self.port) + + +user_sched = {} + +async def close(username, device): + # 倒计时600秒 + await asyncio.sleep(35) + print(username + "结束推流") + user_sched.pop(username) + close_stream(username, device) + + +async def task_start(username, device): + print(username + "创建结束推流定时任务") + task = asyncio.create_task(close(username, device)) + user_sched[username] = task + + +async def start(username, device): + try: + # 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务 + if username not in user_sched: + print(username + "开始推流") + push_stream(username, device) + # 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务 + else: + t = user_sched[username] + # 取消任务 + if t is not None: + print(username + "取消定时任务") + t.cancel() + # return BaseResponse(code=200, msg="success") + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +async def stop(username, device): + try: + await task_start(username, device) + # return BaseResponse(code=200, msg="success") + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +def push_stream(username, device): + MQTT = MQTTClient(broker, port, device, username, username) + MQTT.start() + MQTT.push() + + +def close_stream(username, device): + MQTT = MQTTClient(broker, port, device, username, username) + MQTT.start() + MQTT.close()