xumu_iotdb/xumu.py

285 lines
9.6 KiB
Python
Raw Normal View History

2024-01-26 17:22:01 +08:00
import re
2024-01-24 15:32:58 +08:00
import time
import json
from typing import Any
import requests
import uvicorn
from fastapi import FastAPI, Request
from pydantic import BaseModel
from config import *
# FastAPI application object; the route decorators below register handlers on it.
app = FastAPI()
# Uniform response envelope used by the handlers in this file.
class BaseResponse(BaseModel):
    # Status code carried in the response body; 200 unless a handler overrides it.
    code: int = 200
    # Human-readable status text.
    msg: str = "success"
    # Payload of the reply; None when there is nothing to return.
    data: Any = None
2024-01-26 16:54:19 +08:00
# Surveillance video endpoint
# NOTE(review): a later handler in this file is also named `data_query`. The
# module-level name therefore ends up bound to that later function; this one
# is only reachable through the route registered by the decorator below.
@app.get("/api/xumu/video")
async def data_query(username):
    """Return the video URL for ``username`` wrapped in a BaseResponse.

    The lookup itself is delegated to ``get_video_url``, which is not defined
    in this file (presumably supplied by the ``config`` star import).
    """
    video_url = get_video_url(username)
    return BaseResponse(data=video_url)
2024-01-26 16:54:19 +08:00
@app.get("/api/xumu/device/online")
async def device_online_query(iccid=None, deviceId=None):
    """Query ``root.farm.clientId``, optionally filtered by ICCID and device id.

    Both filters are optional: a filter is only added to the WHERE clause when
    its value is truthy.

    Args:
        iccid: value compared against the ``iccid`` column.
        deviceId: value compared against the ``deviceId`` column.

    Returns:
        BaseResponse whose ``data`` is the backend's JSON reply. ``code=500``
        is returned with "参数错误" when a filter contains a quote or backslash,
        or with the exception text when anything else fails.
    """
    try:
        sql = "select * from root.farm.clientId where time >= 0"
        for column, value in (("iccid", iccid), ("deviceId", deviceId)):
            if not value:
                continue
            # The value is spliced into a quoted SQL literal and the REST API
            # takes plain SQL text, so refuse anything that could close the
            # literal instead of trying to escape it.
            if any(ch in str(value) for ch in "'\"\\"):
                return BaseResponse(code=500, msg="参数错误")
            sql += f" and {column} = '{value}'"
        # NOTE(review): requests.post is a blocking call without a timeout
        # inside an async handler; consider adding a timeout.
        r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql})
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
2024-01-24 15:32:58 +08:00
# Data query endpoint
@app.get("/api/xumu/data/query")
async def data_query(deviceId):
    """Return the result of ``select last * from root.farm.<deviceId>``.

    Args:
        deviceId: four-character device id; it becomes the last node of the
            queried path.

    Returns:
        BaseResponse whose ``data`` is the backend's JSON reply, or
        ``code=500`` with "参数错误" for an invalid id / the exception text on
        any other failure.
    """
    try:
        # deviceId is spliced into the path unquoted, so besides the length
        # check only letters, digits and underscores are accepted; this keeps
        # wildcards, dots, quotes and whitespace out of the statement.
        is_valid = (
            isinstance(deviceId, str)
            and len(deviceId) == 4
            and all(ch.isalnum() or ch == "_" for ch in deviceId)
        )
        if not is_valid:
            return BaseResponse(code=500, msg="参数错误")
        sql = f"select last * from root.farm.{deviceId}"
        r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql})
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Native (pass-through) query endpoint
@app.post("/api/xumu/rest/v2/query")
async def rest_query(request: Request):
    """Forward the request's JSON body unchanged to ``baseHost + queryUri``.

    Returns:
        BaseResponse whose ``data`` is the backend's JSON reply, or
        ``code=500`` with the exception text when the body is not valid JSON
        or the backend call fails (same convention as the other handlers).
    """
    # NOTE(review): this lets any caller run arbitrary statements against the
    # backend; confirm the endpoint is only reachable by trusted clients.
    try:
        body = await request.json()
        r = requests.post(baseHost + queryUri, headers=headers, json=body)
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
2024-01-25 11:59:21 +08:00
# RFID query endpoint
@app.get("/api/xumu/rfid/query")
async def rfid_query(rfid):
    """Resolve ``rfid`` to a device id and return that device's latest data.

    The device id is read from ``root.rfid`` and then handed to
    ``data_query``.

    Returns:
        The BaseResponse produced by ``data_query`` on success; otherwise
        ``code=404`` when the backend answers with a non-200 status, or
        ``code=500`` for an unknown RFID, an unsafe ``rfid`` value or any
        exception.
    """
    try:
        # rfid ends up inside a quoted SQL literal; refuse characters that
        # could terminate the literal.
        if any(ch in str(rfid) for ch in "'\"\\"):
            return BaseResponse(code=500, msg="参数错误")
        sql = f"select deviceId from root.rfid where rfid='{rfid}'"
        r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql})
        if r.status_code != 200:
            return BaseResponse(data=r.json(), code=404, msg="404")
        values = r.json()["values"]
        # Treat both "no columns" ([]) and "column present but no rows" ([[]])
        # as a miss, so an empty result never reaches values[0][0].
        if not values or not values[0]:
            return BaseResponse(msg="No Such RFID", code=500)
        deviceId = values[0][0]
        return await data_query(deviceId)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
