cross_doctor/app/task_db_func.py

410 lines
19 KiB
Python

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/10 17:59
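# @Description: Database access helper for the `task` schema (tasks, task history, uploaded files, ledger and greenwave task details).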
# -*- coding:utf-8 -*-
#import logging
import logging
import pymysql
import pymysql.cursors
from datetime import datetime
from flask import g
from app.db_func_base import *
class TaskDbHelper(TableDbHelperBase):
    """Helper object for SQL access to the `task` database."""

    def __init__(self, pool):
        """Remember the connection pool and the target database name.

        Args:
            pool: Connection pool object, stored on ``self.db_pool``.
        """
        self.DB_Name = 'task'
        self.db_pool = pool
def query_task(self, taskno, nodeid, area_id):
    """Fetch the single `task` row identified by task number, node and area.

    Args:
        taskno: Task number, interpolated as a quoted SQL value.
        nodeid: Node id, interpolated as a quoted SQL value.
        area_id: Area id, interpolated unquoted into the SQL text.

    Returns:
        The matching row as produced by ``self.do_select``, or ``None`` when
        the query does not yield exactly one row.
    """
    # NOTE(review): values are interpolated directly into the SQL text, so
    # callers must only pass trusted values.
    sql_query = "select * from `task` where nodeid='%s' and taskno='%s' and area_id = %s" % (nodeid, taskno, area_id)
    rows = self.do_select(sql_query)
    if len(rows) != 1:
        # The message used to say "query_ledger", which pointed at the wrong method.
        logging.error('query_task error! %s' % (sql_query))
        return None
    return rows[0]
def query_task_executor(self, nodeid, area_id):
    """List user names that may execute tasks for a node/area.

    Selects ``user_name`` from ``user.user`` for users linked to the node and
    area in ``user.area_user`` whose department equals the literal
    '信号调优团队'.

    Returns:
        The rows from ``self.do_select``; an empty list (after logging an
        error) when nothing matches.
    """
    sql_query = "select user_name from user.user where userno in (select userno from user.area_user where nodeid = '%s' and area_id = %s) and department = '信号调优团队'" % (nodeid, area_id)
    found = self.do_select(sql_query)
    if len(found) > 0:
        return found
    logging.error('query_task_executor is null! %s' % (sql_query))
    return []
def query_task_src(self, nodeid, area_id):
    """Return the distinct ``src`` values of `task_src` for a node/area.

    Returns:
        The rows from ``self.do_select``; an empty list (after logging an
        error) when nothing matches.
    """
    sql_query = "select distinct src from `task_src` where nodeid='%s' and area_id = %s" % (nodeid, area_id)
    found = self.do_select(sql_query)
    if len(found) > 0:
        return found
    logging.error('query_task_src is null! %s' % (sql_query))
    return []
def query_task_type(self, nodeid, area_id):
    """Return all `task_type` rows configured for a node/area.

    Returns:
        The rows from ``self.do_select``; an empty list (after logging an
        error) when nothing matches.
    """
    sql_query = "select * from `task_type` where nodeid='%s' and area_id = %s" % (nodeid, area_id)
    found = self.do_select(sql_query)
    if len(found) > 0:
        return found
    logging.error('query_task_type is null! %s' % (sql_query))
    return []
def query_all_tak_list(self, nodeid, area_id):
    """Return every `task` row of a node/area whose ``record_state`` is 0.

    Rows are ordered by task state, progress and (descending) planned end
    time. A non-null ``update_time`` is replaced in place by its
    'YYYY-mm-dd HH:MM:SS' string form.
    """
    sql_query = "select * from `task` where nodeid='%s' and area_id = %s and record_state=0 order by task_state asc, progress asc, plan_end_time desc" % (nodeid, area_id)
    rows = self.do_select(sql_query)
    for row in rows:
        stamp = row['update_time']
        logging.info(stamp)
        if stamp is None:
            continue
        row['update_time'] = stamp.strftime('%Y-%m-%d %H:%M:%S')
    return rows
def query_task_list(self, task_name, task_type, data_type, task_class, plan_begin_time, plan_end_time, publish_time, task_state, executor, task_src, nodeid):
    """Query the non-deleted tasks of a node, narrowed by optional filters.

    Every filter argument is optional: a value that is ``None`` or has
    length 0 is ignored. ``task_name`` is matched with ``like '%...%'``;
    all other filters are matched by equality against the column of the same
    name.

    Args:
        task_name: Substring to look for in the task name.
        task_type, data_type, task_class, plan_begin_time, plan_end_time,
        publish_time, task_state, executor, task_src: Exact-match filters.
        nodeid: Node id the tasks belong to (always applied).

    Returns:
        The rows from ``self.do_select``; a non-null ``update_time`` is
        replaced in place by its 'YYYY-mm-dd HH:MM:SS' string form.
    """
    # NOTE(review): values are interpolated directly into the SQL text, so
    # callers must only pass trusted values.
    sql_query = "select * from `task` where nodeid='%s' and record_state=0" % (nodeid)
    if task_name is not None and len(task_name) > 0:
        sql_query += " and task_name like '%%%s%%'" % (task_name)

    # Column/value pairs, in the order the clauses are appended to the SQL.
    # The data_type clause used to be appended to the data_type argument
    # itself (and named the task_type column), so that filter was lost.
    exact_filters = (
        ('task_type', task_type),
        ('data_type', data_type),
        ('task_class', task_class),
        ('plan_begin_time', plan_begin_time),
        ('plan_end_time', plan_end_time),
        ('publish_time', publish_time),
        ('task_state', task_state),
        ('executor', executor),
        ('task_src', task_src),
    )
    for column, value in exact_filters:
        if value is not None and len(value) > 0:
            sql_query += " and %s='%s'" % (column, value)

    res_list = self.do_select(sql_query)
    for res in res_list:
        logging.info(res['update_time'])
        if res['update_time'] is not None:
            res['update_time'] = res['update_time'].strftime('%Y-%m-%d %H:%M:%S')
    return res_list
def query_completed_task_cross_list(self, nodeid, area_id):
    """Return ``crossid``/``name`` of crosses referenced by tasks in state 4.

    The lookup goes against ``tmnet.cross`` and is restricted to ``nodeid``.
    ``area_id`` is accepted but not used by the query.
    """
    # NOTE(review): the subquery compares crossid with the whole `crossids`
    # column, which other code splits on ','. TODO confirm this is intended.
    query = "select crossid,name from tmnet.cross where crossid in (select crossids from task where task_state=4 and nodeid='%s') and nodeid='%s';" % (nodeid, nodeid)
    crosses = self.do_select(query)
    return crosses
def query_completed_task_by_cross(self, crossid, nodeid, area_id):
    """Return the tasks in state 4 whose ``crossids`` equals ``crossid``.

    The query is additionally restricted to the given node and area.
    """
    query = "select * from task where task_state=4 and crossids='%s' and nodeid='%s' and area_id = %s;" % (crossid, nodeid, area_id)
    tasks = self.do_select(query)
    return tasks
def get_last_taskno(self, creatorid, task_name, nodeid):
    """Return the highest ``taskno`` for a creator, task name and node.

    Returns:
        The ``taskno`` field of the single aggregate row, or ``None`` (after
        logging an error) when the query does not yield exactly one row.
    """
    sql_query = "select max(taskno) as taskno from task where creatorid='%s' and task_name='%s' and nodeid='%s';" % (creatorid,task_name, nodeid)
    rows = self.do_select(sql_query)
    if len(rows) == 1:
        return rows[0]['taskno']
    logging.error('get_last_taskno error! %s' % (sql_query))
    return None
def get_task_no(self, nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment):
    """Look up the ``taskno`` of the task matching all given attributes.

    Returns:
        ``taskno`` of the first matching row, or ``None`` when no row matches.
    """
    # NOTE(review): plan_begin_time / plan_end_time are interpolated without
    # quotes here, unlike in the other queries of this class. TODO confirm
    # callers pass values that are valid unquoted SQL.
    sql = f"""
        select taskno from task where nodeid = '{nodeid}' and area_id = {area_id} and task_name = '{task_name}' and task_type = {task_type}
        and task_class = {task_class} and data_type = {data_type} and plan_begin_time = {plan_begin_time}
        and plan_end_time = {plan_end_time} and executor = '{executor}' and task_src = '{task_src}' and comment = '{comment}'
    """
    matches = self.do_select(sql)
    return matches[0]['taskno'] if len(matches) > 0 else None
def delete_task(self, taskno, nodeid, area_id):
    """Soft-delete tasks by setting ``record_state`` to 1.

    Args:
        taskno: Iterable of task numbers to mark as deleted.
        nodeid: Node id the tasks belong to.
        area_id: Area id the tasks belong to.

    Returns:
        The result of ``self.do_execute`` for the update, or 0 when
        ``taskno`` is empty and no statement is executed.
    """
    taskno_str = ','.join(str(num) for num in taskno)
    if not taskno_str:
        # An empty id list would render "in ()", which is a SQL syntax error.
        return 0
    sql = "update `task` set record_state=1 where taskno in (%s) and nodeid='%s' and area_id = %s;" % (taskno_str, nodeid, area_id)
    # Rows are only flagged, never physically removed.
    return self.do_execute(sql)
def query_task_history(self, taskno, nodeid, area_id):
    """Return the `task_history` rows of one task, ordered by ``history_date``."""
    query = "select * from task_history where taskno='%s' and nodeid='%s' and area_id = %s order by history_date;" % (taskno, nodeid, area_id)
    history = self.do_select(query)
    return history
def query_task_progress_history(self, yesterday_date, nodeid, area_id):
    """Return task name and history content for one day's progress entries.

    Selects `task.task_history` rows of the node/area whose
    ``date(history_date)`` equals ``yesterday_date`` and whose content matches
    the progress pattern, left-joined with `task.task` to obtain the task
    name.
    """
    query = f"""
        select
            t2.task_name,
            content
        from
            (select * from task.task_history where date(history_date)='{yesterday_date}' and nodeid='{nodeid}' and area_id = {area_id} and content like '%%operation%%任务进度%%content%%' order by history_date) t1
        left join
            (select task_name,taskno from task.task) t2
        on t1.taskno = t2.taskno;
    """
    rows = self.do_select(query)
    return rows
def add_task_history(self, taskno, nodeid, area_id, operator,remark):
    """Insert one `task_history` row stamped with the current local time.

    ``remark`` is stored in the ``content`` column.

    Returns:
        1 when exactly one row was inserted, otherwise 0 (after logging an
        error).
    """
    time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    sql_insert = "insert into task_history (taskno,nodeid, area_id, operator,content, history_date) values('%s',%s,'%s','%s','%s','%s')" % (taskno, nodeid, area_id, operator,remark, time_str)
    inserted = self.do_execute(sql_insert)
    if inserted == 1:
        return inserted
    logging.error('add_task_history error! %s' % (sql_insert))
    return 0
def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
             executor, progress, task_state, description, crossids, waveid, wave_name, comment,
             record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review):
    """Insert a new row into `task` from the given column values.

    The first twenty values are written as quoted SQL strings; ``area_id``,
    ``task_type_class`` and ``full_review`` are written unquoted.

    Returns:
        1 when exactly one row was inserted, otherwise 0 (after logging an
        error).
    """
    column_values = (
        timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
        executor, progress, task_state, description, crossids, waveid, wave_name, comment,
        record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review,
    )
    template = "insert into task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time," \
               " executor, progress, task_state, description, crossids, waveid, wave_name, comment," \
               " record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s, %s, %s)"
    sql_insert = template % column_values
    inserted = self.do_execute(sql_insert)
    if inserted == 1:
        return inserted
    logging.error('add_task error! %s' % (sql_insert))
    return 0
def update_task_info(self, taskno, modify_data, nodeid, area_id):
    """Apply a prepared ``set`` clause to one task and refresh ``update_time``.

    Args:
        taskno: Task number of the row to update.
        modify_data: SQL fragment placed verbatim after ``set`` (for example
            ``"col = 'value'"``).
        nodeid: Node id of the task.
        area_id: Area id of the task.

    Returns:
        The result of ``self.do_execute``.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    statement = "update task.task set %s, update_time = '%s' where taskno = '%s' and nodeid = %s and area_id = %s" % (
        modify_data, stamp, taskno, nodeid, area_id)
    result = self.do_execute(statement)
    return result
##########################################################################
def insert_upload_file_record(self, taskno, nodeid, download_url, area_id):
    """Record an uploaded file's download URL for a task.

    ``taskno`` and ``nodeid`` are converted with ``int()`` before being
    written into the statement.

    Returns:
        The result of ``self.do_execute``.
    """
    task_number = int(taskno)
    node_number = int(nodeid)
    statement = """
        insert into task.task_upload_file_record (taskno, nodeid, download_url, area_id)
        values (%d , %d, '%s', %s)
    """ % (task_number, node_number, download_url, area_id)
    return self.do_execute(statement)
def query_task_file(self, nodeid, area_id):
    """Group the uploaded-file records of a node/area by task number.

    Returns:
        Dict mapping each ``taskno`` to a list of
        ``{'id': ..., 'download_url': ...}`` dicts in query order.
    """
    statement = """
        select * from task.task_upload_file_record where nodeid = %d and area_id = %s
    """ % (int(nodeid), area_id)
    files_by_task = {}
    for record in self.do_select(statement):
        entry = {
            'id': record['id'],
            'download_url': record['download_url']
        }
        files_by_task.setdefault(record['taskno'], []).append(entry)
    return files_by_task
def query_task_file_by_taskno(self, nodeid, taskno, area_id):
    """List the uploaded files of one task.

    All three arguments are converted with ``int()`` before use.

    Returns:
        List of ``{'id': ..., 'download_url': ...}`` dicts in query order.
    """
    statement = """
        select id, download_url from task.task_upload_file_record where nodeid = %d and taskno = %d and area_id = %d
    """ % (int(nodeid), int(taskno), int(area_id))
    return [
        {'id': record['id'], 'download_url': record['download_url']}
        for record in self.do_select(statement)
    ]
def del_task_file(self, nodeid, taskno, file_id):
    """Delete one uploaded-file record of a task.

    All three arguments are converted with ``int()`` before use.

    Returns:
        The result of ``self.do_execute``.
    """
    key = (int(nodeid), int(taskno), int(file_id))
    statement = """
        delete from task.task_upload_file_record where nodeid = %d and taskno = %d and id = %d
    """ % key
    return self.do_execute(statement)
def update_task_state(self, nodeid, area_id, task_no, state):
    """Set ``task_state`` of one task identified by node, area and task number.

    Returns:
        The result of ``self.do_execute``.
    """
    statement = "update task set task_state='%s' where nodeid='%s' and area_id='%s' and taskno='%s'" % (state, nodeid, area_id, task_no)
    affected = self.do_execute(statement)
    return affected
def query_ledger_task_list(self, nodeid, area_id):
    """Return distinct crossids of open ledger tasks for a node/area.

    Considers tasks with ``task_type_class = 1`` that are neither deleted
    (``record_state != 1``) nor in state 4, and keeps only detail rows whose
    ``submit_status`` is null or different from 2.
    """
    scope = (int(nodeid), int(area_id))
    statement = """
        select distinct crossid from ledger_task_detail where task_no in (select taskno from task.task where nodeid = %d and area_id = %d and task_type_class = 1 and record_state != 1 and task_state != 4) and (submit_status is null or submit_status != 2)
    """ % scope
    return self.do_select(statement)
def query_ledger_task_crosses_info(self, task_no):
    """Return every `ledger_task_detail` row belonging to ``task_no``."""
    statement = """
        select * from ledger_task_detail where task_no = %s
    """ % task_no
    details = self.do_select(statement)
    return details
def query_ledger_task_crosses_info_by_area_id(self, nodeid, area_id):
    """Summarise ledger-entry and approval progress per task for a node/area.

    A cross counts as "entered" when ``ledger_status == 2`` and
    ``phase_status`` is 1 or 3, and as "approved" when ``submit_status == 2``.
    Percentages are truncated to integers.

    Returns:
        Dict keyed by ``task_no``; each value holds ``task_no``,
        ``task_cross_num``, ``entered_cross_num``, ``entered_percent``,
        ``approve_cross_num`` and ``approve_percent``.
    """
    statement = """
        select * from ledger_task_detail where nodeid = %s and area_id = %s
    """ % (nodeid, area_id)

    # First pass: bucket the detail rows by task number.
    crosses_by_task = {}
    for detail in self.do_select(statement):
        crosses_by_task.setdefault(detail['task_no'], []).append(detail)

    # Second pass: derive counts and percentages for each task.
    summary = {}
    for task_no, crosses in crosses_by_task.items():
        total = len(crosses)
        entered = sum(
            1 for cross in crosses
            if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)
        )
        approved = sum(1 for cross in crosses if cross['submit_status'] == 2)
        summary[task_no] = {
            'task_no': task_no,
            'task_cross_num': total,
            'entered_cross_num': entered,
            'entered_percent': int(entered * 100 / total) if total > 0 else 0,
            'approve_cross_num': approved,
            'approve_percent': int(approved * 100 / total) if total > 0 else 0
        }
    return summary
def query_ledger_task_crosses_pics(self, crossid_list):
    """Return the `tmnet.user_upload_cross_pics` rows for the given crossids.

    Args:
        crossid_list: Crossid strings; each one is wrapped in single quotes
            for the ``in (...)`` clause.
    """
    quoted_ids = "'{}'".format("', '".join(crossid_list))
    statement = """
        select * from tmnet.user_upload_cross_pics where crossid in (%s)
    """ % quoted_ids
    return self.do_select(statement)
def drop_old_task_cross(self, crossid_list, old_task_no, nodeid, area_id):
    """Detach crosses from an existing task inside one transaction.

    Rewrites ``task.crossids`` of ``old_task_no`` without the given crossids
    and deletes the matching ``ledger_task_detail`` rows. Both statements are
    committed only when exactly one task row was updated and one detail row
    was deleted per requested crossid; otherwise everything is rolled back.

    Args:
        crossid_list: Crossid strings to remove from the task.
        old_task_no: Task number the crossids currently belong to.
        nodeid: Node id of the task.
        area_id: Area id of the task.

    Returns:
        True when the change was committed, False otherwise (including an
        empty ``crossid_list`` or a task that cannot be found).
    """
    if not crossid_list:
        # Nothing to drop; also avoids rendering an empty "in ()" clause.
        return False

    old_task_info = self.query_task(old_task_no, nodeid, area_id)
    if old_task_info is None:
        # query_task returns None unless exactly one row matches.
        logging.error('drop_old_task_cross: task %s not found' % (old_task_no))
        return False

    # Keep the surviving crossids in their stored order (de-duplicated) so the
    # rewritten column value is deterministic.
    dropped = set(crossid_list)
    stored = (old_task_info['crossids'] or '').split(',')
    remaining = [crossid for crossid in dict.fromkeys(stored) if crossid not in dropped]

    # Values are bound by the driver instead of being formatted into the SQL.
    placeholders = ', '.join(['%s'] * len(crossid_list))
    delete_sql = "delete from ledger_task_detail where task_no = %s and crossid in (" + placeholders + ")"
    delete_params = [old_task_no, *crossid_list]
    update_sql = "update task set crossids = %s where taskno = %s and nodeid = %s and area_id = %s"
    update_params = (','.join(remaining), old_task_no, nodeid, area_id)

    # NOTE(review): the connection/cursor from self.connect() is not released
    # here; how it should be released is not visible in this file. TODO confirm.
    conn, cursor = self.connect()
    try:
        conn.begin()
        logging.info('drop_old_task_cross: %s %s' % (delete_sql, delete_params))
        logging.info('drop_old_task_cross: %s %s' % (update_sql, update_params))
        update_ret = cursor.execute(update_sql, update_params)
        del_ret = cursor.execute(delete_sql, delete_params)
        if update_ret == 1 and del_ret == len(crossid_list):
            conn.commit()
            return True
        conn.rollback()
        return False
    except Exception as e:
        logging.error(e)
        conn.rollback()
        return False
def insert_ledger_task_cross(self, insert_list, crossid_list, task_no, nodeid):
    """Insert ledger task detail rows and seed their ``ledger_status``.

    Runs in one transaction: the rows of ``insert_list`` are inserted, then
    ``ledger_status`` of the task's rows is set per crossid to the status
    found in ``tmnet.ledger_entering_status`` (0 when the cross has no entry).
    The transaction is committed only when the insert count equals
    ``len(insert_list)`` and the update count equals the number of distinct
    crossids.

    Args:
        insert_list: Sequence of ``(task_no, crossid, nodeid, area_id)``
            parameter tuples for the insert.
        crossid_list: Crossid strings covered by the task.
        task_no: Task number of the inserted rows.
        nodeid: Node id of the inserted rows.

    Returns:
        True when the transaction was committed, False otherwise.
    """
    if not crossid_list:
        # No crossids means no status update can be built; report failure
        # without touching the database.
        return False

    insert_sql = """
        insert into ledger_task_detail (task_no, crossid, nodeid, area_id) values(%s, %s, %s, %s)
    """

    # NOTE(review): this lookup goes through self.do_select, whose parameter
    # support is not visible here, so the crossids are still quoted inline.
    crossids = "'" + "', '".join(item for item in crossid_list) + "'"
    query_cross_ledger_status_sql = """
        select crossid, nodeid, status from tmnet.ledger_entering_status where crossid in (%s)
    """ % crossids
    ledger_status_info = self.do_select(query_cross_ledger_status_sql)
    status_by_cross = {row['crossid']: row['status'] for row in ledger_status_info}

    # De-duplicate while keeping caller order so the statement is deterministic.
    unique_crossids = list(dict.fromkeys(crossid_list))
    when_clauses = ' '.join('when %s then %s' for _ in unique_crossids)
    in_placeholders = ', '.join(['%s'] * len(unique_crossids))
    update_sql = (
        "update ledger_task_detail set ledger_status = case crossid "
        f"{when_clauses} end where crossid in ({in_placeholders}) "
        "and nodeid = %s and task_no = %s"
    )
    # Parameter order mirrors the placeholders: (crossid, status) pairs for the
    # CASE, then the crossids for IN, then nodeid and task_no.
    update_params = []
    for crossid in unique_crossids:
        update_params.extend((crossid, status_by_cross.get(crossid, 0)))
    update_params.extend(unique_crossids)
    update_params.extend((nodeid, task_no))
    logging.info('%s %s' % (update_sql, update_params))

    # NOTE(review): the connection/cursor from self.connect() is not released
    # here; how it should be released is not visible in this file. TODO confirm.
    conn, cursor = self.connect()
    try:
        conn.begin()
        ret = cursor.executemany(insert_sql, insert_list)
        if ret != len(insert_list):
            conn.rollback()
            return False
        update_ledger_status_ret = cursor.execute(update_sql, update_params)
        if update_ledger_status_ret == len(unique_crossids):
            conn.commit()
            return True
        conn.rollback()
        logging.error("update ledger_entering_status fail")
        return False
    except Exception as e:
        logging.error(e)
        conn.rollback()
        return False
def query_ledger_task_cross_record(self, task_no, crossid):
    """Return the first `ledger_task_detail` row for a task/cross pair.

    Returns:
        The first row from ``self.do_select``, or ``None`` when the result is
        empty.
    """
    statement = "select * from ledger_task_detail where task_no = %s and crossid = '%s'" % (task_no, crossid)
    records = self.do_select(statement)
    return records[0] if records else None
def update_ledger_task_cross_record(self, task_no, crossid, field_name, value):
    """Set one column of a `ledger_task_detail` row for a task/cross pair.

    Args:
        task_no: Task number of the row.
        crossid: Crossid of the row.
        field_name: Column name, placed verbatim into the statement.
        value: New value, placed verbatim (unquoted) into the statement.

    Returns:
        The result of ``self.do_execute``.
    """
    statement = "update ledger_task_detail set %s = %s where task_no = '%s' and crossid = '%s'" % (field_name, value, task_no, crossid)
    affected = self.do_execute(statement)
    return affected
def approval_ledger_task_cross(self, task_no, crossid, status, approver, approver_id, now_time):
    """Store the approval outcome for one cross of a ledger task.

    Writes ``submit_status``, ``approver``, ``approver_id`` and
    ``approver_time`` on the matching `ledger_task_detail` row.

    Returns:
        The result of ``self.do_execute``.
    """
    fields = (status, approver, approver_id, now_time, task_no, crossid)
    statement = """
        update ledger_task_detail set submit_status = %s, approver = '%s', approver_id = '%s', approver_time = '%s' where task_no = '%s' and crossid = '%s'
    """ % fields
    return self.do_execute(statement)
def query_ledger_task_crosses(self, taskno):
    """Return every `ledger_task_detail` row whose ``task_no`` is ``taskno``."""
    statement = """
        select * from ledger_task_detail where task_no = %s
    """ % taskno
    crosses = self.do_select(statement)
    return crosses
def query_cross_entering_status(self, crossid_list):
    """Return ledger/phase status rows for the given crossids.

    Rows come from `ledger_task_detail` and are ordered by ``update_time``
    ascending.

    Args:
        crossid_list: Crossid strings; each one is wrapped in single quotes
            for the ``in (...)`` clause.
    """
    quoted_ids = "'{}'".format("', '".join(crossid_list))
    statement = """
        select crossid, ledger_status, phase_status, update_time from ledger_task_detail where crossid in (%s) order by update_time asc
    """ % quoted_ids
    return self.do_select(statement)
def query_greenwave_task_requirement_validation_info_sql(self, task_no, nodeid, area_id):
    """Return ``requirement_validation`` of a greenwave task detail row.

    Returns:
        The ``requirement_validation`` field of the first
        `greenwave_task_detail` row matching task, node and area, or ``None``
        when the result is empty.
    """
    statement = """
        select * from greenwave_task_detail where task_no = %s and nodeid = %s and area_id = %s
    """ % (task_no, nodeid, area_id)
    details = self.do_select(statement)
    if not details:
        return None
    return details[0]['requirement_validation']
#
# if __name__ == '__main__':
# tt_5min = get_latest_5min_timestamp()
# print(tt_5min)
# print(get_today_str())