xumu_iotdb/xumu.py

352 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import json
from typing import Any
import requests
import uvicorn
from fastapi import FastAPI, Request
from pydantic import BaseModel
from config import *
from sql.sql import *
# FastAPI application instance; the route decorators below register handlers on it.
app = FastAPI()
# Unified response envelope
class BaseResponse(BaseModel):
    """Envelope returned by the handlers in this module.

    ``code`` defaults to 200 and ``msg`` to "success"; handlers pass other
    values to report a failure. ``data`` carries the payload and defaults to
    ``None``.
    """

    # Status code of the operation (200 unless a handler overrides it).
    code: int = 200
    # Short status or error text.
    msg: str = "success"
    # Handler-specific payload; any JSON-compatible value.
    data: Any = None
# Surveillance video endpoint
@app.get("/api/xumu/video")
async def video_query(username):
    """Return the result of ``get_video_url(username)`` in the response envelope.

    The lookup is not wrapped in a try/except, so an exception raised by
    ``get_video_url`` propagates to the framework.
    """
    video_url = get_video_url(username)
    return BaseResponse(data=video_url)
# Alarm data endpoint
@app.get("/api/xumu/warning")
async def get_warning(deviceId, limit, offset):
    """Query alarm records for a device.

    Builds the request body with ``get_warning_sql(deviceId, limit, offset)``,
    posts it to ``baseHost + queryUri`` and returns the decoded JSON reply as
    ``data``. Any exception is reported as ``code=500`` with the exception
    text as ``msg``.
    """
    try:
        statement = get_warning_sql(deviceId, limit, offset)
        reply = requests.post(
            baseHost + queryUri,
            headers=headers,
            json=statement,
        )
        return BaseResponse(data=reply.json())
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
# Device online-status endpoint
@app.get("/api/xumu/device/online")
async def device_online_query(iccid, deviceId):
    """Query the online record for a device.

    Builds the request body with ``get_device_online_sql(iccid, deviceId)``,
    posts it to ``baseHost + queryUri`` and returns the decoded JSON reply as
    ``data``. Any exception is reported as ``code=500`` with the exception
    text as ``msg``.
    """
    try:
        statement = get_device_online_sql(iccid, deviceId)
        reply = requests.post(
            baseHost + queryUri,
            headers=headers,
            json=statement,
        )
        return BaseResponse(data=reply.json())
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
# Data query endpoint
@app.get("/api/xumu/data/query")
async def data_query(deviceId):
    """Query the data of one device.

    ``deviceId`` must be exactly four characters long; otherwise the handler
    answers ``code=500`` with a parameter-error message. A valid id is turned
    into a request body by ``data_query_sql`` and posted to
    ``baseHost + queryUri``; the decoded JSON reply is returned as ``data``.
    Any exception is reported as ``code=500`` with the exception text.
    """
    try:
        # An empty string also fails the length test, so no separate check is needed.
        if deviceId is None or len(deviceId) != 4:
            return BaseResponse(code=500, msg="参数错误")
        statement = data_query_sql(deviceId)
        reply = requests.post(
            baseHost + queryUri,
            headers=headers,
            json=statement,
        )
        return BaseResponse(data=reply.json())
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
async def rfid_last_query(rfid, deviceId):
    """Run the query built by ``rfid_last_query_sql(rfid, deviceId)``.

    Not registered as a route; ``rfid_query`` calls it once it has resolved
    the device id. ``deviceId`` must be exactly four characters long,
    otherwise ``code=500`` with a parameter-error message is returned. The
    decoded JSON reply is returned as ``data``; any exception is reported as
    ``code=500`` with the exception text.
    """
    try:
        # An empty string also fails the length test, so no separate check is needed.
        if deviceId is None or len(deviceId) != 4:
            return BaseResponse(code=500, msg="参数错误")
        statement = rfid_last_query_sql(rfid, deviceId)
        reply = requests.post(
            baseHost + queryUri,
            headers=headers,
            json=statement,
        )
        return BaseResponse(data=reply.json())
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
# Raw query endpoint
@app.post("/api/xumu/rest/v2/query")
async def rest_query(request: Request):
    """Forward the request's JSON body to ``baseHost + queryUri`` unchanged.

    Returns the decoded JSON reply as ``data``. A body that is not valid
    JSON, a failed request or a non-JSON reply is reported as ``code=500``
    with the exception text as ``msg``, matching the other handlers in this
    module.
    """
    try:
        data = await request.json()
        # NOTE(review): the body is forwarded verbatim, so a caller can submit
        # any statement the backend's query URI accepts. Confirm this route is
        # only reachable by trusted clients.
        r = requests.post(baseHost + queryUri, headers=headers, json=data)
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# RFID query endpoint
@app.get("/api/xumu/rfid/query")
async def rfid_query(rfid):
    """Resolve the device for ``rfid`` and return that device's RFID query result.

    The lookup built by ``rfid_query_sql`` is posted to
    ``baseHost + queryUri``. A non-200 HTTP status yields ``code=404`` with
    the reply as ``data``; an empty ``values`` list yields ``code=500`` with
    "No Such RFID". Otherwise ``values[0][0]`` is taken as the device id and
    the result of ``rfid_last_query`` is returned. Any exception is reported
    as ``code=500`` with the exception text.
    """
    # TODO: one RFID may be reported by several ICCID devices; all of them
    # should be collected and returned. Only one device is used for now, and
    # it is not necessarily the most recent one.
    try:
        statement = rfid_query_sql(rfid)
        lookup = requests.post(
            baseHost + queryUri,
            headers=headers,
            json=statement,
        )
        if lookup.status_code != 200:
            return BaseResponse(data=lookup.json(), code=404, msg="404")
        found = lookup.json()["values"]
        if len(found) == 0:
            return BaseResponse(msg="No Such RFID", code=500)
        return await rfid_last_query(rfid, found[0][0])
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
async def has_rfid(rfid):
    """Report whether the query built by ``has_rfid_sql(rfid)`` returns anything.

    Returns ``code=200`` with the decoded reply as ``data`` when ``values`` is
    non-empty. A non-200 HTTP status yields ``code=404`` with the reply as
    ``data``; an empty ``values`` list yields ``code=500`` with
    "No Such RFID". Any exception is reported as ``code=500`` with the
    exception text.
    """
    try:
        statement = has_rfid_sql(rfid)
        lookup = requests.post(
            baseHost + queryUri,
            headers=headers,
            json=statement,
        )
        if lookup.status_code != 200:
            return BaseResponse(data=lookup.json(), code=404, msg="404")
        body = lookup.json()
        if len(body["values"]) == 0:
            return BaseResponse(msg="No Such RFID", code=500)
        return BaseResponse(data=body)
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
# Device query endpoint
@app.get("/api/xumu/device/query")
async def get_device(iccid, deviceId):
    """Query a device record.

    Builds the request body with ``get_device_sql(iccid, deviceId)``, posts it
    to ``baseHost + queryUri`` and returns the decoded JSON reply as ``data``.
    Any exception is reported as ``code=500`` with the exception text as
    ``msg``. Other handlers in this module also call this function directly.
    """
    try:
        statement = get_device_sql(iccid, deviceId)
        reply = requests.post(
            baseHost + queryUri,
            headers=headers,
            json=statement,
        )
        return BaseResponse(data=reply.json())
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
# Device registration endpoint
@app.post("/api/xumu/device/register")
async def register(request: Request):
    """Register a device, or refresh an existing registration.

    The request body is JSON with the keys ``clientid`` and ``payload``.
    ``payload`` is itself a JSON string with the keys ``l`` (declared length,
    compared with the length of the string as received), ``d`` (device id),
    ``cid`` (ICCID) and ``t`` (device type).

    Returns a ``BaseResponse`` with:
      * ``code=301`` when the declared and received lengths differ;
      * ``code=302`` when the stored ICCID equals the reported one;
      * ``code=500`` when the device lookup or the status lookup fails, or
        when any exception is raised;
      * ``code=200`` and ``data`` set to the list of decoded backend replies
        otherwise.
    """
    try:
        raw_body = await request.body()
        data = json.loads(raw_body.decode("utf-8"))
        clientid = data["clientid"]
        payload = data["payload"]
        # Measured on the payload string exactly as received, before parsing.
        receive_len = len(payload)
        payload = json.loads(payload)
        send_len = payload["l"]
        if receive_len != send_len:
            return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")
        deviceId = payload["d"]
        iccid = payload["cid"]
        # Named device_type rather than ``type`` so the builtin is not shadowed.
        device_type = payload["t"]
        res = []

        # Check whether the device has already been created.
        a_device = await get_device(None, deviceId)
        if a_device.code != 200:
            return BaseResponse(code=500, msg="500 Internal error")
        v = a_device.data["values"]
        t = a_device.data["timestamps"]
        if len(v) != 0 and len(t) != 0:
            timestamp = t[0]
            stored_iccid = v[0]
            # Other handlers in this module index query results as
            # values[column][row]; unwrap the first row so the comparison is
            # value-to-value instead of list-to-string. A scalar is accepted
            # unchanged.
            if isinstance(stored_iccid, list):
                stored_iccid = stored_iccid[0] if stored_iccid else None
            # Same ICCID as stored: the device is already registered.
            if stored_iccid == iccid:
                return BaseResponse(code=302, msg="该设备已经注册过了")
            # Different ICCID: update the existing registration.
            send_json = update_register_sql(timestamp, iccid, deviceId, device_type)
        else:
            # No stored row: create the device.
            send_json = create_register_sql(iccid, deviceId, device_type)
        r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
        res.append(r.json())

        # Insert into the device status table only when no row exists yet.
        send_json = has_device_status_sql(clientid)
        r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
        if r.status_code != 200:
            return BaseResponse(data=r.json(), code=500, msg="500 Internal Error")
        data = r.json()
        if len(data["values"]) == 0:
            send_json = insert_device_status_sql(iccid, clientid, deviceId)
            r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json)
            res.append(r.json())

        # Run the statements returned by farm_sql, then those returned by
        # warning_sql, collecting each backend reply.
        for build_statements in (farm_sql, warning_sql):
            for sql in build_statements(deviceId, device_type):
                r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql})
                res.append(r.json())
        return BaseResponse(data=res)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Data publishing handler
