766 lines
37 KiB
Python
766 lines
37 KiB
Python
# -*- coding: utf-8 -*-
|
||
# @Author: Owl
|
||
# @Date: 2025/11/10 17:59
|
||
# @Description:
|
||
# -*- coding:utf-8 -*-
|
||
#import logging
|
||
import json
|
||
import logging
|
||
|
||
import pymysql
|
||
import pymysql.cursors
|
||
from datetime import datetime
|
||
|
||
from flask import g
|
||
from app.db_func_base import *
|
||
|
||
|
||
class TaskDbHelper(TableDbHelperBase):
|
||
|
||
def __init__(self, pool):
|
||
self.db_pool = pool
|
||
self.DB_Name = 'task'
|
||
|
||
|
||
|
||
def query_task(self, taskno, nodeid, area_id):
    """Fetch the single task row identified by task number, node and area.

    Args:
        taskno: Task number to look up.
        nodeid: Node identifier the task belongs to.
        area_id: Area identifier the task belongs to.

    Returns:
        The matching row, or ``None`` (after logging an error) when the
        query does not yield exactly one row.
    """
    # NOTE(review): the values are interpolated straight into the SQL text;
    # callers must pass trusted values.
    sql_query = "select * from task.`task` where nodeid='%s' and taskno='%s' and area_id = %s" % (nodeid, taskno, area_id)
    tasks = self.do_select(sql_query)
    if len(tasks) != 1:
        # The message used to say 'query_ledger error!', which pointed log
        # readers at the wrong method.
        logging.error('query_task error! %s', sql_query)
        return None
    return tasks[0]
|
||
|
||
def query_task_executor(self, nodeid, area_id):
    """List users of the given node/area whose department is '信号调优团队'.

    Returns:
        Rows with ``user_name`` and ``userno``; an empty list (after logging
        an error) when nothing matches.
    """
    sql_query = (
        "select user_name, userno from user.user where userno in "
        f"(select userno from user.area_user where nodeid = '{nodeid}' and area_id = {area_id}) "
        "and department = '信号调优团队'"
    )
    rows = self.do_select(sql_query)
    if len(rows) > 0:
        return rows
    logging.error('query_task_executor is null! %s', sql_query)
    return []
|
||
|
||
|
||
def query_task_src(self, nodeid, area_id):
    """Return the distinct ``src`` values in ``task.task_src`` for a node/area.

    Returns:
        The selected rows; an empty list (after logging an error) when the
        query returns nothing.
    """
    sql_query = f"select distinct src from task.`task_src` where nodeid='{nodeid}' and area_id = {area_id}"
    rows = self.do_select(sql_query)
    if len(rows) > 0:
        return rows
    logging.error('query_task_src is null! %s', sql_query)
    return []
|
||
|
||
def query_task_type(self, nodeid, area_id):
    """Return task-type rows for the node/area, including rows stored with 0.

    The query also matches rows whose ``nodeid`` or ``area_id`` is 0.

    Returns:
        The selected rows; an empty list (after logging an error) when the
        query returns nothing.
    """
    sql_query = (
        f"select * from task.`task_type` where (nodeid='{nodeid}' or nodeid = 0)"
        f" and (area_id = {area_id} or area_id = 0)"
    )
    rows = self.do_select(sql_query)
    if len(rows) > 0:
        return rows
    logging.error('query_task_type is null! %s', sql_query)
    return []
|
||
|
||
def query_all_tak_list(self, nodeid, area_id):
    """Return all non-deleted tasks (``record_state=0``) of a node/area.

    Rows are ordered by task_state asc, progress asc, plan_end_time desc.
    Each non-null ``update_time`` is replaced in place by its
    ``'%Y-%m-%d %H:%M:%S'`` string form.
    """
    sql_query = (
        f"select * from task.`task` where nodeid='{nodeid}' and area_id = {area_id} and record_state=0"
        " order by task_state asc, progress asc, plan_end_time desc"
    )
    rows = self.do_select(sql_query)
    for row in rows:
        stamp = row['update_time']
        if stamp is None:
            continue
        row['update_time'] = stamp.strftime('%Y-%m-%d %H:%M:%S')
    return rows
|
||
|
||
def query_task_list(self, task_name, task_type, data_type, task_class, plan_begin_time, plan_end_time, publish_time, task_state, executor, task_src, nodeid):
    """Search the non-deleted tasks of a node with optional filters.

    Every filter argument that is ``None`` or empty is skipped. ``task_name``
    is matched with ``like '%...%'``; all other filters are exact matches on
    the column of the same name.

    Returns:
        The selected rows; each non-null ``update_time`` is replaced in place
        by its ``'%Y-%m-%d %H:%M:%S'`` string form.
    """
    # NOTE(review): filter values are interpolated straight into the SQL
    # text; callers must pass trusted values.
    sql_query = "select * from task.`task` where nodeid='%s' and record_state=0" % nodeid

    if task_name is not None and len(task_name) > 0:
        sql_query += " and task_name like '%%%s%%'" % task_name

    # Previously the data_type branch appended to `data_type` itself (and
    # named the task_type column), so that filter never reached the query.
    exact_filters = (
        ('task_type', task_type),
        ('data_type', data_type),
        ('task_class', task_class),
        ('plan_begin_time', plan_begin_time),
        ('plan_end_time', plan_end_time),
        ('publish_time', publish_time),
        ('task_state', task_state),
        ('executor', executor),
        ('task_src', task_src),
    )
    for column, value in exact_filters:
        if value is not None and len(value) > 0:
            sql_query += (" and " + column + "='%s'") % value

    res_list = self.do_select(sql_query)
    for res in res_list:
        if res['update_time'] is not None:
            res['update_time'] = res['update_time'].strftime('%Y-%m-%d %H:%M:%S')
    return res_list
|
||
|
||
def query_completed_task_cross_list(self, nodeid, area_id):
    """List crosses (``crossid``, ``name``) referenced by tasks in state 4.

    Args:
        nodeid: Node identifier applied to both the task and the cross table.
        area_id: Accepted for signature compatibility; the query does not
            filter on it.

    Returns:
        Rows selected from ``tmnet.cross``.
    """
    # The subquery now names task.task explicitly, like every other query in
    # this class, instead of relying on the connection's default schema.
    # NOTE(review): `crossid in (select crossids ...)` compares against the
    # whole crossids column value; elsewhere in this file crossids is split
    # on ',' — confirm multi-cross tasks are meant to be matched.
    sql_query = (
        "select crossid,name from tmnet.cross where crossid in "
        "(select crossids from task.task where task_state=4 and nodeid='%s') and nodeid='%s';"
        % (nodeid, nodeid)
    )
    return self.do_select(sql_query)
