From a8f881a12ff43e4d781f26fbdaa6638c7866d5ab Mon Sep 17 00:00:00 2001 From: xyj <10908227994@qq.com> Date: Fri, 26 Jan 2024 16:28:47 +0800 Subject: [PATCH] update --- config.py | 8 ++ xumu.py | 8 ++ xumu_test.py | 258 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 274 insertions(+) create mode 100644 xumu_test.py diff --git a/config.py b/config.py index 4afeb13..effd0a2 100644 --- a/config.py +++ b/config.py @@ -94,6 +94,14 @@ def get_sql(deviceId, type): return common_template(deviceId) +def get_client_change_status_sql(timestamp, status): + if status: + return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, True)"] + else: + return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, False)"] + + + # 监控视频接口 def get_video_url(username): return f"http://60.204.152.17:8888/live/xumu_{username}.live.mp4" diff --git a/xumu.py b/xumu.py index b8bd880..0045958 100644 --- a/xumu.py +++ b/xumu.py @@ -131,6 +131,8 @@ async def register(request: Request): if len(v) != 0: return BaseResponse(code=302, msg="该设备已经注册过了") # 创建该设备 + is_online = False + send_json = { "devices": ["root.device"], "timestamps": [int(time.time() * 1000)], @@ -139,6 +141,7 @@ async def register(request: Request): "values_list": [[iccid, deviceId, type]], "is_aligned": False } + res = [] r = requests.post(baseHost + insertUri, headers=headers, json=send_json) res.append(r.json()) @@ -161,7 +164,12 @@ async def process_data(request: Request): try: data = await request.body() data = data.decode("utf-8") + print(data) + return data = json.loads(data) + # 连接成功处理 + # 断开连接处理 + # 消息体处理 payload = data["payload"] receive_len = len(payload) payload = json.loads(payload) diff --git a/xumu_test.py b/xumu_test.py new file mode 100644 index 0000000..968f81d --- /dev/null +++ b/xumu_test.py @@ -0,0 +1,258 @@ +import time +import json +from typing import Any + +import requests +import uvicorn +from fastapi import FastAPI, Request +from pydantic import BaseModel +from config import * + +# fastapi +app = FastAPI() + + +# 统一返回数据 +class BaseResponse(BaseModel): + code: int = 200 + msg: str = "success" + data: Any = None + + +@app.get("/api/xumu/video") +async def data_query(username): + return BaseResponse(data=get_video_url(username)) + + +# 数据查询接口 +@app.get("/api/xumu/data/query") +async def data_query(deviceId): + try: + if deviceId is None or deviceId == "" or len(deviceId) != 4: + return BaseResponse(code=500, msg="参数错误") + sql = f"select last * from root.farm.{deviceId}" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 原生查询接口 +@app.post("/api/xumu/rest/v2/query") +async def rest_query(request: Request): + data = await request.json() + r = requests.post(baseHost + queryUri, headers=headers, json=data) + return BaseResponse(data=r.json()) + + +# RFID查询接口 +@app.get("/api/xumu/rfid/query") +async def rfid_query(rfid): + try: + sql = f"select deviceId from root.rfid where rfid='{rfid}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + if r.status_code != 200: + return BaseResponse(data=r.json(), code=404, msg="404") + r = r.json() + values = r["values"] + if len(values) == 0: + return BaseResponse(msg="No Such RFID", code=500) + deviceId = values[0][0] + return await data_query(deviceId) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +async def has_rfid(rfid): + try: + sql = f"select deviceId from root.rfid where rfid='{rfid}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + if r.status_code != 200: + return BaseResponse(data=r.json(), code=404, msg="404") + r = r.json() + values = r["values"] + if len(values) == 0: + return BaseResponse(msg="No Such RFID", code=500) + return BaseResponse(data=r) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 设备查询接口 +@app.get("/api/xumu/device/query") +async def get_device(iccid, deviceId): + try: + sql = "SELECT * FROM root.device where time>=0" + # 检查iccid是否有值,如果有,添加到SQL语句中 + if iccid: + sql += f" and iccid = '{iccid}'" + # 检查deviceId是否有值,如果有,添加到SQL语句中 + if deviceId: + sql += f" and deviceId = '{deviceId}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 设备注册接口 +@app.post("/api/xumu/device/register") +async def register(request: Request): + try: + data = await request.body() + data = data.decode("utf-8") + data = json.loads(data) + clientid = data["clientid"] + payload = data["payload"] + receive_len = len(payload) + payload = json.loads(payload) + send_len = payload["l"] + if receive_len != send_len: + print(receive_len) + return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}") + deviceId = payload["d"] + iccid = payload["cid"] + type = payload["t"] + # 该设备是否已经创建 + a_device = await get_device(iccid, deviceId) + if a_device.code == 200: + v = a_device.data["values"] + if len(v) != 0: + return BaseResponse(code=302, msg="该设备已经注册过了") + # 创建该设备 + send_json = { + "devices": ["root.device"], + "timestamps": [int(time.time() * 1000)], + "measurements_list": [["iccid", "deviceId", "type"]], + "data_types_list": [["TEXT", "TEXT", "INT32"]], + "values_list": [[iccid, deviceId, type]], + "is_aligned": False + } + # TODO 测试 + # send_json = { + # "sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})" + # } + res = [] + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + res.append(r.json()) + # 插入设备状态表 + send_json = { + "sql": f"insert into root.farm.clientId(iccid, clientId, is_online, deviceId) values('{iccid}','{clientid}', False, '{deviceId}')" + } + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + res.append(r.json()) + # 创建相应的数据库和字段 + sql_list = get_sql(deviceId, type) + for sql in sql_list: + send_json = { + "sql": sql + } + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + res.append(r.json()) + return BaseResponse(data=res) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +async def message_publish(data): + try: + payload = data["payload"].strip() + receive_len = len(payload) + payload = json.loads(payload) + send_len = payload["l"] + if receive_len != send_len: + print("校验错误,收到的消息长度:", receive_len) + return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}") + m = payload["m"] + v = payload["v"] + t = payload["t"] + deviceId = v[0] + m.append("t") + v.append(t) + res = [] + # 判断t是否为0,0则代表是RFID设备,需要做一个映射 + if t == 0: + rfid = v[1] + r = await has_rfid(rfid) + if r.code != 200: + sql_list = rfid_deviceId(rfid, deviceId) + sql = sql_list[0] + send_json = { + "sql": sql + } + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + if r.status_code == 200: + r = r.json() + if r["code"] != 200: + return BaseResponse(code=500, msg="RFID创建映射失败", data=r) + res.append(r) + send_json = { + "devices": ["root.farm." + deviceId], + "timestamps": [int(time.time() * 1000)], + "measurements_list": [m], + "data_types_list": [dataTypes[t]], + "values_list": [v], + "is_aligned": False + } + r = requests.post(baseHost + insertUri, headers=headers, json=send_json) + r = r.json() + res.append(r) + return BaseResponse(data=res) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 改变设备状态接口 +async def client_change_status(clientid, status): + print(clientid) + try: + sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'" + r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query}) + if r.status_code == 200: + r = r.json() + values = r["timestamps"] + if len(values) > 0: + timestamp = values[0] + sql_update_list = get_client_change_status_sql(timestamp, status) + sql_update = sql_update_list[0] + r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update}) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +@app.post("/api/xumu/data/collect") +async def process_data(request: Request): + try: + data = await request.body() + data = data.decode("utf-8") + data = json.loads(data) + event = data["event"] + clientid = data["clientid"] + # 连接成功处理 + if event == "client.connected": + return await client_change_status(clientid, True) + # 断开连接处理 + elif event == "client.disconnected": + return await client_change_status(clientid, False) + # 消息体处理 + elif event == "message.publish": + return await message_publish(data) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +if __name__ == '__main__': + uvicorn.run(app, host="0.0.0.0", port=8003)