async def message_publish(data):
    """Store one published message and, when applicable, an alarm record.

    ``data["payload"]`` is a JSON string (surrounding whitespace is stripped)
    with the keys ``l`` (declared length), ``m`` (list), ``v`` (list whose
    first item is the device id) and ``t``. A length mismatch yields
    ``code=301``. ``"t"`` is appended to ``m`` and ``t`` to ``v`` before the
    insert request is built. When ``t == 0`` the second item of ``v`` is
    treated as an RFID and a mapping statement is issued if ``has_rfid`` does
    not answer 200. Returns the list of decoded backend replies as ``data``;
    any exception is reported as ``code=500`` with the exception text.
    """
    try:
        raw = data["payload"].strip()
        received_length = len(raw)
        body = json.loads(raw)
        declared_length = body["l"]
        if received_length != declared_length:
            return BaseResponse(code=301, msg=f"data valid error,receive_len:{received_length}")

        m = body["m"]
        v = body["v"]
        t = body["t"]
        deviceId = v[0]
        m.append("t")
        v.append(t)
        res = []

        # t == 0 marks an RFID device, which needs an RFID -> device mapping.
        if t == 0:
            rfid = v[1]
            known = await has_rfid(rfid)
            if known.code != 200:
                mapping_sql = rfid_deviceId(rfid, deviceId)[0]
                reply = requests.post(
                    baseHost + nonQueryUri,
                    headers=headers,
                    json={"sql": mapping_sql},
                )
                if reply.status_code == 200:
                    outcome = reply.json()
                    if outcome["code"] != 200:
                        return BaseResponse(code=500, msg="RFID创建映射失败", data=outcome)
                    res.append(outcome)

        insert_body = message_publish_sql(deviceId, m, t, v)
        reply = requests.post(baseHost + insertUri, headers=headers, json=insert_body)
        res.append(reply.json())

        if is_warning(deviceId, v, t):
            # Alarm data is additionally written to the alarm table.
            alarm_body = insert_to_warning_sql(deviceId, v, t)
            reply = requests.post(baseHost + insertUri, headers=headers, json=alarm_body)
            res.append(reply.json())
        return BaseResponse(data=res)
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
# Device status change handler
async def client_change_status(clientid, status):
    """Write ``status`` to the status row found for ``clientid``.

    The row is located with the query built by ``get_has_clientId_sql``; its
    first timestamp is passed to ``get_client_change_status_sql`` and the
    first statement of the returned list is posted to
    ``baseHost + nonQueryUri``.

    Every path returns a ``BaseResponse``:
      * ``code=200`` with the decoded update reply as ``data`` on success;
      * ``code=500`` with the lookup reply as ``data`` when the lookup does
        not answer HTTP 200;
      * ``code=500`` with "No Such Client" when the lookup returns no
        timestamps;
      * ``code=500`` with the exception text on any exception.
    """
    try:
        send_json = get_has_clientId_sql(clientid)
        r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
        if r.status_code != 200:
            return BaseResponse(data=r.json(), code=500, msg="500 Internal Error")
        timestamps = r.json()["timestamps"]
        if len(timestamps) == 0:
            # Nothing to update; say so instead of falling through to None.
            return BaseResponse(code=500, msg="No Such Client")
        sql_update = get_client_change_status_sql(timestamps[0], status)[0]
        r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update})
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Data collection endpoint
@app.post("/api/xumu/data/collect")
async def process_data(request: Request):
    """Dispatch an incoming event to the matching handler.

    The body is JSON with at least ``event`` and ``clientid``. A ``clientid``
    that does not match the pattern below yields ``code=500`` with
    "clientid error". ``client.connected`` / ``client.disconnected`` call
    ``client_change_status`` with ``True`` / ``False``; ``message.publish``
    calls ``message_publish`` with the whole body. Any other event yields the
    default success envelope. Any exception is reported as ``code=500`` with
    the exception text.
    """
    try:
        raw = await request.body()
        data = json.loads(raw.decode("utf-8"))
        event = data["event"]
        clientid = data["clientid"]

        # Accept only ids matching the pattern: "f" followed by 1-10 digits.
        if re.match(r'^f\d{1,10}$', clientid) is None:
            return BaseResponse(code=500, msg="clientid error")

        # Connection and disconnection events differ only in the status flag.
        if event in ("client.connected", "client.disconnected"):
            return await client_change_status(clientid, event == "client.connected")
        # Published message.
        if event == "message.publish":
            return await message_publish(data)
        return BaseResponse()
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
# Alarm data for the past seven days
@app.get("/api/xumu/warning/past_seven_days")
async def past_seven_days(deviceId):
    """Return a device's alarm query result with timestamps rendered as dates.

    The device type is read from ``values[1][0]`` of the ``get_device``
    result. Type ``1`` uses ``air_device_past_seven_days_sql``; every other
    case, including a failed or empty lookup, uses
    ``common_device_past_seven_days_sql``. Each entry of the reply's
    ``timestamps`` (milliseconds) is replaced by a ``YYYY-MM-DD`` string in
    the server's local time zone. Any exception is reported as ``code=500``
    with the exception text.
    """
    # Imported here because the module's import block does not import
    # datetime explicitly.
    from datetime import datetime

    try:
        the_device = await get_device(None, deviceId)
        device_type = None
        if the_device.code == 200:
            device_values = the_device.data["values"]
            # Guard both index levels so a short or empty result falls back to
            # the common query instead of raising IndexError.
            if len(device_values) > 1 and len(device_values[1]) != 0:
                device_type = device_values[1][0]
        if device_type == 1:
            # Air devices report two values.
            send_json = air_device_past_seven_days_sql(deviceId)
        else:
            # Other devices report one value (RFID is not counted).
            send_json = common_device_past_seven_days_sql(deviceId)
        r = requests.post(baseHost + queryUri, headers=headers, json=send_json)
        data = r.json()
        # Timestamps are in milliseconds, hence the division by 1000.
        data["timestamps"] = [
            datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d')
            for timestamp in data["timestamps"]
        ]
        return BaseResponse(data=data)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))
