xumu_iotdb/xumu.py

282 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import json
from typing import Any
import requests
import uvicorn
from fastapi import FastAPI, Request
from pydantic import BaseModel
from config import *
# fastapi
app = FastAPI()
# 统一返回数据
class BaseResponse(BaseModel):
code: int = 200
msg: str = "success"
data: Any = None
# 监控视频接口
@app.get("/api/xumu/video")
async def data_query(username):
return BaseResponse(data=get_video_url(username))
@app.get("/api/xumu/device/online")
async def device_online_query(iccid, deviceId):
try:
sql = "select * from root.farm.clientId where time >= 0"
# 检查iccid是否有值如果有添加到SQL语句中
if iccid:
sql += f" and iccid = '{iccid}'"
# 检查deviceId是否有值如果有添加到SQL语句中
if deviceId:
sql += f" and deviceId = '{deviceId}'"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 数据查询接口
@app.get("/api/xumu/data/query")
async def data_query(deviceId):
try:
if deviceId is None or deviceId == "" or len(deviceId) != 4:
return BaseResponse(code=500, msg="参数错误")
sql = f"select last * from root.farm.{deviceId}"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 原生查询接口
@app.post("/api/xumu/rest/v2/query")
async def rest_query(request: Request):
data = await request.json()
r = requests.post(baseHost + queryUri, headers=headers, json=data)
return BaseResponse(data=r.json())
# RFID查询接口
@app.get("/api/xumu/rfid/query")
async def rfid_query(rfid):
try:
sql = f"select deviceId from root.rfid where rfid='{rfid}'"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
if r.status_code != 200:
return BaseResponse(data=r.json(), code=404, msg="404")
r = r.json()
values = r["values"]
if len(values) == 0:
return BaseResponse(msg="No Such RFID", code=500)
deviceId = values[0][0]
return await data_query(deviceId)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
async def has_rfid(rfid):
try:
sql = f"select deviceId from root.rfid where rfid='{rfid}'"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
if r.status_code != 200:
return BaseResponse(data=r.json(), code=404, msg="404")
r = r.json()
values = r["values"]
if len(values) == 0:
return BaseResponse(msg="No Such RFID", code=500)
return BaseResponse(data=r)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 设备查询接口
@app.get("/api/xumu/device/query")
async def get_device(iccid, deviceId):
try:
sql = "SELECT * FROM root.device where time>=0"
# 检查iccid是否有值如果有添加到SQL语句中
if iccid:
sql += f" and iccid = '{iccid}'"
# 检查deviceId是否有值如果有添加到SQL语句中
if deviceId:
sql += f" and deviceId = '{deviceId}'"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 设备注册接口
@app.post("/api/xumu/device/register")
async def register(request: Request):
try:
data = await request.body()
data = data.decode("utf-8")
data = json.loads(data)
clientid = data["clientid"]
payload = data["payload"]
receive_len = len(payload)
payload = json.loads(payload)
send_len = payload["l"]
if receive_len != send_len:
print(receive_len)
return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")
deviceId = payload["d"]
iccid = payload["cid"]
type = payload["t"]
# 该设备是否已经创建
a_device = await get_device(iccid, deviceId)
if a_device.code == 200:
v = a_device.data["values"]
if len(v) != 0:
return BaseResponse(code=302, msg="该设备已经注册过了")
# 创建该设备
send_json = {
"devices": ["root.device"],
"timestamps": [int(time.time() * 1000)],
"measurements_list": [["iccid", "deviceId", "type"]],
"data_types_list": [["TEXT", "TEXT", "INT32"]],
"values_list": [[iccid, deviceId, type]],
"is_aligned": False
}
# TODO 测试
# send_json = {
# "sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})"
# }
res = []
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
res.append(r.json())
# 插入设备状态表
send_json = {
"sql": f"insert into root.farm.clientId(iccid, clientId, is_online, deviceId) values('{iccid}','{clientid}', False, '{deviceId}')"
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
res.append(r.json())
# 创建相应的数据库和字段
sql_list = get_sql(deviceId, type)
for sql in sql_list:
send_json = {
"sql": sql
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
res.append(r.json())
return BaseResponse(data=res)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
async def message_publish(data):
try:
payload = data["payload"].strip()
receive_len = len(payload)
payload = json.loads(payload)
send_len = payload["l"]
if receive_len != send_len:
print("校验错误,收到的消息长度:", receive_len)
return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}")
m = payload["m"]
v = payload["v"]
t = payload["t"]
deviceId = v[0]
m.append("t")
v.append(t)
res = []
# 判断t是否为00则代表是RFID设备需要做一个映射
if t == 0:
rfid = v[1]
r = await has_rfid(rfid)
if r.code != 200:
sql_list = rfid_deviceId(rfid, deviceId)
sql = sql_list[0]
send_json = {
"sql": sql
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
if r.status_code == 200:
r = r.json()
if r["code"] != 200:
return BaseResponse(code=500, msg="RFID创建映射失败", data=r)
res.append(r)
send_json = {
"devices": ["root.farm." + deviceId],
"timestamps": [int(time.time() * 1000)],
"measurements_list": [m],
"data_types_list": [dataTypes[t]],
"values_list": [v],
"is_aligned": False
}
r = requests.post(baseHost + insertUri, headers=headers, json=send_json)
r = r.json()
res.append(r)
return BaseResponse(data=res)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 改变设备状态接口
async def client_change_status(clientid, status):
print(clientid)
try:
sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'"
r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query})
if r.status_code == 200:
r = r.json()
values = r["timestamps"]
if len(values) > 0:
timestamp = values[0]
sql_update_list = get_client_change_status_sql(timestamp, status)
sql_update = sql_update_list[0]
r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update})
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
@app.post("/api/xumu/data/collect")
async def process_data(request: Request):
try:
data = await request.body()
data = data.decode("utf-8")
data = json.loads(data)
event = data["event"]
clientid = data["clientid"]
print(clientid)
if not clientid.startswith("farm") or not clientid.startswith("test"):
return BaseResponse(code=500, msg="不符合畜牧clientId")
# 连接成功处理
if event == "client.connected":
return await client_change_status(clientid, True)
# 断开连接处理
elif event == "client.disconnected":
return await client_change_status(clientid, False)
# 消息体处理
elif event == "message.publish":
return await message_publish(data)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8002)