2024-01-25 11:59:21 +08:00
2024-01-25 14:54:28 +08:00
async def has_rfid(rfid):
    """Check whether ``rfid`` already has a device mapping in ``root.rfid``.

    Returns:
        BaseResponse with ``code=200`` and the backend's JSON reply as
        ``data`` when at least one row matches; ``code=404`` when the backend
        answers with a non-200 status; ``code=500`` when nothing matches, the
        value is unsafe to embed in SQL, or an exception occurs.
    """
    try:
        # rfid ends up inside a quoted SQL literal; refuse characters that
        # could terminate the literal.
        if any(ch in str(rfid) for ch in "'\"\\"):
            return BaseResponse(code=500, msg="参数错误")
        sql = f"select deviceId from root.rfid where rfid='{rfid}'"
        r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql})
        if r.status_code != 200:
            return BaseResponse(data=r.json(), code=404, msg="404")
        result = r.json()
        values = result["values"]
        # Treat both "no columns" ([]) and "column present but no rows" ([[]])
        # as "not found".
        if not values or not values[0]:
            return BaseResponse(msg="No Such RFID", code=500)
        return BaseResponse(data=result)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
2024-01-24 15:32:58 +08:00
# Device query endpoint
@app.get("/api/xumu/device/query")
async def get_device(iccid=None, deviceId=None):
    """Query ``root.device``, optionally filtered by ICCID and device id.

    Both filters are optional: a filter is only added to the WHERE clause when
    its value is truthy.

    Args:
        iccid: value compared against the ``iccid`` column.
        deviceId: value compared against the ``deviceId`` column.

    Returns:
        BaseResponse whose ``data`` is the backend's JSON reply. ``code=500``
        is returned with "参数错误" when a filter contains a quote or backslash,
        or with the exception text when anything else fails.
    """
    try:
        sql = "SELECT * FROM root.device where time>=0"
        for column, value in (("iccid", iccid), ("deviceId", deviceId)):
            if not value:
                continue
            # The value is spliced into a quoted SQL literal; refuse anything
            # that could close the literal.
            if any(ch in str(value) for ch in "'\"\\"):
                return BaseResponse(code=500, msg="参数错误")
            sql += f" and {column} = '{value}'"
        r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql})
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Device registration endpoint
@app.post("/api/xumu/device/register")
async def register(request: Request):
    """Register a device described by the posted event.

    The request body is JSON with a ``clientid`` and a ``payload`` string. The
    payload is itself JSON carrying ``l`` (declared payload length), ``d``
    (device id), ``cid`` (ICCID) and ``t`` (device type).

    Steps: verify the declared length, refuse devices that already have a row
    in ``root.device``, insert the device record, insert an offline row into
    ``root.farm.clientId``, then run the statements returned by ``get_sql``.

    Returns:
        BaseResponse whose ``data`` is the list of backend replies, in call
        order. ``code=301`` on a length mismatch, ``code=302`` when the device
        is already registered, ``code=500`` on unsafe field values or any
        exception.
    """
    try:
        raw_body = await request.body()
        data = json.loads(raw_body.decode("utf-8"))
        clientid = data["clientid"]
        payload_text = data["payload"]
        receive_len = len(payload_text)
        payload = json.loads(payload_text)
        send_len = payload["l"]
        if receive_len != send_len:
            return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")
        deviceId = payload["d"]
        iccid = payload["cid"]
        device_type = payload["t"]  # renamed from `type` to stop shadowing the builtin

        # These fields are embedded in quoted SQL literals further down (and in
        # the lookup done by get_device); refuse characters that could
        # terminate a literal.
        for text in (iccid, clientid, deviceId):
            if any(ch in str(text) for ch in "'\"\\"):
                return BaseResponse(code=500, msg="参数错误")

        # Has this device already been created?
        existing = await get_device(iccid, deviceId)
        if existing.code == 200:
            columns = existing.data["values"]
            # Only a non-empty column means a matching row exists; a reply that
            # lists columns without rows ([[], [], ...]) is not a registration.
            if any(columns):
                return BaseResponse(code=302, msg="该设备已经注册过了")

        res = []

        # Create the device record.
        device_record = {
            "devices": ["root.device"],
            "timestamps": [int(time.time() * 1000)],
            "measurements_list": [["iccid", "deviceId", "type"]],
            "data_types_list": [["TEXT", "TEXT", "INT32"]],
            "values_list": [[iccid, deviceId, device_type]],
            "is_aligned": False
        }
        r = requests.post(baseHost + insertUri, headers=headers, json=device_record)
        res.append(r.json())

        # Insert the row in the device status table (initially offline).
        status_insert = {
            "sql": f"insert into root.farm.clientId(iccid, clientId, is_online, deviceId) values('{iccid}','{clientid}', False, '{deviceId}')"
        }
        r = requests.post(baseHost + nonQueryUri, headers=headers, json=status_insert)
        res.append(r.json())

        # Create the database and fields for this device; the statements come
        # from get_sql, which is defined outside this file.
        for sql in get_sql(deviceId, device_type):
            r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql})
            res.append(r.json())
        return BaseResponse(data=res)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