# Alarm statistics endpoint
@app.get("/api/xumu/warning/statistics")
async def warning_statistics(deviceId):
    """Return the total and today's alarm counts for a device.

    ``deviceId`` must be exactly four characters long, otherwise ``code=500``
    with a parameter-error message is returned. Two queries are issued, built
    by ``get_device_all_warning_sql`` and ``get_device_today_warning_sql``;
    each count is ``values[0][0]`` of the reply, or 0 when ``values`` is
    empty. Any exception is reported as ``code=500`` with the exception text.
    """
    try:
        # An empty string also fails the length test, so no separate check is needed.
        if deviceId is None or len(deviceId) != 4:
            return BaseResponse(code=500, msg="参数错误")

        counts = []
        # Total count first, then today's count.
        for build_statement in (get_device_all_warning_sql, get_device_today_warning_sql):
            statement = build_statement(deviceId)
            reply = requests.post(baseHost + queryUri, headers=headers, json=statement)
            values = reply.json()["values"]
            counts.append(values[0][0] if len(values) != 0 else 0)
        total_count, today_count = counts

        # The spelling of the first key is kept as-is: it is part of the response body.
        return BaseResponse(
            data={
                "totoal_warning_count": total_count,
                "today_warning_count": today_count,
            }
        )
    except Exception as exc:
        return BaseResponse(code=500, msg=str(exc))
# Start the server on all interfaces, port 8002, when this file is run as a script.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8002,
    )