xumu_iotdb/main.py

156 lines
4.7 KiB
Python
Raw Normal View History

2024-01-24 15:32:58 +08:00
import json
import re
import time
from typing import Any, Optional

import requests
import uvicorn
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

from config import *
# FastAPI application instance; the @app.post decorators in this file register routes on it.
app = FastAPI()
# Uniform response envelope returned by the endpoints in this file.
class BaseResponse(BaseModel):
    # Application-level status code: 200 by default; handlers override it on failure (e.g. 301, 500).
    code: int = 200
    # Human-readable status text; handlers put the error description here on failure.
    msg: str = "success"
    # Endpoint-specific payload (typically the decoded JSON reply of the backend).
    data: Any = None
# Device entity; used as the (optional) filter body of the device query endpoint.
class Device(BaseModel):
    # ICCID to filter on; an empty string means "no filter on this column".
    iccid: str = ""
    # Device id to filter on; an empty string means "no filter on this column".
    deviceId: str = ""
    # Device type to filter on; None means "no filter on this column".
    # Declared Optional so the annotation agrees with the None default and an
    # explicit JSON null is accepted instead of failing validation.
    type: Optional[int] = None
# Request body of the data query endpoint.
class DataQuery(BaseModel):
    # Device whose series is queried; inserted after "root.farm." in the SQL text.
    deviceId: str
    # Maximum number of rows to return (newest first); defaults to the single latest row.
    limit: int = 1
# Data query endpoint
@app.post("/api/xumu/data/query")
async def data_query(params: DataQuery):
    """Return the most recent rows stored for one device.

    Builds ``select * from root.farm.<deviceId> order by time desc limit <limit>``
    and posts it to the backend query endpoint (``baseHost + queryUri``).

    Args:
        params: Device id and the maximum number of rows to return.

    Returns:
        BaseResponse: ``data`` holds the backend's decoded JSON reply on
        success. ``code`` is 400 when the device id is rejected and 500 (with
        the exception text in ``msg``) for any other failure.
    """
    try:
        device_id = params.deviceId
        # The id is spliced into the SQL text as a path node and the backend
        # offers no parameter binding here, so accept word characters only
        # (letters, digits, underscore) to rule out SQL injection.
        if not re.fullmatch(r"\w+", device_id):
            return BaseResponse(code=400, msg="invalid deviceId")
        sql = f"select * from root.farm.{device_id} order by time desc limit {params.limit}"
        # requests is a blocking client: run it in the worker threadpool so the
        # event loop keeps serving other requests, and bound the wait so a hung
        # backend cannot stall this request forever.
        r = await run_in_threadpool(
            requests.post,
            baseHost + queryUri,
            headers=headers,
            json={"sql": sql},
            timeout=30,
        )
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Raw (pass-through) query endpoint
@app.post("/api/xumu/rest/v2/query")
async def rest_query(request: Request):
    """Forward the request's JSON body unchanged to the backend query endpoint.

    Args:
        request: Incoming request whose body must be valid JSON.

    Returns:
        BaseResponse: ``data`` holds the backend's decoded JSON reply on
        success; on any failure ``code`` is 500 and ``msg`` carries the
        exception text, matching the other handlers in this file.
    """
    # NOTE(review): the caller fully controls the statement sent to the backend;
    # confirm this route is protected by access control upstream.
    try:
        data = await request.json()
        # requests is a blocking client: run it in the worker threadpool so the
        # event loop is not stalled, and bound the wait with a timeout.
        r = await run_in_threadpool(
            requests.post,
            baseHost + queryUri,
            headers=headers,
            json=data,
            timeout=30,
        )
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Device query endpoint
@app.post("/api/xumu/device/query")
async def get_device(device: Device):
    """Look up registered devices, optionally filtered by iccid, deviceId and type.

    Starts from ``SELECT * FROM root.device where 1=1`` and appends one
    ``and <column> = <value>`` condition for every filter that is set, then
    posts the statement to the backend query endpoint.

    Args:
        device: Filter values; empty strings and a ``None`` type are ignored.

    Returns:
        BaseResponse: ``data`` holds the backend's decoded JSON reply on
        success. ``code`` is 400 when a string filter is rejected and 500
        (with the exception text in ``msg``) for any other failure.
    """
    try:
        conditions = []
        for column, value in (("iccid", device.iccid), ("deviceId", device.deviceId)):
            if not value:
                continue
            # The value is placed inside a single-quoted SQL literal; a quote or
            # backslash could terminate or escape out of it, so reject both
            # rather than risk SQL injection.
            if "'" in value or "\\" in value:
                return BaseResponse(code=400, msg=f"invalid {column}")
            conditions.append(f" and {column} = '{value}'")
        # type is validated as an integer by the model, so it is safe to inline.
        if device.type is not None:
            conditions.append(f" and type = {device.type}")
        sql = "SELECT * FROM root.device where 1=1" + "".join(conditions)
        # requests is a blocking client: run it in the worker threadpool so the
        # event loop is not stalled, and bound the wait with a timeout.
        r = await run_in_threadpool(
            requests.post,
            baseHost + queryUri,
            headers=headers,
            json={"sql": sql},
            timeout=30,
        )
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Device registration endpoint
@app.post("/api/xumu/device/register")
async def register(request: Request):
    """Register a device and create its storage schema.

    The request body is JSON with a ``payload`` field that is itself a JSON
    string. Inside it, ``l`` is the length the sender declared for the payload
    string, ``d`` the device id, ``cid`` the ICCID and ``t`` the device type.

    Steps:
        1. Check the received payload length against the declared ``l``.
        2. Insert one record (iccid, deviceId, type) into ``root.device``.
        3. Post every statement returned by ``get_sql(deviceId, type)``.

    Args:
        request: Incoming request carrying the envelope described above.

    Returns:
        BaseResponse: ``data`` is the list of decoded backend replies, one per
        request sent. ``code`` is 301 on a length mismatch and 500 (with the
        exception text in ``msg``) for any other failure.
    """
    try:
        body = await request.body()
        envelope = json.loads(body.decode("utf-8"))
        raw_payload = envelope["payload"]
        # Length is measured on the still-encoded payload string.
        receive_len = len(raw_payload)
        payload = json.loads(raw_payload)
        if receive_len != payload["l"]:
            return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")

        device_id = payload["d"]
        iccid = payload["cid"]
        # Named device_type (not "type") so the builtin is not shadowed.
        device_type = payload["t"]
        record = {
            "devices": ["root.device"],
            "timestamps": [int(time.time() * 1000)],  # current time in milliseconds
            "measurements_list": [["iccid", "deviceId", "type"]],
            "data_types_list": [["TEXT", "TEXT", "INT32"]],
            "values_list": [[iccid, device_id, device_type]],
            "is_aligned": False,
        }

        res = []
        # requests is a blocking client: every call runs in the worker
        # threadpool so the event loop is not stalled, with a bounded wait.
        r = await run_in_threadpool(
            requests.post,
            baseHost + insertUri,
            headers=headers,
            json=record,
            timeout=30,
        )
        res.append(r.json())

        # Create the corresponding database and fields.
        # get_sql is not defined in this file; presumably it comes from
        # `from config import *`.
        # NOTE(review): these SQL statements are posted to insertUri, the same
        # URL used for the record insert above - confirm the backend accepts
        # {"sql": ...} there rather than on a dedicated non-query endpoint.
        for sql in get_sql(device_id, device_type):
            r = await run_in_threadpool(
                requests.post,
                baseHost + insertUri,
                headers=headers,
                json={"sql": sql},
                timeout=30,
            )
            res.append(r.json())
        return BaseResponse(data=res)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Data upload endpoint