2024-01-26 16:30:33 +08:00
async def message_publish(data):
    """Validate a published message and store its values under ``root.farm.<deviceId>``.

    ``data["payload"]`` is a JSON string carrying ``l`` (declared length of
    the stripped payload), ``m`` (measurement names), ``v`` (values, with the
    device id first) and ``t`` (type code, also used as the key into
    ``dataTypes``). When ``t`` is 0 the second value is treated as an RFID and
    a mapping statement from ``rfid_deviceId`` is executed if ``has_rfid``
    does not report success.

    Returns:
        BaseResponse whose ``data`` is the list of backend replies;
        ``code=301`` on a length mismatch; ``code=500`` when creating the RFID
        mapping fails or any exception occurs.
    """
    # NOTE(review): the source lost its indentation; the nesting below is the
    # most natural reading of the statement order — confirm against the repo.
    try:
        raw_payload = data["payload"].strip()
        receive_len = len(raw_payload)
        payload = json.loads(raw_payload)
        send_len = payload["l"]
        if receive_len != send_len:
            return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}")

        measurements = payload["m"]
        values = payload["v"]
        type_code = payload["t"]
        deviceId = values[0]
        # The type code is stored as an extra measurement named "t".
        measurements.append("t")
        values.append(type_code)

        res = []
        # t == 0 marks an RFID device, which needs an RFID -> deviceId mapping.
        if type_code == 0:
            rfid = values[1]
            lookup = await has_rfid(rfid)
            if lookup.code != 200:
                # NOTE(review): rfid/deviceId come from the message payload and
                # are passed to rfid_deviceId, which presumably builds SQL text
                # from them — verify that helper guards against injection.
                mapping_sql = rfid_deviceId(rfid, deviceId)[0]
                reply = requests.post(
                    baseHost + nonQueryUri, headers=headers, json={"sql": mapping_sql}
                )
                if reply.status_code == 200:
                    mapping_result = reply.json()
                    if mapping_result["code"] != 200:
                        return BaseResponse(code=500, msg="RFID创建映射失败", data=mapping_result)
                    res.append(mapping_result)

        record = {
            "devices": ["root.farm." + deviceId],
            "timestamps": [int(time.time() * 1000)],
            "measurements_list": [measurements],
            "data_types_list": [dataTypes[type_code]],
            "values_list": [values],
            "is_aligned": False
        }
        insert_reply = requests.post(baseHost + insertUri, headers=headers, json=record)
        res.append(insert_reply.json())
        return BaseResponse(data=res)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