|
||
|
||
def query_completed_task_by_cross(self, crossid, nodeid, area_id):
    """Return tasks in state 4 whose ``crossids`` column equals ``crossid``.

    The match on ``crossids`` is an exact string comparison, scoped to the
    given node and area.
    """
    sql_query = (
        f"select * from task.task where task_state=4 and crossids='{crossid}'"
        f" and nodeid='{nodeid}' and area_id = {area_id};"
    )
    return self.do_select(sql_query)
|
||
|
||
def get_last_taskno(self, creatorid, task_name, nodeid):
    """Return the highest task number for a creator/task name/node.

    Returns:
        The ``taskno`` value of the single result row, or ``None`` (after
        logging an error) when the query does not return exactly one row.
    """
    sql_query = (
        f"select max(taskno) as taskno from task.task where creatorid='{creatorid}'"
        f" and task_name='{task_name}' and nodeid='{nodeid}';"
    )
    rows = self.do_select(sql_query)
    if len(rows) == 1:
        return rows[0]['taskno']
    logging.error('get_last_taskno error! %s', sql_query)
    return None
|
||
|
||
def get_task_no(self, nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment):
    """Look up the task number of the task matching every given attribute.

    Returns:
        ``taskno`` of the first matching row, or ``None`` when no row matches.
    """
    # NOTE(review): plan_begin_time / plan_end_time are inserted unquoted,
    # unlike the other queries in this class — confirm callers pass values
    # that are already valid SQL literals.
    sql = """
        select taskno from task.task where nodeid = '{nodeid}' and area_id = {area_id} and task_name = '{task_name}' and task_type = {task_type}
        and task_class = {task_class} and data_type = {data_type} and plan_begin_time = {plan_begin_time}
        and plan_end_time = {plan_end_time} and executor = '{executor}' and task_src = '{task_src}' and comment = '{comment}'
    """.format(
        nodeid=nodeid, area_id=area_id, task_name=task_name, task_type=task_type,
        task_class=task_class, data_type=data_type, plan_begin_time=plan_begin_time,
        plan_end_time=plan_end_time, executor=executor, task_src=task_src, comment=comment,
    )
    matches = self.do_select(sql)
    if len(matches) > 0:
        return matches[0]['taskno']
    return None
|
||
|
||
def delete_task(self, taskno, nodeid, area_id):
    """Soft-delete tasks by setting ``record_state=1``.

    Args:
        taskno: Iterable of task numbers; each is converted with ``str``.
        nodeid: Node identifier the tasks belong to.
        area_id: Area identifier the tasks belong to.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    # NOTE(review): an empty `taskno` produces `in ()` — confirm callers
    # never pass an empty collection.
    joined_numbers = ','.join(map(str, taskno))
    sql = (
        f"update task.`task` set record_state=1 where taskno in ({joined_numbers})"
        f" and nodeid='{nodeid}' and area_id = {area_id};"
    )
    return self.do_execute(sql)
|
||
|
||
def query_task_history(self, taskno, nodeid, area_id):
    """Return the history rows of one task, ordered by ``history_date``."""
    sql_query = (
        f"select * from task.task_history where taskno='{taskno}' and nodeid='{nodeid}'"
        f" and area_id = {area_id} order by history_date;"
    )
    return self.do_select(sql_query)
|
||
|
||
def query_task_progress_history(self, yesterday_date, nodeid, area_id):
    """Return task-progress history entries recorded on one date.

    Selects history rows of the node/area whose ``history_date`` falls on
    ``yesterday_date`` and whose ``content`` matches the progress pattern,
    then left-joins the task name by ``taskno``.

    Returns:
        Rows with ``task_name`` and ``content``.
    """
    template = """
        select
            t2.task_name,
            content
        from
        (select * from task.task_history where date(history_date)='{day}' and nodeid='{nodeid}' and area_id = {area_id} and content like '%%operation%%任务进度%%content%%' order by history_date) t1
        left join
        (select task_name,taskno from task.task) t2
        on t1.taskno = t2.taskno;
    """
    sql_query = template.format(day=yesterday_date, nodeid=nodeid, area_id=area_id)
    return self.do_select(sql_query)
|
||
|
||
def add_task_history(self, taskno, nodeid, area_id, operator, remark):
    """Insert one history entry for a task, stamped with the current time.

    Args:
        taskno: Task number the entry belongs to.
        nodeid: Node identifier.
        area_id: Area identifier.
        operator: Stored in the ``operator`` column.
        remark: Stored in the ``content`` column.

    Returns:
        The affected-row count when it is exactly 1, otherwise 0 (after
        logging an error).
    """
    time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    sql_insert = (
        "insert into task.task_history (taskno,nodeid, area_id, operator,content, history_date)"
        f" values('{taskno}',{nodeid},'{area_id}','{operator}','{remark}','{time_str}')"
    )
    count = self.do_execute(sql_insert)
    if count == 1:
        return count
    logging.error('add_task_history error! %s', sql_insert)
    return 0
|
||
|
||
def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
             executor, progress, task_state, description, crossids, waveid, wave_name, comment,
             record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review):
    """Insert one row into ``task.task``.

    The first twenty values are written as quoted SQL literals; ``area_id``,
    ``task_type_class`` and ``full_review`` are written unquoted.

    Returns:
        The affected-row count when it is exactly 1, otherwise 0 (after
        logging an error).
    """
    quoted_values = (
        timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
        executor, progress, task_state, description, crossids, waveid, wave_name, comment,
        record_state, task_src, task_class, nodeid,
    )
    quoted_part = ','.join("'%s'" % (value,) for value in quoted_values)
    bare_part = ", %s, %s, %s)" % (area_id, task_type_class, full_review)
    sql_insert = (
        "insert into task.task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,"
        " executor, progress, task_state, description, crossids, waveid, wave_name, comment,"
        " record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) values("
        + quoted_part + bare_part
    )
    count = self.do_execute(sql_insert)
    if count == 1:
        return count
    logging.error('add_task error! %s', sql_insert)
    return 0
|
||
|
||
def update_task_info(self, taskno, modify_data, nodeid, area_id):
    """Apply a caller-built SET clause to one task and refresh ``update_time``.

    Args:
        taskno: Task number of the row to update.
        modify_data: SQL fragment placed verbatim after ``set``.
        nodeid: Node identifier of the task.
        area_id: Area identifier of the task.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    # NOTE(review): modify_data is spliced in as raw SQL; it must be built
    # from trusted input.
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    sql = (
        f"update task.task set {modify_data}, update_time = '{stamp}'"
        f" where taskno = '{taskno}' and nodeid = {nodeid} and area_id = {area_id}"
    )
    return self.do_execute(sql)
