import time import json from typing import Any import requests import uvicorn from fastapi import FastAPI, Request from pydantic import BaseModel from config import * # fastapi app = FastAPI() # 统一返回数据 class BaseResponse(BaseModel): code: int = 200 msg: str = "success" data: Any = None @app.get("/api/xumu/video") async def data_query(username): return BaseResponse(data=get_video_url(username)) # 数据查询接口 @app.get("/api/xumu/data/query") async def data_query(deviceId): try: if deviceId is None or deviceId == "" or len(deviceId) != 4: return BaseResponse(code=500, msg="参数错误") sql = f"select last * from root.farm.{deviceId}" send_json = { "sql": sql } r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: return BaseResponse(code=500, msg=str(e)) # 原生查询接口 @app.post("/api/xumu/rest/v2/query") async def rest_query(request: Request): data = await request.json() r = requests.post(baseHost + queryUri, headers=headers, json=data) return BaseResponse(data=r.json()) # RFID查询接口 @app.get("/api/xumu/rfid/query") async def rfid_query(rfid): try: sql = f"select deviceId from root.rfid where rfid='{rfid}'" send_json = { "sql": sql } r = requests.post(baseHost + queryUri, headers=headers, json=send_json) if r.status_code != 200: return BaseResponse(data=r.json(), code=404, msg="404") r = r.json() values = r["values"] if len(values) == 0: return BaseResponse(msg="No Such RFID", code=500) deviceId = values[0][0] return await data_query(deviceId) except Exception as e: return BaseResponse(code=500, msg=str(e)) async def has_rfid(rfid): try: sql = f"select deviceId from root.rfid where rfid='{rfid}'" send_json = { "sql": sql } r = requests.post(baseHost + queryUri, headers=headers, json=send_json) if r.status_code != 200: return BaseResponse(data=r.json(), code=404, msg="404") r = r.json() values = r["values"] if len(values) == 0: return BaseResponse(msg="No Such RFID", code=500) return BaseResponse(data=r) except Exception as e: return BaseResponse(code=500, msg=str(e)) # 设备查询接口 @app.get("/api/xumu/device/query") async def get_device(iccid, deviceId): try: sql = "SELECT * FROM root.device where time>=0" # 检查iccid是否有值,如果有,添加到SQL语句中 if iccid: sql += f" and iccid = '{iccid}'" # 检查deviceId是否有值,如果有,添加到SQL语句中 if deviceId: sql += f" and deviceId = '{deviceId}'" send_json = { "sql": sql } r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: return BaseResponse(code=500, msg=str(e)) # 设备注册接口 @app.post("/api/xumu/device/register") async def register(request: Request): try: data = await request.body() data = data.decode("utf-8") data = json.loads(data) payload = data["payload"] receive_len = len(payload) payload = json.loads(payload) send_len = payload["l"] if receive_len != send_len: print(receive_len) return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}") deviceId = payload["d"] iccid = payload["cid"] type = payload["t"] # 该设备是否已经创建 a_device = await get_device(iccid, deviceId) if a_device.code == 200: v = a_device.data["values"] if len(v) != 0: return BaseResponse(code=302, msg="该设备已经注册过了") # 创建该设备 send_json = { "devices": ["root.device"], "timestamps": [int(time.time() * 1000)], "measurements_list": [["iccid", "deviceId", "type"]], "data_types_list": [["TEXT", "TEXT", "INT32"]], "values_list": [[iccid, deviceId, type]], "is_aligned": False } res = [] r = requests.post(baseHost + insertUri, headers=headers, json=send_json) res.append(r.json()) # 创建相应的数据库和字段 sql_list = get_sql(deviceId, type) for sql in sql_list: send_json = { "sql": sql } r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) res.append(r.json()) return BaseResponse(data=res) except Exception as e: return BaseResponse(code=500, msg=str(e)) # 数据上传接口 @app.post("/api/xumu/data/collect") async def process_data(request: Request): try: data = await request.body() data = data.decode("utf-8") data = json.loads(data) payload = data["payload"] receive_len = len(payload) payload = json.loads(payload) send_len = payload["l"] if receive_len != send_len: print("校验错误,收到的消息长度:", receive_len) return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}") m = payload["m"] v = payload["v"] t = payload["t"] deviceId = v[0] m.append("t") v.append(t) res = [] # 判断t是否为0,0则代表是RFID设备,需要做一个映射 if t == 0: rfid = v[1] r = await has_rfid(rfid) if r.code != 200: sql_list = rfid_deviceId(rfid, deviceId) sql = sql_list[0] send_json = { "sql": sql } r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) if r.status_code == 200: r = r.json() if r["code"] != 200: return BaseResponse(code=500, msg="RFID创建映射失败", data=r) res.append(r) send_json = { "devices": ["root.farm." + deviceId], "timestamps": [int(time.time() * 1000)], "measurements_list": [m], "data_types_list": [dataTypes[t]], "values_list": [v], "is_aligned": False } r = requests.post(baseHost + insertUri, headers=headers, json=send_json) r = r.json() res.append(r) return BaseResponse(data=res) except Exception as e: return BaseResponse(code=500, msg=str(e)) if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8002)