2024-01-24 15:32:58 +08:00
|
|
|
|
import json
import re
import time
from typing import Any, Optional

import requests
import uvicorn
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

# NOTE(review): baseHost, queryUri, insertUri, nonQueryUri, headers, dataTypes
# and get_sql are used in this file but not defined here; presumably they all
# come from this star import. Kept last so its names still win over the others.
from config import *
|
|
|
|
|
|
|
|
|
|
# FastAPI application object; every route in this module is registered on it.
app = FastAPI()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Uniform response envelope
class BaseResponse(BaseModel):
    """Envelope returned by every endpoint in this module.

    With no arguments it represents success (``code=200``, ``msg="success"``,
    ``data=None``); endpoints override the fields they need.
    """

    # Application-level status code (200 by default; endpoints also use
    # 301, 302 and 500 for failures).
    code: int = 200
    # Human-readable status or error text.
    msg: str = "success"
    # Arbitrary payload, typically the backend's decoded JSON reply.
    data: Any = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Device registration entity
class Device(BaseModel):
    """A device record, also used as the filter for device lookups.

    Only ``deviceId`` is required. An empty ``iccid`` or a ``type`` of
    ``None`` means "do not filter on this field" in ``get_device``.
    """

    # SIM card identifier; empty string when unknown / not filtered on.
    iccid: str = ""
    # Device identifier (required).
    deviceId: str
    # Numeric device type. Declared Optional so the ``None`` default agrees
    # with the annotation and an explicit ``type=None`` passes validation.
    type: Optional[int] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Data query parameter entity
class DataQuery(BaseModel):
    """Request body for the data query endpoint."""

    # Identifier of the device whose data is requested (required).
    deviceId: str
|
|
|
|
|
|
2024-01-25 09:23:01 +08:00
|
|
|
|
|
2024-01-24 15:32:58 +08:00
|
|
|
|
# Node names that can be written into a path without back-quoting: ASCII
# letters, digits, underscore and CJK characters. Anything else is refused so
# a device id can never add extra text to the SQL statement.
_DEVICE_NODE_RE = re.compile(r"[0-9A-Za-z_\u2E80-\u9FFF]+")


# Data query endpoint
@app.post("/api/xumu/data/query")
async def data_query(params: DataQuery):
    """Run ``select last * from root.farm.<deviceId>`` against the backend.

    Args:
        params: Request body carrying the ``deviceId`` to query.

    Returns:
        BaseResponse: ``data`` holds the backend's decoded JSON reply on
        success; ``code=400`` when ``deviceId`` is not a plain node name;
        ``code=500`` with the exception text when the call fails.
    """
    device_id = params.deviceId
    # The id is spliced into the SQL text, so only a single plain path node is
    # accepted. NOTE(review): this also rejects wildcards, dotted sub-paths
    # and back-quoted names; relax the pattern if callers rely on those.
    if not _DEVICE_NODE_RE.fullmatch(device_id):
        return BaseResponse(code=400, msg="invalid deviceId")

    try:
        sql = f"select last * from root.farm.{device_id}"
        # requests is blocking; run it in the threadpool so the event loop
        # keeps serving other requests while the backend answers.
        r = await run_in_threadpool(
            requests.post,
            baseHost + queryUri,
            headers=headers,
            json={"sql": sql},
        )
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Native (pass-through) query endpoint
@app.post("/api/xumu/rest/v2/query")
async def rest_query(request: Request):
    """Forward the request's JSON body unchanged to the backend query URI.

    Args:
        request: Incoming request; its body must be valid JSON.

    Returns:
        BaseResponse: ``data`` holds the backend's decoded JSON reply, or
        ``code=500`` with the exception text when the body cannot be parsed
        or the backend call fails (same envelope as the other endpoints).
    """
    try:
        # NOTE(review): the caller-supplied body (including any SQL it
        # carries) is forwarded as-is; access to this route should be
        # restricted accordingly.
        body = await request.json()
        # requests is blocking; keep it off the event loop.
        r = await run_in_threadpool(
            requests.post,
            baseHost + queryUri,
            headers=headers,
            json=body,
        )
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Characters that could terminate or escape a quoted SQL string literal.
_UNSAFE_LITERAL_CHARS = frozenset("'\"\\")


def _is_safe_literal(value: str) -> bool:
    """Return True when *value* contains no quote or backslash characters."""
    return _UNSAFE_LITERAL_CHARS.isdisjoint(value)