|
||
##########################################################################
|
||
|
||
def insert_upload_file_record(self, taskno, nodeid, download_url, area_id):
    """Record an uploaded file's download URL for a task.

    ``taskno`` and ``nodeid`` are converted with ``int`` before being written.

    Returns:
        Whatever ``do_execute`` returns for the insert statement.
    """
    task_number, node_number = int(taskno), int(nodeid)
    sql = f"""
        insert into task.task_upload_file_record (taskno, nodeid, download_url, area_id)
        values ({task_number} , {node_number}, '{download_url}', {area_id})
    """
    return self.do_execute(sql)
|
||
|
||
def query_task_file(self, nodeid, area_id):
    """Group the uploaded-file records of a node/area by task number.

    Returns:
        Dict mapping ``taskno`` to a list of ``{'id', 'download_url'}``
        dicts, in the order the rows were returned.
    """
    sql = f"""
        select * from task.task_upload_file_record where nodeid = {int(nodeid)} and area_id = {area_id}
    """
    files_by_task = {}
    for record in self.do_select(sql):
        entry = {'id': record['id'], 'download_url': record['download_url']}
        files_by_task.setdefault(record['taskno'], []).append(entry)
    return files_by_task
|
||
|
||
def query_task_file_by_taskno(self, nodeid, taskno, area_id):
    """Return the uploaded-file records of one task.

    All three arguments are converted with ``int`` before being written into
    the query.

    Returns:
        List of ``{'id', 'download_url'}`` dicts.
    """
    sql = f"""
        select id, download_url from task.task_upload_file_record where nodeid = {int(nodeid)} and taskno = {int(taskno)} and area_id = {int(area_id)}
    """
    return [
        {'id': record['id'], 'download_url': record['download_url']}
        for record in self.do_select(sql)
    ]
|
||
|
||
def del_task_file(self, nodeid, taskno, file_id):
    """Delete one uploaded-file record of a task.

    All three arguments are converted with ``int`` before being written into
    the statement.

    Returns:
        Whatever ``do_execute`` returns for the delete statement.
    """
    node_number, task_number, record_id = int(nodeid), int(taskno), int(file_id)
    sql = f"""
        delete from task.task_upload_file_record where nodeid = {node_number} and taskno = {task_number} and id = {record_id}
    """
    return self.do_execute(sql)