@app.post("/api/xumu/data/collect")
async def process_data(request: Request):
    """Store one batch of measurements reported by a device.

    The request body is JSON with a ``payload`` field that is itself a JSON
    string. Inside it, ``l`` is the length the sender declared for the payload
    string, ``m`` the list of measurement names and ``v`` the matching values.
    The first value is used as the device id (the record is written to
    ``root.farm.<v[0]>``) and the last value selects the column data types
    from ``dataTypes``.

    Args:
        request: Incoming request carrying the envelope described above.

    Returns:
        BaseResponse: ``data`` holds the backend's decoded JSON reply on
        success. ``code`` is 301 on a length mismatch and 500 (with the
        exception text in ``msg``) for any other failure.
    """
    try:
        body = await request.body()
        envelope = json.loads(body.decode("utf-8"))
        raw_payload = envelope["payload"]
        # Length is measured on the still-encoded payload string.
        receive_len = len(raw_payload)
        payload = json.loads(raw_payload)
        if receive_len != payload["l"]:
            print("校验错误,收到的消息长度:", receive_len)
            return BaseResponse(code=301, msg="data valid error")

        measurements = payload["m"]
        values = payload["v"]
        cid = values[0]        # device id: first reported value
        type_key = values[-1]  # key into dataTypes: last reported value
        record = {
            "devices": ["root.farm." + cid],
            "timestamps": [int(time.time() * 1000)],  # current time in milliseconds
            "measurements_list": [measurements],
            "data_types_list": [dataTypes[type_key]],
            "values_list": [values],
            "is_aligned": False,
        }
        # requests is a blocking client: run it in the worker threadpool so the
        # event loop is not stalled, and bound the wait with a timeout.
        r = await run_in_threadpool(
            requests.post,
            baseHost + insertUri,
            headers=headers,
            json=record,
            timeout=30,
        )
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
2024-01-24 15:32:58 +08:00
# When executed as a script, serve the app with uvicorn on all interfaces, port 8002.
if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8002)