"""HTTP gateway for device registration, data upload and data queries.

Every endpoint forwards a JSON body to an upstream REST service located at
``baseHost`` and wraps the upstream JSON answer in a ``BaseResponse``.
NOTE(review): the request shapes look like the Apache IoTDB REST API v2 --
confirm against the deployment.
"""
import json
import re
import time
from typing import Any, Optional

import requests
import uvicorn
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

# Expected to provide baseHost, queryUri, insertUri, headers, dataTypes and
# get_sql: none of these names is defined in this file (TODO confirm).
from config import *  # noqa: F401,F403

# FastAPI application
app = FastAPI()

# Seconds to wait for the upstream service before giving up; without a
# timeout a stalled upstream would block the request forever.
_UPSTREAM_TIMEOUT = 30

# Allow-list for values that are spliced into SQL text: word characters
# (letters, digits, underscore) and hyphens only. Anything else (quotes,
# whitespace, dots, semicolons, ...) could change the meaning of the statement.
_SAFE_IDENTIFIER = re.compile(r"[\w\-]+")


# Unified response envelope
class BaseResponse(BaseModel):
    code: int = 200
    msg: str = "success"
    data: Any = None


# Device entity (used as the filter of the device query endpoint)
class Device(BaseModel):
    iccid: str = ""
    deviceId: str
    # Optional filter; None means "do not filter by type".
    type: Optional[int] = None


# Parameters of the data query endpoint
class DataQuery(BaseModel):
    deviceId: str


def _is_safe_identifier(value: Any) -> bool:
    """Return True when *value* consists only of allow-listed characters."""
    return _SAFE_IDENTIFIER.fullmatch(str(value)) is not None


async def _post_json(uri: str, body: Any) -> Any:
    """POST *body* as JSON to ``baseHost + uri`` and return the decoded reply.

    ``requests`` is synchronous, so the call runs in the thread pool to keep
    the event loop free for other requests.

    Raises:
        requests.RequestException: on connection failure or timeout.
        ValueError: if the upstream reply is not valid JSON.
    """
    response = await run_in_threadpool(
        requests.post,
        baseHost + uri,
        headers=headers,
        json=body,
        timeout=_UPSTREAM_TIMEOUT,
    )
    return response.json()


def _decode_payload(raw: bytes):
    """Decode a device message and return ``(payload, received_len)``.

    The request body is a JSON object whose ``"payload"`` entry is itself a
    JSON document encoded as a string. ``received_len`` is the length of that
    string; callers compare it with the ``"l"`` field inside the payload.
    """
    envelope = json.loads(raw.decode("utf-8"))
    payload_text = envelope["payload"]
    received_len = len(payload_text)
    payload = json.loads(payload_text)
    return payload, received_len


# Data query endpoint
@app.post("/api/xumu/data/query")
async def data_query(params: DataQuery):
    """Return the result of ``select last *`` for one device under ``root.farm``.

    Replies with code 400 when ``deviceId`` contains characters outside the
    allow-list, and with code 500 plus the error text on any other failure.
    """
    device_id = params.deviceId
    # deviceId becomes part of the SQL path, so it must not carry SQL syntax.
    if not _is_safe_identifier(device_id):
        return BaseResponse(code=400, msg="invalid deviceId")
    try:
        send_json = {"sql": f"select last * from root.farm.{device_id}"}
        return BaseResponse(data=await _post_json(queryUri, send_json))
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Raw query endpoint
@app.post("/api/xumu/rest/v2/query")
async def rest_query(request: Request):
    """Forward the request's JSON body unchanged to the upstream query URI.

    NOTE(review): the caller controls the whole statement here; restrict
    access to this route if it is reachable by untrusted clients.
    """
    try:
        data = await request.json()
        return BaseResponse(data=await _post_json(queryUri, data))
    except Exception as e:
        # Same error envelope as the other handlers.
        return BaseResponse(code=500, msg=str(e))


# Device query endpoint
@app.post("/api/xumu/device/query")
async def get_device(device: Device):
    """Query ``root.device``, filtering by whichever fields of *device* are set.

    Empty ``iccid`` / ``deviceId`` and a ``type`` of None add no condition.
    Replies with code 400 when a text filter contains characters outside the
    allow-list, and with code 500 plus the error text on any other failure.
    """
    conditions = []
    for column, value in (("iccid", device.iccid), ("deviceId", device.deviceId)):
        if not value:
            continue
        # The value is placed inside a quoted SQL literal; refuse anything
        # that could terminate the literal early.
        if not _is_safe_identifier(value):
            return BaseResponse(code=400, msg=f"invalid {column}")
        conditions.append(f" and {column} = '{value}'")
    # type is validated as an int by the model, so it is safe to interpolate.
    if device.type is not None:
        conditions.append(f" and type = {device.type}")

    try:
        send_json = {"sql": "SELECT * FROM root.device where 1=1" + "".join(conditions)}
        return BaseResponse(data=await _post_json(queryUri, send_json))
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Device registration endpoint
@app.post("/api/xumu/device/register")
async def register(request: Request):
    """Register a device and send the statements returned by ``get_sql``.

    The payload supplies ``d`` (device id), ``cid`` (iccid), ``t`` (type) and
    ``l`` (declared payload length). Replies with code 301 on a length
    mismatch, code 400 on an unsafe device id, code 500 on any other failure;
    on success ``data`` is the list of upstream replies in request order.
    """
    try:
        payload, receive_len = _decode_payload(await request.body())
        if receive_len != payload["l"]:
            return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")

        device_id = payload["d"]
        iccid = payload["cid"]
        device_type = payload["t"]  # not named `type`: that would shadow the builtin

        # get_sql presumably embeds the device id in SQL text -- validate first.
        if not _is_safe_identifier(device_id):
            return BaseResponse(code=400, msg="invalid deviceId")

        # TODO: check whether this device has already been registered.
        # Create the device record.
        send_json = {
            "devices": ["root.device"],
            "timestamps": [int(time.time() * 1000)],
            "measurements_list": [["iccid", "deviceId", "type"]],
            "data_types_list": [["TEXT", "TEXT", "INT32"]],
            "values_list": [[iccid, device_id, device_type]],
            "is_aligned": False,
        }
        res = [await _post_json(insertUri, send_json)]

        # Create the matching database and fields.
        # NOTE(review): these {"sql": ...} bodies go to insertUri, the same
        # URI used for the record insert above -- confirm that is intended.
        for sql in get_sql(device_id, device_type):
            res.append(await _post_json(insertUri, {"sql": sql}))
        return BaseResponse(data=res)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Data upload endpoint
@app.post("/api/xumu/data/collect")
async def process_data(request: Request):
    """Insert one uploaded record under ``root.farm.<first value>``.

    The payload supplies ``m`` (measurement names), ``v`` (values) and ``l``
    (declared payload length). The first value names the target device and
    the last value selects the entry of ``dataTypes`` to send. Replies with
    code 301 on a length mismatch and code 500 on any other failure.
    """
    try:
        payload, receive_len = _decode_payload(await request.body())
        if receive_len != payload["l"]:
            print("校验错误,收到的消息长度:", receive_len)
            return BaseResponse(code=301, msg="data valid error")

        m = payload["m"]
        v = payload["v"]
        cid = v[0]
        t = v[-1]
        send_json = {
            "devices": ["root.farm." + cid],
            "timestamps": [int(time.time() * 1000)],
            "measurements_list": [m],
            "data_types_list": [dataTypes[t]],
            "values_list": [v],
            "is_aligned": False,
        }
        return BaseResponse(data=await _post_json(insertUri, send_json))
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8002)