# Device query endpoint
@app.post("/api/xumu/device/query")
async def get_device(device: Device):
    """Look up rows in ``root.device`` matching the given device fields.

    Only fields that carry a value are added as filters: a non-empty
    ``iccid``, a non-empty ``deviceId`` and a ``type`` that is not ``None``.

    Args:
        device: Filter values.

    Returns:
        BaseResponse: ``data`` holds the backend's decoded JSON reply on
        success; ``code=400`` when ``iccid`` or ``deviceId`` contains a quote
        or backslash; ``code=500`` with the exception text when the call
        fails.
    """
    # Both strings end up inside single-quoted SQL literals; refuse anything
    # that could close or escape the literal instead of guessing the
    # backend's escaping rules.
    if not (_is_safe_literal(device.iccid) and _is_safe_literal(device.deviceId)):
        return BaseResponse(code=400, msg="invalid iccid or deviceId")

    try:
        conditions = ["time>=0"]
        # Add the iccid filter only when a value was supplied.
        if device.iccid:
            conditions.append(f"iccid = '{device.iccid}'")
        # Add the deviceId filter only when a value was supplied.
        if device.deviceId:
            conditions.append(f"deviceId = '{device.deviceId}'")
        # type is validated as an integer by the model, so it is safe inline.
        if device.type is not None:
            conditions.append(f"type = {device.type}")
        sql = "SELECT * FROM root.device where " + " and ".join(conditions)

        # requests is blocking; keep it off the event loop.
        r = await run_in_threadpool(
            requests.post,
            baseHost + queryUri,
            headers=headers,
            json={"sql": sql},
        )
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Device registration endpoint
@app.post("/api/xumu/device/register")
async def register(request: Request):
    """Register a device and create its storage schema.

    The request body is JSON with a ``payload`` key whose value is itself a
    JSON string. That inner document must contain ``l`` (expected length of
    the payload string), ``d`` (device id), ``cid`` (iccid) and ``t`` (type).

    Steps: verify the payload length, make sure the device is not registered
    yet, insert its row into ``root.device``, then run every statement
    returned by ``get_sql(deviceId, type)``.

    Args:
        request: Incoming request carrying the registration envelope.

    Returns:
        BaseResponse: ``data`` is the list of backend replies (insert first,
        then one per schema statement). ``code=301`` on a length mismatch,
        ``code=302`` when the device already exists, the lookup's own error
        response when the existence check fails, and ``code=500`` with the
        exception text for any other failure.
    """
    try:
        envelope = json.loads((await request.body()).decode("utf-8"))
        raw_payload = envelope["payload"]
        # NOTE(review): len() counts characters, not bytes; confirm the
        # sender computes "l" the same way for non-ASCII payloads.
        receive_len = len(raw_payload)
        payload = json.loads(raw_payload)
        if receive_len != payload["l"]:
            return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")

        device_id = payload["d"]
        iccid = payload["cid"]
        device_type = payload["t"]

        # Has this device already been created?
        lookup = await get_device(Device(iccid=iccid, deviceId=device_id, type=device_type))
        if lookup.code != 200:
            # The existence check itself failed; do not register blindly.
            return lookup
        if len(lookup.data["values"]) != 0:
            return BaseResponse(code=302, msg="该设备已经注册过了")

        # Create the device record.
        insert_body = {
            "devices": ["root.device"],
            "timestamps": [int(time.time() * 1000)],
            "measurements_list": [["iccid", "deviceId", "type"]],
            "data_types_list": [["TEXT", "TEXT", "INT32"]],
            "values_list": [[iccid, device_id, device_type]],
            "is_aligned": False,
        }
        results = []
        # requests is blocking; keep every backend call off the event loop.
        r = await run_in_threadpool(
            requests.post,
            baseHost + insertUri,
            headers=headers,
            json=insert_body,
        )
        results.append(r.json())

        # Create the corresponding database and fields.
        for sql in get_sql(device_id, device_type):
            r = await run_in_threadpool(
                requests.post,
                baseHost + nonQueryUri,
                headers=headers,
                json={"sql": sql},
            )
            results.append(r.json())
        return BaseResponse(data=results)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Data upload endpoint
@app.post("/api/xumu/data/collect")
async def process_data(request: Request):
    """Validate an uploaded measurement payload and insert it into the backend.

    The request body is JSON with a ``payload`` key whose value is itself a
    JSON string. That inner document must contain ``l`` (expected length of
    the payload string), ``m`` (measurement names) and ``v`` (values).

    Args:
        request: Incoming request carrying the measurement envelope.

    Returns:
        BaseResponse: ``data`` holds the backend's decoded JSON reply on
        success; ``code=301`` on a length mismatch; ``code=500`` with the
        exception text for any other failure.
    """
    try:
        envelope = json.loads((await request.body()).decode("utf-8"))
        raw_payload = envelope["payload"]
        # NOTE(review): len() counts characters, not bytes; confirm the
        # sender computes "l" the same way for non-ASCII payloads.
        receive_len = len(raw_payload)
        payload = json.loads(raw_payload)
        if receive_len != payload["l"]:
            print("校验错误,收到的消息长度:", receive_len)
            return BaseResponse(code=301, msg="data valid error")

        measurements = payload["m"]
        values = payload["v"]
        # The first value names the node under root.farm to write to; the
        # last value is the key used to pick the column types from dataTypes.
        node = values[0]
        type_key = values[-1]

        insert_body = {
            "devices": ["root.farm." + node],
            "timestamps": [int(time.time() * 1000)],
            "measurements_list": [measurements],
            "data_types_list": [dataTypes[type_key]],
            "values_list": [values],
            "is_aligned": False,
        }
        # requests is blocking; run it in the threadpool so the event loop
        # keeps accepting uploads while the backend answers.
        r = await run_in_threadpool(
            requests.post,
            baseHost + insertUri,
            headers=headers,
            json=insert_body,
        )
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
|
2024-01-24 15:32:58 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8002.
    uvicorn.run(app, host="0.0.0.0", port=8002)
|