|
||
|
||
def update_task_state(self, nodeid, area_id, task_no, state):
    """Set ``task_state`` of one task.

    Args:
        nodeid: Node identifier of the task.
        area_id: Area identifier of the task.
        task_no: Task number of the row to update.
        state: New value for ``task_state``.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    # The table is now named as task.task, like every other statement in this
    # class, instead of relying on the connection's default schema.
    sql = "update task.task set task_state='%s' where nodeid='%s' and area_id='%s' and taskno='%s'" % (state, nodeid, area_id, task_no)
    return self.do_execute(sql)
|
||
|
||
def query_ledger_task_list(self, nodeid, area_id):
    """Return distinct crossids of ledger-task details still to be handled.

    Considers tasks of the node/area with ``task_type_class = 1`` that are
    neither deleted (``record_state != 1``) nor in state 4, and keeps detail
    rows whose ``submit_status`` is null or not 2.
    """
    node_number, area_number = int(nodeid), int(area_id)
    sql = f"""
        select distinct crossid from task.ledger_task_detail where task_no in (select taskno from task.task where nodeid = {node_number} and area_id = {area_number} and task_type_class = 1 and record_state != 1 and task_state != 4) and (submit_status is null or submit_status != 2)
    """
    return self.do_select(sql)
|
||
|
||
def query_ledger_task_crosses_info(self, task_no):
    """Return every ledger-task detail row of one task.

    Args:
        task_no: Task number, written unquoted into the query.

    Returns:
        Rows selected from ``task.ledger_task_detail``.
    """
    # The table is now schema-qualified, matching query_ledger_task_crosses
    # and the other ledger queries, instead of relying on the connection's
    # default schema.
    sql = """
        select * from task.ledger_task_detail where task_no = %s
    """ % task_no
    return self.do_select(sql)
|
||
|
||
def query_ledger_task_crosses_info_by_area_id(self, nodeid, area_id):
    """Summarize ledger entry/approval progress per task for a node/area.

    A cross counts as entered when ``ledger_status == 2`` and
    ``phase_status`` is 1 or 3; it counts as approved when
    ``submit_status == 2``. Percentages are truncated to integers.

    Returns:
        Dict mapping ``task_no`` to a dict with ``task_no``,
        ``task_cross_num``, ``entered_cross_num``, ``entered_percent``,
        ``approve_cross_num`` and ``approve_percent``.
    """
    sql = """
        select * from task.ledger_task_detail where nodeid = %s and area_id = %s
    """ % (nodeid, area_id)
    crosses_by_task = {}
    for detail in self.do_select(sql):
        crosses_by_task.setdefault(detail['task_no'], []).append(detail)

    summary = {}
    for task_no, crosses in crosses_by_task.items():
        total = len(crosses)
        entered = sum(
            1 for cross in crosses
            if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)
        )
        approved = sum(1 for cross in crosses if cross['submit_status'] == 2)
        summary[task_no] = {
            'task_no': task_no,
            'task_cross_num': total,
            'entered_cross_num': entered,
            'entered_percent': int(entered * 100 / total) if total > 0 else 0,
            'approve_cross_num': approved,
            'approve_percent': int(approved * 100 / total) if total > 0 else 0,
        }
    return summary
|
||
|
||
def query_ledger_task_crosses_pics(self, crossid_list):
    """Return the uploaded cross pictures for the given crossids.

    Args:
        crossid_list: Iterable of crossid strings; each is written as a
            quoted literal into the ``in (...)`` list.
    """
    quoted_ids = "'{}'".format("', '".join(crossid_list))
    sql = """
        select * from tmnet.user_upload_cross_pics where crossid in (%s)
    """ % quoted_ids
    return self.do_select(sql)
|
||
|
||
def drop_old_task_cross(self, crossid_list, old_task_no, nodeid, area_id):
    """Detach crosses from a task: delete their detail rows and shrink crossids.

    Both statements run in one transaction, which is committed only when the
    task update affects exactly one row and the delete affects exactly
    ``len(crossid_list)`` rows; otherwise it is rolled back.

    Args:
        crossid_list: Crossids to remove from the task.
        old_task_no: Task number the crosses currently belong to.
        nodeid: Node identifier of the task.
        area_id: Area identifier of the task.

    Returns:
        ``True`` when the transaction was committed, ``False`` otherwise
        (including when the task cannot be found).
    """
    old_task_info = self.query_task(old_task_no, nodeid, area_id)
    if old_task_info is None:
        # query_task returns None unless exactly one row matches; indexing
        # that None used to raise before the transaction even started.
        logging.error('drop_old_task_cross: task %s not found (nodeid=%s, area_id=%s)', old_task_no, nodeid, area_id)
        return False

    removed = set(crossid_list)
    old_crossids = (old_task_info['crossids'] or '').split(',')
    # Keep the stored order of the remaining crossids; a plain set difference
    # would shuffle them arbitrarily.
    remaining = [crossid for crossid in dict.fromkeys(old_crossids) if crossid not in removed]

    # An empty list still needs one placeholder so the IN clause stays valid.
    delete_ids = list(crossid_list) or ['']
    placeholders = ','.join(['%s'] * len(delete_ids))
    delete_sql = "delete from task.ledger_task_detail where task_no = %s and crossid in (" + placeholders + ")"
    delete_params = [old_task_no] + delete_ids
    update_sql = "update task.task set crossids = %s where taskno = %s and nodeid = %s and area_id = %s"
    update_params = (','.join(remaining), old_task_no, nodeid, area_id)

    conn, cursor = self.connect()
    try:
        conn.begin()
        logging.info('drop_old_task_cross: %s %s', delete_sql, delete_params)
        logging.info('drop_old_task_cross: %s %s', update_sql, update_params)
        # Values are bound by the driver instead of being spliced into the SQL.
        update_ret = cursor.execute(update_sql, update_params)
        del_ret = cursor.execute(delete_sql, delete_params)
        if update_ret == 1 and del_ret == len(crossid_list):
            conn.commit()
            return True
        conn.rollback()
        return False
    except Exception as e:
        logging.error(e)
        conn.rollback()
        return False
|
||
|
||
def insert_ledger_task_cross(self, insert_list, crossid_list, task_no, nodeid):
    """Insert ledger-task detail rows and seed their ``ledger_status``.

    The status of each cross is read from ``tmnet.ledger_entering_status``
    (0 when the cross has no row there) and written to the newly inserted
    detail rows. Insert and status update share one transaction.

    Args:
        insert_list: Parameter tuples for the detail insert
            (task_no, crossid, nodeid, area_id).
        crossid_list: Crossid strings covered by the insert.
        task_no: Task number used to scope the status update.
        nodeid: Node identifier used to scope the status update.

    Returns:
        ``True`` when both statements affected the expected number of rows
        and the transaction was committed, ``False`` otherwise.
    """
    quoted_ids = "'{}'".format("', '".join(crossid_list))
    insert_sql = """
        insert into task.ledger_task_detail (task_no, crossid, nodeid, area_id) values(%s, %s, %s, %s)
    """
    status_query = """
        select crossid, nodeid, status from tmnet.ledger_entering_status where crossid in (%s)
    """ % quoted_ids
    status_by_cross = {row['crossid']: row['status'] for row in self.do_select(status_query)}
    # One CASE branch per distinct cross; unknown crosses default to 0.
    case_branches = {
        f"when '{crossid}' then {status_by_cross.get(crossid, 0)} "
        for crossid in crossid_list
    }
    update_status_sql = f"""
        update task.ledger_task_detail set ledger_status = case crossid {' '.join(case_branches)} end where crossid in ({quoted_ids}) and nodeid = {nodeid} and task_no = {task_no}
    """
    logging.info(update_status_sql)

    conn, cursor = self.connect()
    try:
        conn.begin()
        if cursor.executemany(insert_sql, insert_list) != len(insert_list):
            conn.rollback()
            return False
        # NOTE(review): success requires the update's affected-row count to
        # equal the number of distinct crosses — confirm the connection
        # reports matched rows, since unchanged rows may not be counted.
        if cursor.execute(update_status_sql) != len(case_branches):
            conn.rollback()
            logging.error("update ledger_entering_status fail")
            return False
        conn.commit()
        return True
    except Exception as e:
        logging.error(e)
        conn.rollback()
        return False
|
||
|
||
def query_ledger_task_cross_record(self, task_no, crossid):
    """Return the first ledger-task detail row for a task/cross pair.

    Returns:
        The first matching row, or ``None`` when the query returns nothing.
    """
    sql = f"select * from task.ledger_task_detail where task_no = {task_no} and crossid = '{crossid}'"
    found = self.do_select(sql)
    return found[0] if found else None
|
||
|
||
def update_ledger_task_cross_record(self, task_no, crossid, field_name, value):
    """Set one column of a ledger-task detail row.

    Args:
        task_no: Task number of the row.
        crossid: Crossid of the row.
        field_name: Column name, placed verbatim into the statement.
        value: New value, placed unquoted into the statement.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    # NOTE(review): field_name and value are spliced in as raw SQL; both
    # must come from trusted code.
    sql = (
        f"update task.ledger_task_detail set {field_name} = {value}"
        f" where task_no = '{task_no}' and crossid = '{crossid}'"
    )
    return self.do_execute(sql)
