xumu_iotdb/xumu.py

376 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import json
from typing import Any
from datetime import datetime, timedelta
import requests
import uvicorn
from fastapi import FastAPI, Request
from pydantic import BaseModel
from config import *
# fastapi
app = FastAPI()
# 统一返回数据
class BaseResponse(BaseModel):
code: int = 200
msg: str = "success"
data: Any = None
# 监控视频接口
@app.get("/api/xumu/video")
async def video_query(username):
return BaseResponse(data=get_video_url(username))
# 报警数据接口
@app.get("/api/xumu/warning")
async def get_warning(deviceId):
try:
sql = f"select * from root.warning.{deviceId}"
# 检查iccid是否有值如果有添加到SQL语句中
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
data = r.json()
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
@app.get("/api/xumu/device/online")
async def device_online_query(iccid, deviceId):
try:
sql = "select * from root.farm.clientId where time >= 0"
# 检查iccid是否有值如果有添加到SQL语句中
if iccid:
sql += f" and iccid = '{iccid}'"
# 检查deviceId是否有值如果有添加到SQL语句中
if deviceId:
sql += f" and deviceId = '{deviceId}'"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 数据查询接口
@app.get("/api/xumu/data/query")
async def data_query(deviceId):
try:
if deviceId is None or deviceId == "" or len(deviceId) != 4:
return BaseResponse(code=500, msg="参数错误")
sql = f"select last * from root.farm.{deviceId}"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
async def rfid_last_query(rfid, deviceId):
try:
if deviceId is None or deviceId == "" or len(deviceId) != 4:
return BaseResponse(code=500, msg="参数错误")
sql = f"select * from root.farm.{deviceId} where RFID='{rfid}' order by time desc limit 1"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 原生查询接口
@app.post("/api/xumu/rest/v2/query")
async def rest_query(request: Request):
data = await request.json()
r = requests.post(baseHost + queryUri, headers=headers, json=data)
return BaseResponse(data=r.json())
# RFID查询接口
@app.get("/api/xumu/rfid/query")
async def rfid_query(rfid):
# TODO 可能一个rfid有多个iccid设备上传需要获取所有的设备数据然后返回现在这个功能只有一个设备 不一定是最新的
try:
sql = f"select deviceId from root.rfid where rfid='{rfid}'"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
if r.status_code != 200:
return BaseResponse(data=r.json(), code=404, msg="404")
r = r.json()
values = r["values"]
if len(values) == 0:
return BaseResponse(msg="No Such RFID", code=500)
deviceId = values[0][0]
return await rfid_last_query(rfid, deviceId)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
async def has_rfid(rfid):
try:
sql = f"select deviceId from root.rfid where rfid='{rfid}'"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
if r.status_code != 200:
return BaseResponse(data=r.json(), code=404, msg="404")
r = r.json()
values = r["values"]
if len(values) == 0:
return BaseResponse(msg="No Such RFID", code=500)
return BaseResponse(data=r)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 设备查询接口
@app.get("/api/xumu/device/query")
async def get_device(iccid, deviceId):
try:
sql = "SELECT * FROM root.device where time>=0"
# 检查iccid是否有值如果有添加到SQL语句中
if iccid:
sql += f" and iccid = '{iccid}'"
# 检查deviceId是否有值如果有添加到SQL语句中
if deviceId:
sql += f" and deviceId = '{deviceId}'"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 设备注册接口
@app.post("/api/xumu/device/register")
async def register(request: Request):
try:
data = await request.body()
data = data.decode("utf-8")
data = json.loads(data)
clientid = data["clientid"]
payload = data["payload"]
receive_len = len(payload)
payload = json.loads(payload)
send_len = payload["l"]
if receive_len != send_len:
return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")
deviceId = payload["d"]
iccid = payload["cid"]
type = payload["t"]
res = []
# 查看设备是否已经创建
a_device = await get_device(None, deviceId)
if a_device.code == 200:
v = a_device.data["values"]
t = a_device.data["timestamps"]
if len(v) != 0:
timestamp = t[0]
theIccid = v[0]
# 如果设备上传的注册iccid和数据库查询的iccid一致则说明已经注册过了
if theIccid == iccid:
return BaseResponse(code=302, msg="该设备已经注册过了")
# 如果不一致,则直接进行更新注册
send_json = {
"sql": f"insert into root.device(timestamp, iccid, deviceId, type) values({timestamp}, '{iccid}', '{deviceId}', {type})"
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
res.append(r.json())
else:
# 创建该设备
send_json = {
"sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})"
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
res.append(r.json())
else:
return BaseResponse(code=500, msg="500 Internal error")
# 插入设备状态表
send_json = {
"sql": f"insert into root.farm.clientId(iccid, clientId, is_online, deviceId) values('{iccid}','{clientid}', True, '{deviceId}')"
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
res.append(r.json())
# 创建相应的数据库和字段
sql_list = farm_sql(deviceId, type)
for sql in sql_list:
send_json = {
"sql": sql
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
res.append(r.json())
# 创建相应的数据库和字段
sql_list = warning_sql(deviceId, type)
for sql in sql_list:
send_json = {
"sql": sql
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
res.append(r.json())
return BaseResponse(data=res)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 数据发布接口
async def message_publish(data):
try:
payload = data["payload"].strip()
receive_len = len(payload)
payload = json.loads(payload)
send_len = payload["l"]
if receive_len != send_len:
return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}")
m = payload["m"]
v = payload["v"]
t = payload["t"]
deviceId = v[0]
m.append("t")
v.append(t)
res = []
# 判断t是否为00则代表是RFID设备需要做一个映射
if t == 0:
rfid = v[1]
r = await has_rfid(rfid)
if r.code != 200:
sql_list = rfid_deviceId(rfid, deviceId)
sql = sql_list[0]
send_json = {
"sql": sql
}
r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
if r.status_code == 200:
r = r.json()
if r["code"] != 200:
return BaseResponse(code=500, msg="RFID创建映射失败", data=r)
res.append(r)
send_json = {
"devices": ["root.farm." + deviceId],
"timestamps": [int(time.time() * 1000)],
"measurements_list": [m],
"data_types_list": [dataTypes[t]],
"values_list": [v],
"is_aligned": False
}
r = requests.post(baseHost + insertUri, headers=headers, json=send_json)
r = r.json()
res.append(r)
if is_warning(deviceId, v, t):
# 如果是报警数据,则发送到报警数据表
send_json = insert_to_warning_sql(deviceId, v, t)
r = requests.post(baseHost + insertUri, headers=headers, json=send_json)
r = r.json()
res.append(r)
return BaseResponse(data=res)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 改变设备状态接口
async def client_change_status(clientid, status):
try:
sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'"
r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query})
if r.status_code == 200:
r = r.json()
values = r["timestamps"]
if len(values) > 0:
timestamp = values[0]
sql_update_list = get_client_change_status_sql(timestamp, status)
sql_update = sql_update_list[0]
r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update})
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
@app.post("/api/xumu/data/collect")
async def process_data(request: Request):
try:
data = await request.body()
data = data.decode("utf-8")
data = json.loads(data)
event = data["event"]
clientid = data["clientid"]
# print(clientid + "======" + event)
pattern = r'^f\d{1,10}$'
amatch1 = re.match(pattern, clientid)
# 如果两个都不满足,那就错误
if not amatch1:
return BaseResponse(code=500, msg="clientid error")
# 连接成功处理
if event == "client.connected":
return await client_change_status(clientid, True)
# 断开连接处理
elif event == "client.disconnected":
return await client_change_status(clientid, False)
# 消息体处理
elif event == "message.publish":
return await message_publish(data)
return BaseResponse()
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 报警数据最近7天的数据
@app.get("/api/xumu/warning/past_seven_days")
async def past_seven_days(deviceId):
try:
current_date = datetime.now().date()
past_7_days_date = current_date - timedelta(days=7)
if deviceId is None or deviceId == "" or len(deviceId) != 4:
return BaseResponse(code=500, msg="参数错误")
sql = f"select * from root.warning.{deviceId} where time >= {past_7_days_date} and time <= {current_date}"
send_json = {
"sql": sql
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
return BaseResponse(data=r.json())
except Exception as e:
return BaseResponse(code=500, msg=str(e))
# 报警数据统计接口
@app.get("/api/xumu/warning/statistics")
async def warning_statistics(deviceId):
try:
if deviceId is None or deviceId == "" or len(deviceId) != 4:
return BaseResponse(code=500, msg="参数错误")
sql_for_count = f"select count(iccid) from root.warning.{deviceId}"
send_json = {
"sql": sql_for_count
}
r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
data = r.json()
values = data["values"]
total_count = 0
today_count = 0
if len(values) != 0:
total_count = len(values[0])
return_data = {"totoal_count": total_count, "today_count": today_count}
return BaseResponse(data=return_data)
except Exception as e:
return BaseResponse(code=500, msg=str(e))
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8002)