lot_manager/api.py

100 lines
2.9 KiB
Python

import os
import subprocess
import uvicorn
from db.models.base import BaseResponse
from db.models.log_data_model import LOT_DATA
from db.repository import add_kb_to_db, get_kb_detail, get_kb_detail_by_time, delete_kb_detail_by_time
from fastapi import FastAPI
# FastAPI application instance; the route decorators below register their
# handlers on it, and the __main__ guard hands it to uvicorn.
app = FastAPI()
@app.post("/add/lot_data/{device_name}")
async def add(data: LOT_DATA, device_name: str):
    """Persist one LOT_DATA payload for the device named in the URL path.

    Args:
        data: Request body, parsed as ``LOT_DATA``.
        device_name: Path parameter forwarded to ``add_kb_to_db``.

    Returns:
        A default ``BaseResponse`` on success, or a ``BaseResponse`` with
        ``code=500`` and the error text in ``msg`` when storing fails.
    """
    try:
        add_kb_to_db(data, device_name)
        return BaseResponse()
    except Exception as e:
        # Pass the error text, not the exception object: the object is not a
        # string and may not survive validation / JSON serialisation.
        return BaseResponse(code=500, msg=str(e))
@app.get("/get/all/lot_data")
async def get_data():
    """Return everything ``get_kb_detail`` yields.

    Returns:
        ``BaseResponse`` carrying the result in ``data`` on success, or a
        ``BaseResponse`` with ``code=404`` and the error text in ``msg`` when
        the lookup raises.
    """
    try:
        data = get_kb_detail()
        return BaseResponse(data=data)
    except Exception as e:
        # Pass the error text, not the exception object: the object is not a
        # string and may not survive validation / JSON serialisation.
        return BaseResponse(code=404, msg=str(e))
@app.get("/get/data/by/time")
async def get_data(start_time, end_time):
    """Return the records ``get_kb_detail_by_time`` yields for a time window.

    NOTE(review): this function shares its name with the
    ``/get/all/lot_data`` handler. Both routes are registered through their
    decorators, but the module-level name ``get_data`` ends up bound to this
    one; consider renaming once callers are checked.

    Args:
        start_time: Query parameter, forwarded unchanged as the window start.
        end_time: Query parameter, forwarded unchanged as the window end.

    Returns:
        ``BaseResponse`` carrying the result in ``data`` on success, or a
        ``BaseResponse`` with ``code=404`` and the error text in ``msg`` when
        the lookup raises.
    """
    try:
        data = get_kb_detail_by_time(start_time, end_time)
        return BaseResponse(data=data)
    except Exception as e:
        # Pass the error text, not the exception object: the object is not a
        # string and may not survive validation / JSON serialisation.
        return BaseResponse(code=404, msg=str(e))
@app.get("/delete/data/by/time")
async def delete_data(start_time, end_time):
    """Delete records in a time window via ``delete_kb_detail_by_time``.

    Args:
        start_time: Query parameter, forwarded unchanged as the window start.
        end_time: Query parameter, forwarded unchanged as the window end.

    Returns:
        ``BaseResponse`` carrying whatever the delete call returned in
        ``data``, or a ``BaseResponse`` with ``code=404`` and the error text
        in ``msg`` when the delete raises.
    """
    try:
        data = delete_kb_detail_by_time(start_time, end_time)
        return BaseResponse(data=data)
    except Exception as e:
        # Pass the error text, not the exception object: the object is not a
        # string and may not survive validation / JSON serialisation.
        return BaseResponse(code=404, msg=str(e))
@app.get("/create/db")
async def create():
    """Create all tables declared on ``Base.metadata`` using ``engine``.

    Returns:
        A default ``BaseResponse`` on success, or a ``BaseResponse`` with
        ``code=404`` and the error text in ``msg`` when creation raises.
    """
    # Imported at call time, as in the original, so the database objects are
    # only loaded when this endpoint is actually hit.
    from db.base import Base, engine

    try:
        Base.metadata.create_all(bind=engine)
        return BaseResponse()
    except Exception as e:
        # Pass the error text, not the exception object: the object is not a
        # string and may not survive validation / JSON serialisation.
        return BaseResponse(code=404, msg=str(e))
@app.get("/start")
def device_test():
    """Launch the background ffmpeg relay and report the launcher's result.

    The shell command starts ffmpeg under ``nohup`` in the background, sends
    its output to ``app.out`` and writes the background PID to ``app.pid``.

    Returns:
        ``BaseResponse`` whose ``code`` is the shell's exit status. On exit
        status 0 ``msg`` is the captured stdout and ``data`` is the playback
        URL; otherwise ``msg`` is the captured stderr.
    """
    # An earlier revision killed whatever held port 554 before starting; that
    # step is currently disabled, so nothing is stopped here.
    # NOTE(review): the camera credentials and the playback secret are
    # embedded in the literals below — consider moving them to configuration.
    start_cmd = r'nohup ffmpeg -rtsp_transport tcp -re -i rtsp://admin:123456@192.168.0.226:554/mpeg4 -c:v copy -c:a aac -preset ultrafast -r 20 -flvflags no_duration_filesize -f rtsp -rtsp_transport tcp rtsp://127.0.0.1/live/test > app.out 2>&1 & echo $! > app.pid'
    launched = subprocess.run(
        start_cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    exit_status = launched.returncode

    if exit_status != 0:
        return BaseResponse(code=exit_status, msg=launched.stderr)

    return BaseResponse(
        code=exit_status,
        msg=launched.stdout,
        data="http://192.168.1.27/live/test.live.mp4?secret=gqig2yFKkDpIMic1uWZY1L5MsIo0eflm",
    )
@app.get("/stop")
async def device_test2():
    """Kill every process that ``lsof`` reports on port 554.

    Runs ``lsof -t -i:554`` to collect PIDs and, if any are found, sends them
    ``kill -9``. The kill is attempted whether or not anything is expected to
    be running.

    Returns:
        ``BaseResponse`` whose ``code`` is the exit status of ``lsof``. On
        exit status 0 ``msg`` is lsof's stdout (the PID list); otherwise
        ``msg`` is lsof's stderr.
    """
    cmd = 'lsof -t -i:554'
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Collect the command output.
    out, err = p.communicate()
    # Exit status of lsof (non-zero when the lookup did not succeed).
    return_code = p.returncode

    # lsof -t prints one PID per line as bytes. Decode and split so each PID
    # becomes its own argument; formatting the raw bytes into the command
    # produced "kill -9 b'1234\n'", which never killed anything.
    pids = out.decode(errors="replace").split()
    if not pids:
        print("失败")
    else:
        print(" ".join(pids))
        # argv list instead of a shell string: no quoting issues, and several
        # PIDs are handled in one call. Best effort, so the exit status of
        # kill is deliberately not checked.
        subprocess.run(["kill", "-9", *pids], check=False)

    if return_code == 0:
        return BaseResponse(code=return_code, msg=out)
    return BaseResponse(code=return_code, msg=err)
if __name__ == "__main__":
    # Run as a script: serve the app with uvicorn on the loopback interface.
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )