import time import json from typing import Any import requests import uvicorn from fastapi import FastAPI, Request from pydantic import BaseModel from config import * # fastapi app = FastAPI() # 统一返回数据 class BaseResponse(BaseModel): code: int = 200 msg: str = "success" data: Any = None # 设备注册实体 class Device(BaseModel): iccid: str = "" deviceId: str = "" type: int = None # 数据查询参数实体 class DataQuery(BaseModel): deviceId: str limit: int = 1 # 数据查询接口 @app.post("/api/xumu/data/query") async def data_query(params: DataQuery): try: deviceId = params.deviceId limit = params.limit sql = f"select * from root.farm.{deviceId} order by time desc limit {limit}" send_json = { "sql": sql } r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: return BaseResponse(code=500, msg=str(e)) # 原生查询接口 @app.post("/api/xumu/rest/v2/query") async def rest_query(request: Request): data = await request.json() r = requests.post(baseHost + queryUri, headers=headers, json=data) return BaseResponse(data=r.json()) # 设备查询接口 @app.post("/api/xumu/device/query") async def get_device(device: Device): try: sql = "SELECT * FROM root.device where 1=1" # 检查iccid是否有值,如果有,添加到SQL语句中 if device.iccid: sql += f" and iccid = '{device.iccid}'" # 检查deviceId是否有值,如果有,添加到SQL语句中 if device.deviceId: sql += f" and deviceId = '{device.deviceId}'" # 检查type是否有值,如果有,添加到SQL语句中 if device.type is not None: sql += f" and type = {device.type}" send_json = { "sql": sql } r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: return BaseResponse(code=500, msg=str(e)) # 设备注册接口 @app.post("/api/xumu/device/register") async def register(request: Request): try: data = await request.body() data = data.decode("utf-8") data = json.loads(data) payload = data["payload"] receive_len = len(payload) payload = json.loads(payload) send_len = payload["l"] if receive_len != send_len: return BaseResponse(code=301, msg="data valid error") deviceId = payload["d"] iccid = payload["cid"] type = payload["t"] send_json = { "devices": ["root.device"], "timestamps": [int(time.time() * 1000)], "measurements_list": [["iccid", "deviceId", "type"]], "data_types_list": [["TEXT", "TEXT", "INT32"]], "values_list": [[iccid, deviceId, type]], "is_aligned": False } r = requests.post(baseHost + insertUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: return BaseResponse(code=500, msg=str(e)) # 数据上传接口 @app.post("/api/xumu/data/collect") async def process_data(request: Request): # TODO mqtt检测 data = await request.body() data = data.decode("utf-8") data = json.loads(data) event = data["event"] print(data) if event == "client.connected": print("已连接") print(data["connected_at"]) elif event == "client.disconnected": print("已断开") print(data["disconnected_at"]) elif event == "message.publish": print("消息接收") print(data["timestamp"]) return 200 # try: # data = await request.body() # data = data.decode("utf-8") # data = json.loads(data) # payload = data["payload"] # receive_len = len(payload) # payload = json.loads(payload) # send_len = payload["l"] # if receive_len != send_len: # return 301 # m = payload["m"] # v = payload["v"] # t = payload["t"] # cid = v[0] # send_json = { # "devices": ["root.farm." + cid], # "timestamps": [int(time.time() * 1000)], # "measurements_list": [m], # "data_types_list": [dataTypes[t]], # "values_list": [v], # "is_aligned": False # } # r = requests.post(baseHost + insertUri, headers=headers, json=send_json) # return BaseResponse(data=r.json()) # except Exception as e: # return BaseResponse(code=500, msg=str(e)) if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8002)