diff --git a/xumu.py b/xumu.py index 0045958..7340199 100644 --- a/xumu.py +++ b/xumu.py @@ -114,6 +114,7 @@ async def register(request: Request): data = await request.body() data = data.decode("utf-8") data = json.loads(data) + clientid = data["clientid"] payload = data["payload"] receive_len = len(payload) payload = json.loads(payload) @@ -131,8 +132,6 @@ async def register(request: Request): if len(v) != 0: return BaseResponse(code=302, msg="该设备已经注册过了") # 创建该设备 - is_online = False - send_json = { "devices": ["root.device"], "timestamps": [int(time.time() * 1000)], @@ -141,9 +140,18 @@ async def register(request: Request): "values_list": [[iccid, deviceId, type]], "is_aligned": False } - + # TODO 测试 + # send_json = { + # "sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})" + # } res = [] - r = requests.post(baseHost + insertUri, headers=headers, json=send_json) + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + res.append(r.json()) + # 插入设备状态表 + send_json = { + "sql": f"insert into root.farm.clientId(iccid, clientId, is_online, deviceId) values('{iccid}','{clientid}', False, '{deviceId}')" + } + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) res.append(r.json()) # 创建相应的数据库和字段 sql_list = get_sql(deviceId, type) @@ -158,19 +166,9 @@ async def register(request: Request): return BaseResponse(code=500, msg=str(e)) -# 数据上传接口 -@app.post("/api/xumu/data/collect") -async def process_data(request: Request): +async def message_publish(data): try: - data = await request.body() - data = data.decode("utf-8") - print(data) - return - data = json.loads(data) - # 连接成功处理 - # 断开连接处理 - # 消息体处理 - payload = data["payload"] + payload = data["payload"].strip() receive_len = len(payload) payload = json.loads(payload) send_len = payload["l"] @@ -216,5 +214,45 @@ async def process_data(request: Request): return BaseResponse(code=500, msg=str(e)) +# 改变设备状态接口 +async def client_change_status(clientid, status): + print(clientid) + try: + sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'" + r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query}) + if r.status_code == 200: + r = r.json() + values = r["timestamps"] + if len(values) > 0: + timestamp = values[0] + sql_update_list = get_client_change_status_sql(timestamp, status) + sql_update = sql_update_list[0] + r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update}) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +@app.post("/api/xumu/data/collect") +async def process_data(request: Request): + try: + data = await request.body() + data = data.decode("utf-8") + data = json.loads(data) + event = data["event"] + clientid = data["clientid"] + # 连接成功处理 + if event == "client.connected": + return await client_change_status(clientid, True) + # 断开连接处理 + elif event == "client.disconnected": + return await client_change_status(clientid, False) + # 消息体处理 + elif event == "message.publish": + return await message_publish(data) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8002) diff --git a/xumu_ori.py b/xumu_ori.py new file mode 100644 index 0000000..2544052 --- /dev/null +++ b/xumu_ori.py @@ -0,0 +1,220 @@ +import time +import json +from typing import Any + +import requests +import uvicorn +from fastapi import FastAPI, Request +from pydantic import BaseModel +from config import * + +# fastapi +app = FastAPI() + + +# 统一返回数据 +class BaseResponse(BaseModel): + code: int = 200 + msg: str = "success" + data: Any = None + + +@app.get("/api/xumu/video") +async def data_query(username): + return BaseResponse(data=get_video_url(username)) + + +# 数据查询接口 +@app.get("/api/xumu/data/query") +async def data_query(deviceId): + try: + if deviceId is None or deviceId == "" or len(deviceId) != 4: + return BaseResponse(code=500, msg="参数错误") + sql = f"select last * from root.farm.{deviceId}" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 原生查询接口 +@app.post("/api/xumu/rest/v2/query") +async def rest_query(request: Request): + data = await request.json() + r = requests.post(baseHost + queryUri, headers=headers, json=data) + return BaseResponse(data=r.json()) + + +# RFID查询接口 +@app.get("/api/xumu/rfid/query") +async def rfid_query(rfid): + try: + sql = f"select deviceId from root.rfid where rfid='{rfid}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + if r.status_code != 200: + return BaseResponse(data=r.json(), code=404, msg="404") + r = r.json() + values = r["values"] + if len(values) == 0: + return BaseResponse(msg="No Such RFID", code=500) + deviceId = values[0][0] + return await data_query(deviceId) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +async def has_rfid(rfid): + try: + sql = f"select deviceId from root.rfid where rfid='{rfid}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + if r.status_code != 200: + return BaseResponse(data=r.json(), code=404, msg="404") + r = r.json() + values = r["values"] + if len(values) == 0: + return BaseResponse(msg="No Such RFID", code=500) + return BaseResponse(data=r) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 设备查询接口 +@app.get("/api/xumu/device/query") +async def get_device(iccid, deviceId): + try: + sql = "SELECT * FROM root.device where time>=0" + # 检查iccid是否有值,如果有,添加到SQL语句中 + if iccid: + sql += f" and iccid = '{iccid}'" + # 检查deviceId是否有值,如果有,添加到SQL语句中 + if deviceId: + sql += f" and deviceId = '{deviceId}'" + send_json = { + "sql": sql + } + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) + return BaseResponse(data=r.json()) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 设备注册接口 +@app.post("/api/xumu/device/register") +async def register(request: Request): + try: + data = await request.body() + data = data.decode("utf-8") + data = json.loads(data) + payload = data["payload"] + receive_len = len(payload) + payload = json.loads(payload) + send_len = payload["l"] + if receive_len != send_len: + print(receive_len) + return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}") + deviceId = payload["d"] + iccid = payload["cid"] + type = payload["t"] + # 该设备是否已经创建 + a_device = await get_device(iccid, deviceId) + if a_device.code == 200: + v = a_device.data["values"] + if len(v) != 0: + return BaseResponse(code=302, msg="该设备已经注册过了") + # 创建该设备 + is_online = False + + send_json = { + "devices": ["root.device"], + "timestamps": [int(time.time() * 1000)], + "measurements_list": [["iccid", "deviceId", "type"]], + "data_types_list": [["TEXT", "TEXT", "INT32"]], + "values_list": [[iccid, deviceId, type]], + "is_aligned": False + } + + res = [] + r = requests.post(baseHost + insertUri, headers=headers, json=send_json) + res.append(r.json()) + # 创建相应的数据库和字段 + sql_list = get_sql(deviceId, type) + for sql in sql_list: + send_json = { + "sql": sql + } + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + res.append(r.json()) + return BaseResponse(data=res) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +# 数据上传接口 +@app.post("/api/xumu/data/collect") +async def process_data(request: Request): + try: + data = await request.body() + data = data.decode("utf-8") + print(data) + return + data = json.loads(data) + # 连接成功处理 + # 断开连接处理 + # 消息体处理 + payload = data["payload"] + receive_len = len(payload) + payload = json.loads(payload) + send_len = payload["l"] + if receive_len != send_len: + print("校验错误,收到的消息长度:", receive_len) + return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}") + m = payload["m"] + v = payload["v"] + t = payload["t"] + deviceId = v[0] + m.append("t") + v.append(t) + res = [] + # 判断t是否为0,0则代表是RFID设备,需要做一个映射 + if t == 0: + rfid = v[1] + r = await has_rfid(rfid) + if r.code != 200: + sql_list = rfid_deviceId(rfid, deviceId) + sql = sql_list[0] + send_json = { + "sql": sql + } + r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) + if r.status_code == 200: + r = r.json() + if r["code"] != 200: + return BaseResponse(code=500, msg="RFID创建映射失败", data=r) + res.append(r) + send_json = { + "devices": ["root.farm." + deviceId], + "timestamps": [int(time.time() * 1000)], + "measurements_list": [m], + "data_types_list": [dataTypes[t]], + "values_list": [v], + "is_aligned": False + } + r = requests.post(baseHost + insertUri, headers=headers, json=send_json) + r = r.json() + res.append(r) + return BaseResponse(data=res) + except Exception as e: + return BaseResponse(code=500, msg=str(e)) + + +if __name__ == '__main__': + uvicorn.run(app, host="0.0.0.0", port=8002)