diff --git a/xumu.py b/xumu.py index 41d7a63..be43fd2 100644 --- a/xumu.py +++ b/xumu.py @@ -166,15 +166,6 @@ async def register(request: Request): if len(v) != 0: return BaseResponse(code=302, msg="该设备已经注册过了") # 创建该设备 - # send_json = { - # "devices": ["root.device"], - # "timestamps": [int(time.time() * 1000)], - # "measurements_list": [["iccid", "deviceId", "type"]], - # "data_types_list": [["TEXT", "TEXT", "INT32"]], - # "values_list": [[iccid, deviceId, type]], - # "is_aligned": False - # } - # 测试 send_json = { "sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})" } diff --git a/xumu_plus.py b/xumu_plus.py new file mode 100644 index 0000000..15538b4 --- /dev/null +++ b/xumu_plus.py @@ -0,0 +1,286 @@ +import re +import time +import json +from typing import Any + +import requests +import uvicorn +from fastapi import FastAPI, Request +from pydantic import BaseModel +from config import * + +# fastapi +app = FastAPI() + + +# 统一返回数据 +class BaseResponse(BaseModel): + code: int = 200 + msg: str = "success" + data: Any = None + + +# 监控视频接口 +@app.get("/api/xumu/video") +async def video_query(username): + return BaseResponse(data=get_video_url(username)) + + +@app.get("/api/xumu/device/online") +async def device_online_query(iccid, deviceId): + try: + sql = "select * from root.farm.clientId where time >= 0" + # 检查iccid是否有值,如果有,添加到SQL语句中 + if iccid: + sql += f" and iccid = '{iccid}'" + # 检查deviceId是否有值,如果有,添加到SQL语句中 + if deviceId: + sql += f" and deviceId = '{deviceId}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 数据查询接口 +@app.get("/api/xumu/data/query") +async def data_query(deviceId): + try: + if deviceId is None or deviceId == "" or len(deviceId) != 4: + return BaseResponse(code=500, msg="参数错误") + sql = f"select last * from root.farm.{deviceId}" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +async def rfid_last_query(rfid, deviceId): + try: + if deviceId is None or deviceId == "" or len(deviceId) != 4: + return BaseResponse(code=500, msg="参数错误") + sql = f"select * from root.farm.{deviceId} where RFID='{rfid}' order by time desc limit 1" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 原生查询接口 +@app.post("/api/xumu/rest/v2/query") +async def rest_query(request: Request): + data = await request.json() + r = requests.post(baseHost + queryUri, headers=headers, json=data) + return BaseResponse(data=r.json()) + + +# RFID查询接口 +@app.get("/api/xumu/rfid/query") +async def rfid_query(rfid): + try: + sql = f"select deviceId from root.rfid where rfid='{rfid}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + if r.status_code != 200: + return BaseResponse(data=r.json(), code=404, msg="404") + r = r.json() + values = r["values"] + if len(values) == 0: + return BaseResponse(msg="No Such RFID", code=500) + deviceId = values[0][0] + return await rfid_last_query(rfid, deviceId) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +async def has_rfid(rfid): + try: + sql = f"select deviceId from root.rfid where rfid='{rfid}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + if r.status_code != 200: + return BaseResponse(data=r.json(), code=404, msg="404") + r = r.json() + values = r["values"] + if len(values) == 0: + return BaseResponse(msg="No Such RFID", code=500) + return BaseResponse(data=r) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 设备查询接口 +@app.get("/api/xumu/device/query") +async def get_device(iccid, deviceId): + try: + sql = "SELECT * FROM root.device where time>=0" + # 检查iccid是否有值,如果有,添加到SQL语句中 + if iccid: + sql += f" and iccid = '{iccid}'" + # 检查deviceId是否有值,如果有,添加到SQL语句中 + if deviceId: + sql += f" and deviceId = '{deviceId}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 设备注册接口 +@app.post("/api/xumu/device/register") +async def register(request: Request): + try: + data = await request.body() + data = data.decode("utf-8") + data = json.loads(data) + clientid = data["clientid"] + payload = data["payload"] + receive_len = len(payload) + payload = json.loads(payload) + send_len = payload["l"] + if receive_len != send_len: + return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}") + deviceId = payload["d"] + iccid = payload["cid"] + type = payload["t"] + # 该设备是否已经创建 + a_device = await get_device(iccid, deviceId) + if a_device.code == 200: + v = a_device.data["values"] + if len(v) != 0: + return BaseResponse(code=302, msg="该设备已经注册过了") + # 注册 + send_json = { + "devices": ["root.device", "root.farm.clientId"], + "timestamps": [int(time.time() * 1000), int(time.time() * 1000)], + "measurements_list": [["iccid", "deviceId", "type"], ["iccid", "clientId", "is_online", "deviceId"]], + "data_types_list": [["TEXT", "TEXT", "INT32"], ["TEXT", "TEXT", "BOOLEAN", "TEXT"]], + "values_list": [[iccid, deviceId, type], [iccid, clientid, True, deviceId]], + "is_aligned": False + } + r = requests.post(baseHost + insertUri, headers=headers, json=send_json) + res = [r.json()] + # 创建相应的数据库和字段 + sql_list = get_sql(deviceId, type) + for sql in sql_list: + send_json = { + "sql": sql + } + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + res.append(r.json()) + return BaseResponse(data=res) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +async def message_publish(data): + try: + payload = data["payload"].strip() + receive_len = len(payload) + payload = json.loads(payload) + send_len = payload["l"] + if receive_len != send_len: + return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}") + m = payload["m"] + v = payload["v"] + t = payload["t"] + deviceId = v[0] + m.append("t") + v.append(t) + res = [] + # 判断t是否为0,0则代表是RFID设备,需要做一个映射 + if t == 0: + rfid = v[1] + r = await has_rfid(rfid) + if r.code != 200: + sql_list = rfid_deviceId(rfid, deviceId) + sql = sql_list[0] + send_json = { + "sql": sql + } + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + if r.status_code == 200: + r = r.json() + if r["code"] != 200: + return BaseResponse(code=500, msg="RFID创建映射失败", data=r) + res.append(r) + send_json = { + "devices": ["root.farm." + deviceId], + "timestamps": [int(time.time() * 1000)], + "measurements_list": [m], + "data_types_list": [dataTypes[t]], + "values_list": [v], + "is_aligned": False + } + r = requests.post(baseHost + insertUri, headers=headers, json=send_json) + r = r.json() + res.append(r) + return BaseResponse(data=res) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 改变设备状态接口 +async def client_change_status(clientid, status): + try: + sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'" + r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query}) + if r.status_code == 200: + r = r.json() + values = r["timestamps"] + if len(values) > 0: + timestamp = values[0] + sql_update_list = get_client_change_status_sql(timestamp, status) + sql_update = sql_update_list[0] + r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update}) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +@app.post("/api/xumu/data/collect") +async def process_data(request: Request): + try: + data = await request.body() + data = data.decode("utf-8") + data = json.loads(data) + event = data["event"] + clientid = data["clientid"] + # print(clientid + "======" + event) + pattern = r'^f\d{1,10}$' + amatch1 = re.match(pattern, clientid) + # 如果两个都不满足,那就错误 + if not amatch1: + return BaseResponse(code=200, msg="error") + # 连接成功处理 + if event == "client.connected": + return await client_change_status(clientid, True) + # 断开连接处理 + elif event == "client.disconnected": + return await client_change_status(clientid, False) + # 消息体处理 + elif event == "message.publish": + return await message_publish(data) + return BaseResponse() + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +if __name__ == '__main__': + uvicorn.run(app, host="0.0.0.0", port=8002)