lot_manager/skt.py

158 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import websockets
import asyncio
import json
import paho.mqtt.client as mqtt
from pydantic import BaseModel
broker = 'mqtt.lihaink.cn'
port = 1883
class BaseResponse(BaseModel):
code: int = 200
msg: str = "success"
class MQTTClient:
def __init__(self, broker, port, topic, username, password):
self.broker = broker
self.port = port
self.topic = topic
self.username = username
self.password = password
# 千万不要指定client_id 不然死翘翘
self.client = mqtt.Client()
self.client.username_pw_set(self.username, self.password)
def push(self):
self.client.publish(self.topic, payload=json.dumps({"msg": "open_led"}, ensure_ascii=False),
qos=0)
self.client.publish(self.topic, payload=json.dumps({"msg": "push_stream"}, ensure_ascii=False),
qos=0)
def close(self):
self.client.publish(self.topic, payload=json.dumps({"msg": "close_stream"}, ensure_ascii=False),
qos=0)
self.client.publish(self.topic, payload=json.dumps({"msg": "close_led"}, ensure_ascii=False),
qos=0)
def start(self):
self.client.connect(self.broker, self.port)
user_sched = {}
async def close(username, device):
# 倒计时600秒
await asyncio.sleep(35)
print(username + "结束推流")
user_sched.pop(username)
close_stream(username, device)
async def task_start(username, device):
print(username + "创建结束推流定时任务")
task = asyncio.create_task(close(username, device))
user_sched[username] = task
async def start(username, device):
try:
# 如果定时任务不存在,那么直接进行推流。当用户退出界面时,创建任务
if username not in user_sched:
print(username + "开始推流")
push_stream(username, device)
# 如果定时任务存在,那么取消上次的任务。当用户退出界面时,创建任务
else:
t = user_sched[username]
# 取消任务
if t is not None:
print(username + "取消定时任务")
t.cancel()
return BaseResponse(code=200, msg="success")
except Exception as e:
return BaseResponse(code=500, msg=str(e))
async def stop(username, device):
try:
await task_start(username, device)
return BaseResponse(code=200, msg="success")
except Exception as e:
return BaseResponse(code=500, msg=str(e))
def push_stream(username, device):
MQTT = MQTTClient(broker, port, device, username, username)
MQTT.start()
MQTT.push()
def close_stream(username, device):
MQTT = MQTTClient(broker, port, device, username, username)
MQTT.start()
MQTT.close()
# 存储所有连接的WebSocket客户端
connected_clients = set()
# 定义函数A
async def function_A(client, data):
print(type(data))
try:
data = json.loads(data)
except:
pass
print(f"{client}调用了函数A接收到的数据", data)
username = data['username']
device = data['device']
# return await start(username, device)
return await get()
async def get():
return BaseResponse(code=200, msg="success")
# 定义函数B
async def function_B(client, data):
try:
data = json.loads(data)
except:
pass
print(data)
username = data['username']
device = data['device']
print(f"{client}的连接已断开")
# return await stop(username, device)
return await get()
# 创建WebSocket连接的处理函数
async def handler(websocket, path):
# 当新的客户端连接时,添加到集合中
connected_clients.add(websocket)
try:
# 在循环中等待客户端的消息
async for message in websocket:
# 当接收到消息时调用函数A
response = await function_A(websocket, message)
await websocket.send(response)
except Exception as e:
print(e)
await function_B(websocket, message)
finally:
# 确保在连接断开时从集合中移除客户端
# print(connected_clients)
connected_clients.remove(websocket)
# await function_B(websocket, message)
# 启动WebSocket服务器
start_server = websockets.serve(handler, "0.0.0.0", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()