2024-01-24 15:32:58 +08:00
2024-01-26 16:30:33 +08:00
# Device status change handler
async def client_change_status(clientid, status):
    """Update the stored online status of the client identified by ``clientid``.

    The row's timestamp is looked up in ``root.farm.clientId``; the first
    timestamp returned and ``status`` are handed to
    ``get_client_change_status_sql`` (defined outside this file), and the
    first statement it returns is executed.

    Returns:
        BaseResponse whose ``data`` is the backend's reply to the update.
        ``code=404`` when the lookup answers with a non-200 status,
        ``code=500`` when no row exists for the client, the id is unsafe to
        embed in SQL, or an exception occurs. Every path returns a
        BaseResponse rather than falling through to ``None``.
    """
    try:
        # clientid ends up inside a quoted SQL literal; refuse characters that
        # could terminate the literal.
        if any(ch in str(clientid) for ch in "'\"\\"):
            return BaseResponse(code=500, msg="参数错误")
        sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'"
        r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query})
        if r.status_code != 200:
            return BaseResponse(data=r.json(), code=404, msg="404")
        timestamps = r.json()["timestamps"]
        if not timestamps:
            return BaseResponse(code=500, msg="No Such Client")
        sql_update = get_client_change_status_sql(timestamps[0], status)[0]
        r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update})
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
@app.post("/api/xumu/data/collect")
async def process_data(request: Request):
    """Dispatch a posted event to the matching handler.

    The JSON body must contain ``event`` and ``clientid``. Client ids are
    accepted when they match ``farm_`` followed by 12 alphanumerics, or when
    they start with ``test``.

    Returns:
        The BaseResponse of the selected handler (``client.connected`` /
        ``client.disconnected`` -> ``client_change_status``,
        ``message.publish`` -> ``message_publish``). ``code=500`` for a client
        id that matches neither pattern, an unhandled event, or any exception.
    """
    try:
        raw_body = await request.body()
        data = json.loads(raw_body.decode("utf-8"))
        event = data["event"]
        clientid = data["clientid"]
        print(clientid)

        is_farm_client = re.match(r'^farm_[a-zA-Z0-9]{12}$', clientid)
        is_test_client = re.match(r'^test', clientid)
        # Reject the event when neither naming scheme matches.
        if not is_farm_client and not is_test_client:
            return BaseResponse(code=500, msg="不符合畜牧clientId")

        # Client connected
        if event == "client.connected":
            return await client_change_status(clientid, True)
        # Client disconnected
        if event == "client.disconnected":
            return await client_change_status(clientid, False)
        # Published message body
        if event == "message.publish":
            return await message_publish(data)
        # Any other event: answer explicitly instead of returning None.
        return BaseResponse(code=500, msg=f"unsupported event: {event}")
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
2024-01-24 15:32:58 +08:00
# When run as a script, serve the app with uvicorn on all interfaces, port 8002.
if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8002)