cross_doctor/app/views_task.py

149 lines
3.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/10 18:12
# @Description:
import json
from flask import Flask, request, jsonify, redirect
from flask_cors import CORS
from app.models import *
from app.task_worker import *
from app.cross_eva_views import *
#app = Flask(__name__)
#CORS(app, resources={r"/api/*": {"origins": "*"}})
@app.route('/api/get_task_list_parameter', methods=['GET'])
def cross_task_list_parameter():
    """Return the base parameters used when building a list query.

    The query-string arguments are copied into a plain dict and handed to
    ``do_query_task_list_parameter``; its return value is the response.

    NOTE(review): the original comment described this as the base parameters
    for querying the "intersection ledger", while the route name refers to
    the task list -- confirm which one is intended.
    """
    query_args = dict(request.args)
    return do_query_task_list_parameter(query_args)
@app.route('/api/get_task_list', methods=['GET'])
def get_task_list():
    """Return all tasks that match the given conditions.

    Query-string parameters listed in the original notes:
        task_name, task_type, task_class, plan_begin_time, plan_end_time,
        state, executor, task_src

    The arguments are copied into a plain dict and passed to
    ``do_query_task_list``, whose return value is the response.
    """
    query_args = dict(request.args)
    return do_query_task_list(query_args)
@app.route('/api/get_completed_task_cross_list', methods=['GET'])
def get_completed_task_cross_list():
    """Return every intersection that has 'completed' tasks.

    Query-string parameters listed in the original notes:
        nodeid

    The arguments are copied into a plain dict and passed to
    ``do_query_completed_task_cross_list``, whose return value is the
    response.
    """
    query_args = dict(request.args)
    return do_query_completed_task_cross_list(query_args)
@app.route('/api/get_completed_task_by_cross', methods=['GET'])
def get_completed_task_by_cross():
    """Return all 'completed' tasks of the specified intersection.

    Query-string parameters listed in the original notes:
        nodeid, crossid

    The arguments are copied into a plain dict and passed to
    ``do_query_completed_task_by_cross``, whose return value is the response.
    """
    query_args = dict(request.args)
    return do_query_completed_task_by_cross(query_args)
@app.route('/api/add_task', methods=['POST'])
def add_task():
    """Create a task from the submitted form fields.

    Form fields listed in the original notes:
        nodeid (e.g. "9660"), creatorid, task_name, task_type,
        plan_begin_time, plan_end_time, executor, description,
        crossids (intersection ids), sectionids (road-section ids, optional),
        comment, task_src, task_class

    Returns:
        Whatever ``do_add_task`` returns. If a ``pymysql.Error`` is raised,
        the error is logged and the JSON-serialised result of
        ``make_common_res(-1, ...)`` is returned instead.
    """
    try:
        return do_add_task(dict(request.form))
    except pymysql.Error as e:
        # A PyMySQL error was raised: log it and reply with the generic
        # failure payload instead of propagating the exception.
        logging.error(f"add_task except: {e}")
        failure_payload = make_common_res(-1, '发生异常')
        return json.dumps(failure_payload)
@app.route('/api/remove_task', methods=['POST'])
def remove_task():
    """Delete tasks.

    Parameters listed in the original notes:
        nodeid (e.g. "9660"), taskno

    Per the original note dated 20250103, this was changed to a POST
    endpoint so that several records can be deleted in one call. The parsed
    JSON body is passed to ``do_remove_task``, whose return value is the
    response.
    """
    payload = request.get_json()
    return do_remove_task(payload)
@app.route('/api/distribute_task', methods=['POST'])
def distribute_task():
    """Dispatch (distribute) a task.

    Form fields listed in the original notes:
        nodeid (e.g. "9660"), taskno

    The form fields are copied into a plain dict and passed to
    ``do_distribute_task``, whose return value is the response.
    """
    form_fields = dict(request.form)
    return do_distribute_task(form_fields)
@app.route('/api/approval_task', methods=['POST'])
def approval_task():
    """Approve a task.

    Parameters listed in the original notes:
        nodeid (e.g. "9660"), taskno

    The parsed JSON body is passed to ``do_approval``, whose return value is
    the response.
    """
    payload = request.get_json()
    return do_approval(payload)
@app.route('/api/complete_task', methods=['GET'])
def complete_task():
    """Mark a task as completed.

    Query-string parameters listed in the original notes:
        nodeid (e.g. "9660"), taskno

    The arguments are copied into a plain dict and passed to
    ``do_complete_task``, whose return value is the response.
    """
    query_args = dict(request.args)
    return do_complete_task(query_args)
@app.route('/api/update_task', methods=['POST'])
def update_task():
    """Forward the submitted form fields to ``do_update_task``.

    The form fields are copied into a plain dict; the helper's return value
    is the response.
    """
    form_fields = dict(request.form)
    return do_update_task(form_fields)
@app.route('/api/get_task_history', methods=['GET'])
def get_task_history():
    """Forward the query-string arguments to ``do_query_task_history``.

    Query-string parameters listed in the original notes:
        nodeid (e.g. "9660"), taskno

    The helper's return value is the response.
    """
    query_args = dict(request.args)
    return do_query_task_history(query_args)
@app.route('/api/get_task_detail', methods=['GET'])
def get_task_detail():
    """Forward the query-string arguments to ``do_query_task_detail``.

    Query-string parameters listed in the original notes:
        nodeid (e.g. "9660"), cross_id

    The helper's return value is the response.
    """
    query_args = dict(request.args)
    return do_query_task_detail(query_args)
@app.route('/api/upload_task_file', methods=['POST'])
def task_upload():
    """Forward the request's query-string arguments to ``do_task_upload``.

    Only the query-string arguments (not the form body) are passed along, as
    a plain dict; the helper's return value is the response.
    """
    query_args = dict(request.args)
    return do_task_upload(query_args)
@app.route('/api/download_task_file', methods=['GET'])
def task_download():
    """Forward the query-string arguments to ``do_task_file_download``.

    The arguments are copied into a plain dict; the helper's return value is
    the response.
    """
    query_args = dict(request.args)
    return do_task_file_download(query_args)
@app.route('/api/del_task_file', methods=['GET'])
def task_del():
    """Forward the query-string arguments to ``del_task_file_api``.

    The arguments are copied into a plain dict; the helper's return value is
    the response.
    """
    query_args = dict(request.args)
    return del_task_file_api(query_args)
# if __name__ == '__main__':
# init()
# app.run(debug=True)