|
||
|
||
def approval_ledger_task_cross(self, task_no, crossid, status, approver, approver_id, now_time):
    """Store an approval decision on a ledger-task detail row.

    Writes ``submit_status``, ``approver``, ``approver_id`` and
    ``approver_time`` for the row identified by task number and crossid.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    sql = f"""
        update task.ledger_task_detail set submit_status = {status}, approver = '{approver}', approver_id = '{approver_id}', approver_time = '{now_time}' where task_no = '{task_no}' and crossid = '{crossid}'
    """
    return self.do_execute(sql)
|
||
|
||
def query_ledger_task_crosses(self, taskno):
    """Return every ledger-task detail row of one task.

    Args:
        taskno: Task number, written unquoted into the query.
    """
    statement = """
        select * from task.ledger_task_detail where task_no = %s
    """ % taskno
    detail_rows = self.do_select(statement)
    return detail_rows
|
||
|
||
def query_all_ledger_task_crosses_info(self, nodeid, area_id):
    """Group all ledger-task detail rows of a node/area by task number.

    Returns:
        Dict mapping ``task_no`` to the list of its detail rows, in the order
        the rows were returned.
    """
    sql = """
        select * from task.ledger_task_detail where nodeid = %s and area_id = %s
    """ % (nodeid, area_id)
    rows_by_task = {}
    for detail in self.do_select(sql):
        rows_by_task.setdefault(detail['task_no'], []).append(detail)
    return rows_by_task
|
||
|
||
def query_cross_entering_status(self, crossid_list):
    """Return ledger/phase status rows for the given crossids.

    Rows come from ``task.ledger_task_detail`` ordered by ``update_time``
    ascending.

    Args:
        crossid_list: Iterable of crossid strings; each is written as a
            quoted literal into the ``in (...)`` list.
    """
    quoted_ids = "'{}'".format("', '".join(crossid_list))
    sql = """
        select crossid, ledger_status, phase_status, update_time from task.ledger_task_detail where crossid in (%s) order by update_time asc
    """ % quoted_ids
    return self.do_select(sql)
|
||
|
||
def query_greenwave_task_requirement_validation_info_sql(self, task_no):
    """Return the first ``task.greenwave_task_detail`` row of a task.

    Returns:
        The first matching row, or ``None`` when the query returns nothing.
    """
    statement = """
        select * from task.greenwave_task_detail where task_no = %s
    """ % (task_no)
    found = self.do_select(statement)
    return found[0] if found else None
|
||
|
||
def query_wave_task_additional_info_sql(self, task_no, nodeid, area_id):
    """Return the first additional-detail row of a green-wave task.

    Returns:
        The first matching ``task.greenwave_task_additional_detail`` row, or
        ``None`` when the query returns nothing.
    """
    statement = f"""
        select * from task.greenwave_task_additional_detail where task_no = {task_no} and nodeid = {nodeid} and area_id = {area_id}
    """
    found = self.do_select(statement)
    return found[0] if found else None
|
||
|
||
def query_wave_task_crosses(self, task_no):
    """Return every ``task.greenwave_task_cross_detail`` row of a task."""
    statement = """
        select * from task.greenwave_task_cross_detail where task_no = %s
    """ % (task_no)
    cross_rows = self.do_select(statement)
    return cross_rows
|
||
|
||
def query_wave_task_tp_info(self, task_no, nodeid, area_id):
    """Return the ``task.greenwave_task_tp_detail`` rows of a task.

    The query is scoped to the given node and area.
    """
    statement = f"""
        select * from task.greenwave_task_tp_detail where task_no = {task_no} and nodeid = {nodeid} and area_id = {area_id}
    """
    return self.do_select(statement)
|
||
|
||
def query_task_check_res(self, task_no, nodeid, area_id):
    """Group a task's result-check rows by ``tp_info_id``.

    Returns:
        Dict mapping ``tp_info_id`` to the list of rows carrying it, in the
        order the rows were returned.
    """
    statement = f"""
        select * from task.greenwave_task_result_addirm_detail where task_no = {task_no} and nodeid = {nodeid} and area_id = {area_id}
    """
    rows_by_period = {}
    for check_row in self.do_select(statement):
        rows_by_period.setdefault(check_row['tp_info_id'], []).append(check_row)
    return rows_by_period
|
||
|
||
def query_greenwave_task_adjustment_record(self, task_no):
    """Return every ``task.greenwave_task_tiny_adjustment_record`` row of a task."""
    statement = """
        select * from task.greenwave_task_tiny_adjustment_record where task_no = %s
    """ % (task_no)
    records = self.do_select(statement)
    return records
|
||
|
||
def save_task_require_confirm_info(self, task_no, nodeid, area_id, require_confirm_info, creator_id, creator):
    """Persist a green-wave requirement confirmation in one transaction.

    Writes the base record, one row per cross, one row per time period and
    an additional-detail row. The transaction is committed only when every
    statement reports the expected affected-row count.

    Args:
        task_no: Task number all rows are attached to.
        nodeid: Node identifier stored on period and additional rows.
        area_id: Area identifier stored on period and additional rows.
        require_confirm_info: Dict with ``base_info`` (dict), ``cross_list``
            (list of dicts with ``influence_factor`` and ``phase_info``) and
            ``tp_info`` (list of dicts).
        creator_id: Stored in ``creator_id`` of the base record.
        creator: Stored in ``creator`` of the base record.

    Returns:
        ``True`` when committed, ``False`` when rolled back or on error.
    """
    base_info = require_confirm_info['base_info']
    cross_list = require_confirm_info['cross_list']
    tp_info = require_confirm_info['tp_info']

    insert_base_info_sql = """
        insert into task.greenwave_task_detail(task_no, wave_name, waveid, start_cross_name, end_cross_name, cross_num,
        task_src, plan_start_time, plan_end_time, predict_issue_time, is_urgency, wave_status, slc_company,
        max_phase_position, left_coor, comment, creator_id, creator, executor_id, executor) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    # Keys listed in the same order as the columns between task_no and creator_id.
    base_keys = (
        'wave_name', 'waveid', 'start_cross_name', 'end_cross_name', 'cross_num',
        'task_src', 'plan_start_time', 'plan_end_time', 'predict_issue_time', 'is_urgency',
        'wave_status', 'slc_company', 'max_phase_position', 'left_coor', 'comment',
    )
    params = (
        (task_no,)
        + tuple(base_info[key] for key in base_keys)
        + (creator_id, creator, base_info['executor_id'], base_info['executor'])
    )

    insert_cross_info_sql = """
        insert into task.greenwave_task_cross_detail(task_no, crossid, cross_name, location, intersect_coor,
        phase_pos_adjust, phase_seq_adjust, phase_adjust, borrow_left, waiting_left, waiting_straight,
        waiting_bicycle, phase_info) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    factor_keys = (
        'intersect_coor', 'phase_pos_adjust', 'phase_seq_adjust', 'phase_adjust',
        'borrow_left', 'waiting_left', 'waiting_straight', 'waiting_bicycle',
    )
    insert_cross_list = []
    for item in cross_list:
        influence_factor = item['influence_factor']
        phase_info_json = json.dumps(item['phase_info'], ensure_ascii=False)
        insert_cross_list.append(
            (task_no, item['crossid'], item['cross_name'], item['location'])
            + tuple(influence_factor[key] for key in factor_keys)
            + (phase_info_json,)
        )

    insert_tp_info_sql = """
        insert into task.greenwave_task_tp_detail(task_no, wave_tp_id, tp_start, tp_end, coor_dir, priority_coor_dir, weekday,
        nodeid, area_id) values (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    insert_tp_info = [
        (task_no, item['wave_tp_id'], item['tp_start'], item['tp_end'], item['coor_dir'],
         item['priority_coor_dir'], item['weekday'], nodeid, area_id)
        for item in tp_info
    ]

    insert_additional_sql = "insert into task.greenwave_task_additional_detail (task_no, nodeid, area_id) values (%s, %s, %s)"
    insert_additional_params = (task_no, nodeid, area_id)

    conn, cursor = self.connect()
    try:
        insert_greenwave_task_ret = cursor.execute(insert_base_info_sql, params)
        insert_cross_ret = cursor.executemany(insert_cross_info_sql, insert_cross_list)
        insert_tp_ret = cursor.executemany(insert_tp_info_sql, insert_tp_info)
        insert_additional_ret = cursor.execute(insert_additional_sql, insert_additional_params)
        # NOTE(review): the counts are compared as returned; confirm what the
        # driver's executemany returns for an empty cross_list / tp_info.
        if (insert_greenwave_task_ret == 1 and insert_cross_ret == len(cross_list)
                and insert_tp_ret == len(tp_info) and insert_additional_ret == 1):
            conn.commit()
            return True
        # The counts used to be passed as bare positional arguments, making
        # the first integer the "format string", so the record could not be
        # formatted.
        logging.error(
            'save_task_require_confirm_info row count mismatch: base=%s cross=%s/%s tp=%s/%s additional=%s',
            insert_greenwave_task_ret, insert_cross_ret, len(cross_list),
            insert_tp_ret, len(tp_info), insert_additional_ret,
        )
        conn.rollback()
        return False
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False
|
||
|
||
def init_additional_info(self, task_no, nodeid, area_id, cross_infos):
    """Create the additional-detail row and cross rows of a green-wave task.

    Both inserts share one transaction, committed only when the additional
    insert affects one row and the cross insert affects
    ``len(cross_infos)`` rows.

    Args:
        task_no: Task number the rows are attached to.
        nodeid: Node identifier stored on the additional-detail row.
        area_id: Area identifier stored on the additional-detail row.
        cross_infos: List of dicts with ``crossid``, ``name`` and ``location``.

    Returns:
        ``True`` when committed, ``False`` when rolled back or on error.
    """
    cross_values = [
        (task_no, cross_info['crossid'], cross_info['name'], cross_info['location'])
        for cross_info in cross_infos
    ]
    # Values are bound by the driver, the same way
    # save_task_require_confirm_info issues this very insert, instead of
    # being formatted into the SQL text.
    insert_additional_sql = "insert into task.greenwave_task_additional_detail (task_no, nodeid, area_id) values (%s, %s, %s)"
    insert_additional_params = (task_no, nodeid, area_id)
    insert_cross_sql = """
        insert into task.greenwave_task_cross_detail(task_no, crossid, cross_name, location) values (%s, %s, %s, %s)
    """
    conn, cursor = self.connect()
    try:
        insert_additional_ret = cursor.execute(insert_additional_sql, insert_additional_params)
        insert_cross_ret = cursor.executemany(insert_cross_sql, cross_values)
        if insert_additional_ret == 1 and insert_cross_ret == len(cross_infos):
            conn.commit()
            return True
        conn.rollback()
        return False
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False
|
||
|
||
def async_wave_tp_id(self, wave_tp_info, task_tp_info, task_no, nodeid, area_id):
    """Copy green-wave period ids onto the task's period rows.

    Periods are matched on ``tp_start``, ``tp_end`` and ``weekday`` (all
    concatenated as strings). When the task has no period rows yet, one row
    per wave period is inserted instead.

    Args:
        wave_tp_info: Wave period dicts with ``tp_start``, ``tp_end``,
            ``weekday`` and ``wave_tp_id`` (plus ``type_str`` when rows have
            to be inserted).
        task_tp_info: Task period dicts; each matched dict gets its
            ``wave_tp_id`` key set in place.
        task_no: Task number of the period rows.
        nodeid: Node identifier of the period rows.
        area_id: Area identifier of the period rows.

    Returns:
        ``False`` when a task period has no matching wave period or a
        statement fails; otherwise ``True`` (for the insert path: whether
        the inserted row count equals the number of wave periods).
    """
    def period_key(row):
        return row['tp_start'] + '-' + row['tp_end'] + '-' + row['weekday']

    wave_tp_info_dict = {period_key(row): row['wave_tp_id'] for row in wave_tp_info}
    task_tp_info_dict = {period_key(row): row for row in task_tp_info}
    for tp_key, task_row in task_tp_info_dict.items():
        if tp_key not in wave_tp_info_dict:
            return False
        task_row['wave_tp_id'] = wave_tp_info_dict[tp_key]

    if not task_tp_info_dict:
        # The requirement confirmation was skipped, so the task has no period
        # rows yet: create them from the wave's periods.
        insert_list = [
            (task_no, tp_info['wave_tp_id'], tp_info['tp_start'], tp_info['tp_end'],
             tp_info['type_str'], tp_info['type_str'], tp_info['weekday'], nodeid, area_id)
            for tp_info in wave_tp_info
        ]
        insert_sql = """
            insert into task.greenwave_task_tp_detail(task_no, wave_tp_id, tp_start, tp_end, coor_dir, priority_coor_dir, weekday, nodeid, area_id) values (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        ret = self.do_executemany(insert_sql, insert_list)
        return ret == len(insert_list)

    update_sql = """
        update task.greenwave_task_tp_detail set wave_tp_id = %s where tp_start = %s and tp_end = %s and weekday = %s and task_no = %s and nodeid = %s and area_id = %s
    """
    conn, cursor = self.connect()
    try:
        for tp_info in task_tp_info_dict.values():
            cursor.execute(update_sql, (tp_info['wave_tp_id'], tp_info['tp_start'], tp_info['tp_end'],
                                        tp_info['weekday'], task_no, nodeid, area_id))
        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        # Failures used to be printed only; log them like the other
        # transactional methods do.
        logging.error(e)
        return False
|
||
|
||
def update_stage1_info(self, task_no, waveid, wave_name, task_stage, task_info, wave_crosses):
    """Advance a green-wave task's stage, binding the wave first if needed.

    When ``task_info`` already carries a non-empty ``waveid`` and
    ``wave_name`` only the stage is updated; otherwise the wave id, wave
    name and comma-joined crossids are written to the task as well.

    Args:
        task_no: Task number to update.
        waveid: Wave id to bind when the task has none yet.
        wave_name: Wave name to bind when the task has none yet.
        task_stage: New value for ``task_stage``.
        task_info: Current task row; ``waveid`` and ``wave_name`` are read.
        wave_crosses: Crossid strings of the wave, joined with ','.

    Returns:
        ``True`` when committed, ``False`` when a statement failed.
    """
    # Both branches issue the same stage update; it now always goes through
    # driver-side parameter binding instead of one branch formatting the
    # values into the SQL text.
    update_stage_sql = """
        update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s
    """
    update_task_sql = """
        update task.task set waveid = %s, wave_name = %s, crossids = %s where taskno = %s
    """
    conn, cursor = self.connect()
    try:
        already_bound = bool(task_info['waveid'] and task_info['wave_name'])
        if not already_bound:
            crossids = ','.join(wave_crosses)
            cursor.execute(update_task_sql, (waveid, wave_name, crossids, task_no))
        cursor.execute(update_stage_sql, (task_stage, task_no))
        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        # Failures used to be printed only; log them like the other
        # transactional methods do.
        logging.error(e)
        return False
|
||
|
||
def update_stage2_info(self, task_no, stage_info):
    """Set ``task_stage`` on a task's additional-detail row.

    Args:
        task_no: Task number, written unquoted.
        stage_info: New stage value, written as a quoted literal.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    statement = f"""
        update task.greenwave_task_additional_detail set task_stage = '{stage_info}' where task_no = {task_no}
    """
    return self.do_execute(statement)
|
||
|
||
def update_stage3_info(self, actually_issue_time, task_no, task_stage, delay_reason):
    """Record the actual issue time, stage and delay reason of a task.

    Writes ``actually_issue_time``, ``task_stage`` and
    ``delay_issue_reason`` on the task's additional-detail row.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    statement = f"""
        update task.greenwave_task_additional_detail set actually_issue_time = '{actually_issue_time}', task_stage = '{task_stage}', delay_issue_reason = '{delay_reason}' where task_no = {task_no}
    """
    return self.do_execute(statement)
|
||
|
||
def update_stage4_info(self, task_no, task_stage, tp_info, nodeid, area_id, task_additional_info):
    """Store per-period result-check outcomes and advance the task stage.

    Period rows whose ``result_check_res`` or serialized ``not_pass_detail``
    differ from the stored values are updated in one statement. The stage is
    then updated; when no period has ``result_check_res == 1`` the
    ``affirm_time`` is set to the current time as well.

    Args:
        task_no: Task number being updated.
        task_stage: New value for ``task_stage``.
        tp_info: List of dicts with ``tp_info_id``, ``result_check_res``,
            ``tp_start``, ``tp_end`` and ``not_pass_detail``.
        nodeid: Node identifier of the period rows.
        area_id: Area identifier of the period rows.
        task_additional_info: Accepted for signature compatibility; not read.

    Returns:
        Tuple ``(ok, has_bad)``: ``ok`` is ``True`` when committed;
        ``has_bad`` is ``True`` when any period has ``result_check_res == 1``.
    """
    old_tp_info = self.query_wave_task_tp_info(task_no, nodeid, area_id)
    # NOTE(review): stored periods are keyed by start/end only (no weekday),
    # so later rows overwrite earlier ones with the same times — confirm.
    old_by_period = {row['tp_start'] + '-' + row['tp_end']: row for row in old_tp_info}

    changes = {}  # tp_info_id -> (result_check_res, not_pass_detail_json)
    has_bad = False
    for tp_item in tp_info:
        result_check_res = tp_item['result_check_res']
        if result_check_res == 1:
            has_bad = True
        not_pass_detail = tp_item['not_pass_detail']
        # Empty or missing details are stored as an empty string.
        not_pass_detail_json = json.dumps(not_pass_detail, ensure_ascii=False) if not_pass_detail else ''
        # A period without a stored counterpart is treated as changed instead
        # of raising KeyError before the transaction starts.
        old_row = old_by_period.get(tp_item['tp_start'] + '-' + tp_item['tp_end'], {})
        if (result_check_res != old_row.get('result_check_res')
                or not_pass_detail_json != old_row.get('not_pass_detail')):
            changes[tp_item['tp_info_id']] = (result_check_res, not_pass_detail_json)

    conn, cursor = self.connect()
    try:
        # With nothing changed the statement used to contain an empty CASE
        # and `in ()`, which failed and prevented the stage update; it is now
        # skipped in that situation.
        if changes:
            changed_ids = list(changes)
            when_sql = ' '.join(['when %s then %s'] * len(changed_ids))
            id_sql = ','.join(['%s'] * len(changed_ids))
            update_tp_info_sql = (
                "update task.greenwave_task_tp_detail"
                " set result_check_res = case id " + when_sql + " else result_check_res end,"
                " not_pass_detail = case id " + when_sql + " else not_pass_detail end"
                " where id in (" + id_sql + ") and nodeid = %s and area_id = %s and task_no = %s"
            )
            # Values (including the JSON text) are bound by the driver, so
            # quotes inside the details cannot break the statement.
            params = []
            for tp_id in changed_ids:
                params.extend((tp_id, changes[tp_id][0]))
            for tp_id in changed_ids:
                params.extend((tp_id, changes[tp_id][1]))
            params.extend(changed_ids)
            params.extend((nodeid, area_id, task_no))
            cursor.execute(update_tp_info_sql, params)

        if has_bad:
            update_stage_sql = """
                update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s
            """
            cursor.execute(update_stage_sql, (task_stage, task_no))
        else:
            now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            update_state_sql = """
                update task.greenwave_task_additional_detail set task_stage = %s, affirm_time = %s where task_no = %s
            """
            cursor.execute(update_state_sql, (task_stage, now_time, task_no))
        conn.commit()
        return True, has_bad
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False, has_bad
|
||
|
||
def update_stage5_info(self, task_no, task_stage, affirm_time):
    """Set ``task_stage`` and ``affirm_time`` on a task's additional-detail row.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    statement = f"""
        update task.greenwave_task_additional_detail set task_stage = '{task_stage}', affirm_time = '{affirm_time}' where task_no = {task_no}
    """
    return self.do_execute(statement)
|
||
|
||
def update_stage7_info(self, task_no, task_stage, open_wave_monitor_job, belong_wave_monitor_job_id, belong_wave_monitor_job_name, task_additional_info, phase_update_time_record):
    """Mark crosses as phase-updated and advance the task stage.

    Sets ``phase_update_status = 1`` for every crossid yielded by iterating
    ``phase_update_time_record``, then updates the stage. The monitor-job
    columns are written only when they differ from ``task_additional_info``.

    Args:
        task_no: Task number being updated.
        task_stage: New value for ``task_stage``.
        open_wave_monitor_job: New monitor-job flag.
        belong_wave_monitor_job_id: New monitor-job id.
        belong_wave_monitor_job_name: New monitor-job name.
        task_additional_info: Current additional-detail row, used for the
            comparison.
        phase_update_time_record: Iterable of crossids (iterating a dict
            yields its keys).

    Returns:
        ``True`` when committed, ``False`` when a statement failed.
    """
    crossids = list(phase_update_time_record)
    conn, cursor = self.connect()
    try:
        # With no crossids the old statement had a CASE without WHEN branches
        # (invalid SQL), so the stage was never updated; the phase update is
        # now skipped when there is nothing to mark.
        if crossids:
            placeholders = ','.join(['%s'] * len(crossids))
            # Every selected cross gets status 1, so a plain assignment under
            # the same WHERE replaces the per-cross CASE; values are bound by
            # the driver.
            update_phase_sql = (
                "update task.greenwave_task_cross_detail set phase_update_status = 1"
                " where task_no = %s and crossid in (" + placeholders + ")"
            )
            cursor.execute(update_phase_sql, [task_no] + crossids)

        monitor_changed = (
            open_wave_monitor_job != task_additional_info['open_wave_monitor_job']
            or belong_wave_monitor_job_id != task_additional_info['belong_wave_monitor_job_id']
            or belong_wave_monitor_job_name != task_additional_info['belong_wave_monitor_job_name']
        )
        if monitor_changed:
            update_sql = """
                update task.greenwave_task_additional_detail set task_stage = %s, open_wave_monitor_job = %s, belong_wave_monitor_job_id = %s, belong_wave_monitor_job_name = %s where task_no = %s
            """
            cursor.execute(update_sql, (task_stage, open_wave_monitor_job, belong_wave_monitor_job_id, belong_wave_monitor_job_name, task_no))
        else:
            update_sql = """
                update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s
            """
            cursor.execute(update_sql, (task_stage, task_no))
        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False
|
||
|
||
def update_additional_detail_file_info(self, stage_no, download_url, task_no, nodeid, area_id, upload_time):
    """Store an uploaded file's URL and upload time on the additional-detail row.

    For ``stage_no == 2`` the wave-optimize columns are written; for any
    other value the compare-report columns are written.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    if stage_no == 2:
        url_column, time_column = 'wave_optimize_url', 'upload_wave_optimize_time'
    else:
        url_column, time_column = 'compare_report_url', 'upload_compare_report_time'
    update_sql = f"""
        update task.greenwave_task_additional_detail set {url_column} = '{download_url}', {time_column} = '{upload_time}' where task_no = {task_no} and nodeid = {nodeid} and area_id = {area_id}
    """
    return self.do_execute(update_sql)
|
||
|
||
def insert_greenwave_task_tiny_adjustment_record(self, values):
    """Bulk-insert tiny-adjustment records of green-wave tasks.

    Args:
        values: Sequence of parameter tuples
            (task_no, upload_file_path, creator_id, creator).

    Returns:
        Whatever ``do_executemany`` returns for the insert statement.
    """
    insert_sql = """
        insert into task.greenwave_task_tiny_adjustment_record(task_no, upload_file_path, creator_id, creator) values (%s, %s, %s, %s)
    """
    inserted = self.do_executemany(insert_sql, values)
    return inserted
|
||
|
||
def del_greenwave_task_tiny_adjustment_record_sql(self, id):
    """Delete one tiny-adjustment record by its primary key.

    Args:
        id: Record id, written unquoted into the statement.

    Returns:
        Whatever ``do_execute`` returns for the delete statement.
    """
    statement = """
        delete from task.greenwave_task_tiny_adjustment_record where id = %s
    """ % id
    affected = self.do_execute(statement)
    return affected
|
||
|
||
def update_task_progress(self, task_no, nodeid, area_id, progress):
    """Set the ``progress`` column of one task.

    All values are written unquoted into the statement.

    Returns:
        Whatever ``do_execute`` returns for the update statement.
    """
    statement = f"""
        update task.task set progress = {progress} where taskno = {task_no} and nodeid= {nodeid} and area_id = {area_id}
    """
    return self.do_execute(statement)
|
||
#
|
||
# if __name__ == '__main__':
|
||
# tt_5min = get_latest_5min_timestamp()
|
||
# print(tt_5min)
|
||
# print(get_today_str())
|