rfid_last_data_query
This commit is contained in:
parent
cb72da7524
commit
716ce6791a
9
xumu.py
9
xumu.py
|
@ -166,15 +166,6 @@ async def register(request: Request):
|
||||||
if len(v) != 0:
|
if len(v) != 0:
|
||||||
return BaseResponse(code=302, msg="该设备已经注册过了")
|
return BaseResponse(code=302, msg="该设备已经注册过了")
|
||||||
# 创建该设备
|
# 创建该设备
|
||||||
# send_json = {
|
|
||||||
# "devices": ["root.device"],
|
|
||||||
# "timestamps": [int(time.time() * 1000)],
|
|
||||||
# "measurements_list": [["iccid", "deviceId", "type"]],
|
|
||||||
# "data_types_list": [["TEXT", "TEXT", "INT32"]],
|
|
||||||
# "values_list": [[iccid, deviceId, type]],
|
|
||||||
# "is_aligned": False
|
|
||||||
# }
|
|
||||||
# 测试
|
|
||||||
send_json = {
|
send_json = {
|
||||||
"sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})"
|
"sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})"
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,286 @@
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import uvicorn
|
||||||
|
from fastapi import FastAPI, Request
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from config import *
|
||||||
|
|
||||||
|
# fastapi
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
|
||||||
|
# 统一返回数据
|
||||||
|
class BaseResponse(BaseModel):
|
||||||
|
code: int = 200
|
||||||
|
msg: str = "success"
|
||||||
|
data: Any = None
|
||||||
|
|
||||||
|
|
||||||
|
# 监控视频接口
|
||||||
|
@app.get("/api/xumu/video")
|
||||||
|
async def video_query(username):
|
||||||
|
return BaseResponse(data=get_video_url(username))
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/xumu/device/online")
|
||||||
|
async def device_online_query(iccid, deviceId):
|
||||||
|
try:
|
||||||
|
sql = "select * from root.farm.clientId where time >= 0"
|
||||||
|
# 检查iccid是否有值,如果有,添加到SQL语句中
|
||||||
|
if iccid:
|
||||||
|
sql += f" and iccid = '{iccid}'"
|
||||||
|
# 检查deviceId是否有值,如果有,添加到SQL语句中
|
||||||
|
if deviceId:
|
||||||
|
sql += f" and deviceId = '{deviceId}'"
|
||||||
|
send_json = {
|
||||||
|
"sql": sql
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
|
||||||
|
return BaseResponse(data=r.json())
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
# 数据查询接口
|
||||||
|
@app.get("/api/xumu/data/query")
|
||||||
|
async def data_query(deviceId):
|
||||||
|
try:
|
||||||
|
if deviceId is None or deviceId == "" or len(deviceId) != 4:
|
||||||
|
return BaseResponse(code=500, msg="参数错误")
|
||||||
|
sql = f"select last * from root.farm.{deviceId}"
|
||||||
|
send_json = {
|
||||||
|
"sql": sql
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
|
||||||
|
return BaseResponse(data=r.json())
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
async def rfid_last_query(rfid, deviceId):
|
||||||
|
try:
|
||||||
|
if deviceId is None or deviceId == "" or len(deviceId) != 4:
|
||||||
|
return BaseResponse(code=500, msg="参数错误")
|
||||||
|
sql = f"select * from root.farm.{deviceId} where RFID='{rfid}' order by time desc limit 1"
|
||||||
|
send_json = {
|
||||||
|
"sql": sql
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
|
||||||
|
return BaseResponse(data=r.json())
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
# 原生查询接口
|
||||||
|
@app.post("/api/xumu/rest/v2/query")
|
||||||
|
async def rest_query(request: Request):
|
||||||
|
data = await request.json()
|
||||||
|
r = requests.post(baseHost + queryUri, headers=headers, json=data)
|
||||||
|
return BaseResponse(data=r.json())
|
||||||
|
|
||||||
|
|
||||||
|
# RFID查询接口
|
||||||
|
@app.get("/api/xumu/rfid/query")
|
||||||
|
async def rfid_query(rfid):
|
||||||
|
try:
|
||||||
|
sql = f"select deviceId from root.rfid where rfid='{rfid}'"
|
||||||
|
send_json = {
|
||||||
|
"sql": sql
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
|
||||||
|
if r.status_code != 200:
|
||||||
|
return BaseResponse(data=r.json(), code=404, msg="404")
|
||||||
|
r = r.json()
|
||||||
|
values = r["values"]
|
||||||
|
if len(values) == 0:
|
||||||
|
return BaseResponse(msg="No Such RFID", code=500)
|
||||||
|
deviceId = values[0][0]
|
||||||
|
return await rfid_last_query(rfid, deviceId)
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
async def has_rfid(rfid):
|
||||||
|
try:
|
||||||
|
sql = f"select deviceId from root.rfid where rfid='{rfid}'"
|
||||||
|
send_json = {
|
||||||
|
"sql": sql
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
|
||||||
|
if r.status_code != 200:
|
||||||
|
return BaseResponse(data=r.json(), code=404, msg="404")
|
||||||
|
r = r.json()
|
||||||
|
values = r["values"]
|
||||||
|
if len(values) == 0:
|
||||||
|
return BaseResponse(msg="No Such RFID", code=500)
|
||||||
|
return BaseResponse(data=r)
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
# 设备查询接口
|
||||||
|
@app.get("/api/xumu/device/query")
|
||||||
|
async def get_device(iccid, deviceId):
|
||||||
|
try:
|
||||||
|
sql = "SELECT * FROM root.device where time>=0"
|
||||||
|
# 检查iccid是否有值,如果有,添加到SQL语句中
|
||||||
|
if iccid:
|
||||||
|
sql += f" and iccid = '{iccid}'"
|
||||||
|
# 检查deviceId是否有值,如果有,添加到SQL语句中
|
||||||
|
if deviceId:
|
||||||
|
sql += f" and deviceId = '{deviceId}'"
|
||||||
|
send_json = {
|
||||||
|
"sql": sql
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
|
||||||
|
return BaseResponse(data=r.json())
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
# 设备注册接口
|
||||||
|
@app.post("/api/xumu/device/register")
|
||||||
|
async def register(request: Request):
|
||||||
|
try:
|
||||||
|
data = await request.body()
|
||||||
|
data = data.decode("utf-8")
|
||||||
|
data = json.loads(data)
|
||||||
|
clientid = data["clientid"]
|
||||||
|
payload = data["payload"]
|
||||||
|
receive_len = len(payload)
|
||||||
|
payload = json.loads(payload)
|
||||||
|
send_len = payload["l"]
|
||||||
|
if receive_len != send_len:
|
||||||
|
return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")
|
||||||
|
deviceId = payload["d"]
|
||||||
|
iccid = payload["cid"]
|
||||||
|
type = payload["t"]
|
||||||
|
# 该设备是否已经创建
|
||||||
|
a_device = await get_device(iccid, deviceId)
|
||||||
|
if a_device.code == 200:
|
||||||
|
v = a_device.data["values"]
|
||||||
|
if len(v) != 0:
|
||||||
|
return BaseResponse(code=302, msg="该设备已经注册过了")
|
||||||
|
# 注册
|
||||||
|
send_json = {
|
||||||
|
"devices": ["root.device", "root.farm.clientId"],
|
||||||
|
"timestamps": [int(time.time() * 1000), int(time.time() * 1000)],
|
||||||
|
"measurements_list": [["iccid", "deviceId", "type"], ["iccid", "clientId", "is_online", "deviceId"]],
|
||||||
|
"data_types_list": [["TEXT", "TEXT", "INT32"], ["TEXT", "TEXT", "BOOLEAN", "TEXT"]],
|
||||||
|
"values_list": [[iccid, deviceId, type], [iccid, clientid, True, deviceId]],
|
||||||
|
"is_aligned": False
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + insertUri, headers=headers, json=send_json)
|
||||||
|
res = [r.json()]
|
||||||
|
# 创建相应的数据库和字段
|
||||||
|
sql_list = get_sql(deviceId, type)
|
||||||
|
for sql in sql_list:
|
||||||
|
send_json = {
|
||||||
|
"sql": sql
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
|
||||||
|
res.append(r.json())
|
||||||
|
return BaseResponse(data=res)
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
async def message_publish(data):
|
||||||
|
try:
|
||||||
|
payload = data["payload"].strip()
|
||||||
|
receive_len = len(payload)
|
||||||
|
payload = json.loads(payload)
|
||||||
|
send_len = payload["l"]
|
||||||
|
if receive_len != send_len:
|
||||||
|
return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}")
|
||||||
|
m = payload["m"]
|
||||||
|
v = payload["v"]
|
||||||
|
t = payload["t"]
|
||||||
|
deviceId = v[0]
|
||||||
|
m.append("t")
|
||||||
|
v.append(t)
|
||||||
|
res = []
|
||||||
|
# 判断t是否为0,0则代表是RFID设备,需要做一个映射
|
||||||
|
if t == 0:
|
||||||
|
rfid = v[1]
|
||||||
|
r = await has_rfid(rfid)
|
||||||
|
if r.code != 200:
|
||||||
|
sql_list = rfid_deviceId(rfid, deviceId)
|
||||||
|
sql = sql_list[0]
|
||||||
|
send_json = {
|
||||||
|
"sql": sql
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
|
||||||
|
if r.status_code == 200:
|
||||||
|
r = r.json()
|
||||||
|
if r["code"] != 200:
|
||||||
|
return BaseResponse(code=500, msg="RFID创建映射失败", data=r)
|
||||||
|
res.append(r)
|
||||||
|
send_json = {
|
||||||
|
"devices": ["root.farm." + deviceId],
|
||||||
|
"timestamps": [int(time.time() * 1000)],
|
||||||
|
"measurements_list": [m],
|
||||||
|
"data_types_list": [dataTypes[t]],
|
||||||
|
"values_list": [v],
|
||||||
|
"is_aligned": False
|
||||||
|
}
|
||||||
|
r = requests.post(baseHost + insertUri, headers=headers, json=send_json)
|
||||||
|
r = r.json()
|
||||||
|
res.append(r)
|
||||||
|
return BaseResponse(data=res)
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
# 改变设备状态接口
|
||||||
|
async def client_change_status(clientid, status):
|
||||||
|
try:
|
||||||
|
sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'"
|
||||||
|
r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query})
|
||||||
|
if r.status_code == 200:
|
||||||
|
r = r.json()
|
||||||
|
values = r["timestamps"]
|
||||||
|
if len(values) > 0:
|
||||||
|
timestamp = values[0]
|
||||||
|
sql_update_list = get_client_change_status_sql(timestamp, status)
|
||||||
|
sql_update = sql_update_list[0]
|
||||||
|
r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update})
|
||||||
|
return BaseResponse(data=r.json())
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/api/xumu/data/collect")
|
||||||
|
async def process_data(request: Request):
|
||||||
|
try:
|
||||||
|
data = await request.body()
|
||||||
|
data = data.decode("utf-8")
|
||||||
|
data = json.loads(data)
|
||||||
|
event = data["event"]
|
||||||
|
clientid = data["clientid"]
|
||||||
|
# print(clientid + "======" + event)
|
||||||
|
pattern = r'^f\d{1,10}$'
|
||||||
|
amatch1 = re.match(pattern, clientid)
|
||||||
|
# 如果两个都不满足,那就错误
|
||||||
|
if not amatch1:
|
||||||
|
return BaseResponse(code=200, msg="error")
|
||||||
|
# 连接成功处理
|
||||||
|
if event == "client.connected":
|
||||||
|
return await client_change_status(clientid, True)
|
||||||
|
# 断开连接处理
|
||||||
|
elif event == "client.disconnected":
|
||||||
|
return await client_change_status(clientid, False)
|
||||||
|
# 消息体处理
|
||||||
|
elif event == "message.publish":
|
||||||
|
return await message_publish(data)
|
||||||
|
return BaseResponse()
|
||||||
|
except Exception as e:
|
||||||
|
return BaseResponse(code=500, msg=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
uvicorn.run(app, host="0.0.0.0", port=8002)
|
Loading…
Reference in New Issue