cross_doctor/app/task_db_func.py

766 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/10 17:59
# @Description:
# -*- coding:utf-8 -*-
#import logging
import json
import logging
import pymysql
import pymysql.cursors
from datetime import datetime
from flask import g
from app.db_func_base import *
class TaskDbHelper(TableDbHelperBase):
def __init__(self, pool):
self.db_pool = pool
self.DB_Name = 'task'
def query_task(self, taskno, nodeid, area_id):
sql_query = "select * from task.`task` where nodeid='%s' and taskno='%s' and area_id = %s" % (nodeid, taskno, area_id)
tasks = self.do_select(sql_query)
if len(tasks) != 1:
logging.error('query_ledger error! %s' % (sql_query))
return None
return tasks[0]
def query_task_executor(self, nodeid, area_id):
sql_query = "select user_name, userno from user.user where userno in (select userno from user.area_user where nodeid = '%s' and area_id = %s) and department = '信号调优团队'" % (nodeid, area_id)
executors = self.do_select(sql_query)
if len(executors) <= 0:
logging.error('query_task_executor is null! %s' % (sql_query))
return []
return executors
def query_task_src(self, nodeid, area_id):
sql_query = "select distinct src from task.`task_src` where nodeid='%s' and area_id = %s" % (nodeid, area_id)
srcs = self.do_select(sql_query)
if len(srcs) <= 0:
logging.error('query_task_src is null! %s' % (sql_query))
return []
return srcs
def query_task_type(self, nodeid, area_id):
sql_query = "select * from task.`task_type` where (nodeid='%s' or nodeid = 0) and (area_id = %s or area_id = 0)" % (nodeid, area_id)
types = self.do_select(sql_query)
if len(types) <= 0:
logging.error('query_task_type is null! %s' % (sql_query))
return []
return types
def query_all_tak_list(self, nodeid, area_id):
"""查询路口的全部台账履历"""
sql_query = "select * from task.`task` where nodeid='%s' and area_id = %s and record_state=0 order by task_state asc, progress asc, plan_end_time desc" % (nodeid, area_id)
res_list = self.do_select(sql_query)
for res in res_list:
logging.info(res['update_time'])
if res['update_time'] is not None:
res['update_time'] = res['update_time'].strftime('%Y-%m-%d %H:%M:%S')
return res_list
def query_task_list(self, task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid):
"""查询路口的全部台账履历"""
sql_query = "select * from task.`task` where nodeid='%s' and record_state=0" % (nodeid)
if task_name is not None and len(task_name)>0:
sql_query += " and task_name like '%%%s%%'" %(task_name)
if task_type is not None and len(task_type)>0:
sql_query += " and task_type='%s'" %(task_type)
if data_type is not None and len(data_type) > 0:
data_type += " and task_type='%s'" % (data_type)
if task_class is not None and len(task_class)>0:
sql_query += " and task_class='%s'" %(task_class)
if plan_begin_time is not None and len(plan_begin_time)>0:
sql_query += " and plan_begin_time='%s'" %(plan_begin_time)
if plan_end_time is not None and len(plan_end_time)>0:
sql_query += " and plan_end_time='%s'" %(plan_end_time)
if publish_time is not None and len(publish_time)>0:
sql_query += " and publish_time='%s'" %(publish_time)
if task_state is not None and len(task_state)>0:
sql_query += " and task_state='%s'" %(task_state)
if executor is not None and len(executor)>0:
sql_query += " and executor='%s'" %(executor)
if task_src is not None and len(task_src) > 0:
sql_query += " and task_src='%s'" % (task_src)
res_list = self.do_select(sql_query)
if len(res_list) > 0:
for res in res_list:
logging.info(res['update_time'])
if res['update_time'] is not None:
res['update_time'] = res['update_time'].strftime('%Y-%m-%d %H:%M:%S')
return res_list
def query_completed_task_cross_list(self, nodeid, area_id):
sql_query = "select crossid,name from tmnet.cross where crossid in (select crossids from task where task_state=4 and nodeid='%s') and nodeid='%s';" % (nodeid, nodeid)
return self.do_select(sql_query)
def query_completed_task_by_cross(self, crossid, nodeid, area_id):
sql_query = "select * from task.task where task_state=4 and crossids='%s' and nodeid='%s' and area_id = %s;" % (crossid, nodeid, area_id)
return self.do_select(sql_query)
def get_last_taskno(self, creatorid, task_name, nodeid):
sql_query = "select max(taskno) as taskno from task.task where creatorid='%s' and task_name='%s' and nodeid='%s';" % (creatorid,task_name, nodeid)
tasknos = self.do_select(sql_query)
if len(tasknos) != 1:
logging.error('get_last_taskno error! %s' % (sql_query))
return None
return tasknos[0]['taskno']
def get_task_no(self, nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment):
sql = f"""
select taskno from task.task where nodeid = '{nodeid}' and area_id = {area_id} and task_name = '{task_name}' and task_type = {task_type}
and task_class = {task_class} and data_type = {data_type} and plan_begin_time = {plan_begin_time}
and plan_end_time = {plan_end_time} and executor = '{executor}' and task_src = '{task_src}' and comment = '{comment}'
"""
res = self.do_select(sql)
if len(res) > 0:
return res[0]['taskno']
return None
def delete_task(self, taskno, nodeid, area_id):
taskno_str = ','.join(str(num) for num in taskno)
sql = "update task.`task` set record_state=1 where taskno in (%s) and nodeid='%s' and area_id = %s;" % (taskno_str, nodeid, area_id)
# 执行删除操作
return self.do_execute(sql)
def query_task_history(self, taskno, nodeid, area_id):
"""查询路口的全部台账履历"""
sql_query = "select * from task.task_history where taskno='%s' and nodeid='%s' and area_id = %s order by history_date;" % (taskno, nodeid, area_id)
return self.do_select(sql_query)
def query_task_progress_history(self, yesterday_date, nodeid, area_id):
"""查询路口的全部台账履历"""
sql_query = (f"""
select
t2.task_name,
content
from
(select * from task.task_history where date(history_date)='{yesterday_date}' and nodeid='{nodeid}' and area_id = {area_id} and content like '%%operation%%任务进度%%content%%' order by history_date) t1
left join
(select task_name,taskno from task.task) t2
on t1.taskno = t2.taskno;
""")
return self.do_select(sql_query)
def add_task_history(self, taskno, nodeid, area_id, operator,remark):
"""查询路口的全部台账履历"""
now = datetime.now()
time_str = now.strftime('%Y-%m-%d %H:%M:%S')
sql_insert = "insert into task.task_history (taskno,nodeid, area_id, operator,content, history_date) values('%s',%s,'%s','%s','%s','%s')" % (taskno, nodeid, area_id, operator,remark, time_str)
count = self.do_execute(sql_insert)
if count != 1:
logging.error('add_task_history error! %s' % (sql_insert))
return 0
return count
def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
executor, progress, task_state, description, crossids, waveid, wave_name, comment,
record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review):
sql_insert = "insert into task.task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time," \
" executor, progress, task_state, description, crossids, waveid, wave_name, comment," \
" record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s, %s, %s)" % (
timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
executor, progress, task_state, description, crossids, waveid, wave_name, comment,
record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
count = self.do_execute(sql_insert)
if count != 1:
logging.error('add_task error! %s' % (sql_insert))
return 0
return count
def update_task_info(self, taskno, modify_data, nodeid, area_id):
update_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
sql = f"update task.task set %s, update_time = '%s' where taskno = '%s' and nodeid = %s and area_id = %s" %(modify_data, update_time, taskno, nodeid, area_id)
return self.do_execute(sql)
##########################################################################
def insert_upload_file_record(self, taskno, nodeid, download_url, area_id):
sql = f"""
insert into task.task_upload_file_record (taskno, nodeid, download_url, area_id)
values (%d , %d, '%s', %s)
""" % (int(taskno), int(nodeid), download_url, area_id)
return self.do_execute(sql)
def query_task_file(self, nodeid, area_id):
sql = f"""
select * from task.task_upload_file_record where nodeid = %d and area_id = %s
""" % (int(nodeid), area_id)
row_list = self.do_select(sql)
tmp_dict = {}
for row in row_list:
if row['taskno'] not in tmp_dict.keys():
tmp_dict[row['taskno']] = [{
'id': row['id'],
'download_url': row['download_url']
}]
else:
tmp_dict[row['taskno']].append({
'id': row['id'],
'download_url': row['download_url']
})
return tmp_dict
def query_task_file_by_taskno(self, nodeid, taskno, area_id):
sql = f"""
select id, download_url from task.task_upload_file_record where nodeid = %d and taskno = %d and area_id = %d
""" % (int(nodeid), int(taskno), int(area_id))
row_list = self.do_select(sql)
download_url_list = []
for row in row_list:
download_url_list.append({
'id': row['id'],
'download_url': row['download_url']
})
return download_url_list
def del_task_file(self, nodeid, taskno, file_id):
sql = f"""
delete from task.task_upload_file_record where nodeid = %d and taskno = %d and id = %d
""" % (int(nodeid), int(taskno), int(file_id))
return self.do_execute(sql)
def update_task_state(self, nodeid, area_id, task_no, state):
sql = "update task set task_state='%s' where nodeid='%s' and area_id='%s' and taskno='%s'" % (state, nodeid, area_id, task_no)
return self.do_execute(sql)
def query_ledger_task_list(self, nodeid, area_id):
sql = f"""
select distinct crossid from task.ledger_task_detail where task_no in (select taskno from task.task where nodeid = %d and area_id = %d and task_type_class = 1 and record_state != 1 and task_state != 4) and (submit_status is null or submit_status != 2)
""" % (int(nodeid), int(area_id))
return self.do_select(sql)
def query_ledger_task_crosses_info(self, task_no):
sql = """
select * from ledger_task_detail where task_no = %s
""" % task_no
return self.do_select(sql)
def query_ledger_task_crosses_info_by_area_id(self, nodeid, area_id):
sql = """
select * from task.ledger_task_detail where nodeid = %s and area_id = %s
""" % (nodeid, area_id)
res = self.do_select(sql)
task_info_dict = {}
for row in res:
task_no = row['task_no']
if task_no not in task_info_dict.keys():
task_info_dict[task_no] = []
task_info_dict[task_no].append(row)
task_res_dict = {}
for task_no in task_info_dict.keys():
task_cross_num = len(task_info_dict[task_no])
entered_cross_num = len([cross for cross in task_info_dict[task_no] if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)])
approve_cross_num = len([cross for cross in task_info_dict[task_no] if cross['submit_status'] == 2])
entered_percent = int(entered_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
approve_percent = int(approve_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
task_res_dict[task_no] = {
'task_no': task_no,
'task_cross_num': task_cross_num,
'entered_cross_num': entered_cross_num,
'entered_percent': entered_percent,
'approve_cross_num': approve_cross_num,
'approve_percent': approve_percent
}
return task_res_dict
def query_ledger_task_crosses_pics(self, crossid_list):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
select * from tmnet.user_upload_cross_pics where crossid in (%s)
""" % crossids
return self.do_select(sql)
def drop_old_task_cross(self, crossid_list, old_task_no, nodeid, area_id):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
delete from task.ledger_task_detail where task_no = %s and crossid in (%s)
""" % (old_task_no, crossids)
old_task_info = self.query_task(old_task_no, nodeid, area_id)
old_crossids = old_task_info['crossids'].split(',')
drop_crossids = list(set(old_crossids) - set(crossid_list))
update_task_info_sql = """
update task.task set crossids = '%s' where taskno = %s and nodeid = %s and area_id = %s
""" % (','.join(drop_crossids), old_task_no, nodeid, area_id)
conn, cursor = self.connect()
try:
conn.begin()
logging.info('drop_old_task_cross: %s' % (sql))
logging.info('drop_old_task_cross: %s' % (update_task_info_sql))
update_ret = cursor.execute(update_task_info_sql)
del_ret = cursor.execute(sql)
if update_ret == 1 and del_ret == len(crossid_list):
conn.commit()
return True
else:
conn.rollback()
return False
except Exception as e:
logging.error(e)
conn.rollback()
return False
def insert_ledger_task_cross(self, insert_list, crossid_list, task_no, nodeid):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
insert into task.ledger_task_detail (task_no, crossid, nodeid, area_id) values(%s, %s, %s, %s)
"""
query_cross_ledger_status_sql = """
select crossid, nodeid, status from tmnet.ledger_entering_status where crossid in (%s)
""" % crossids
cross_ledger_status_values = set()
ledger_status_info = self.do_select(query_cross_ledger_status_sql)
cross_ledger_info_dict = {row['crossid']: row['status'] for row in ledger_status_info}
for crossid in crossid_list:
if crossid in cross_ledger_info_dict.keys():
cross_ledger_status_values.add(f"when '{crossid}' then {cross_ledger_info_dict[crossid]} ")
else:
cross_ledger_status_values.add(f"when '{crossid}' then 0 ")
# logging.error(cross_ledger_status_values)
update_ledger_task_cross_status_sql = f"""
update task.ledger_task_detail set ledger_status = case crossid {' '.join(list(cross_ledger_status_values))} end where crossid in ({crossids}) and nodeid = {nodeid} and task_no = {task_no}
"""
logging.info(update_ledger_task_cross_status_sql)
conn, cursor = self.connect()
try:
conn.begin()
ret = cursor.executemany(sql, insert_list)
if ret == len(insert_list):
update_ledger_status_ret = cursor.execute(update_ledger_task_cross_status_sql)
if update_ledger_status_ret == len(cross_ledger_status_values):
conn.commit()
return True
else:
conn.rollback()
logging.error("update ledger_entering_status fail")
return False
else:
conn.rollback()
return False
except Exception as e:
logging.error(e)
conn.rollback()
return False
def query_ledger_task_cross_record(self, task_no, crossid):
sql = "select * from task.ledger_task_detail where task_no = %s and crossid = '%s'" % (task_no, crossid)
res = self.do_select(sql)
if res:
return res[0]
return None
def update_ledger_task_cross_record(self, task_no, crossid, field_name, value):
sql = "update task.ledger_task_detail set %s = %s where task_no = '%s' and crossid = '%s'" % (field_name, value, task_no, crossid)
return self.do_execute(sql)
def approval_ledger_task_cross(self, task_no, crossid, status, approver, approver_id, now_time):
sql = """
update task.ledger_task_detail set submit_status = %s, approver = '%s', approver_id = '%s', approver_time = '%s' where task_no = '%s' and crossid = '%s'
""" % (status, approver, approver_id, now_time, task_no, crossid)
return self.do_execute(sql)
def query_ledger_task_crosses(self, taskno):
sql = """
select * from task.ledger_task_detail where task_no = %s
""" % taskno
return self.do_select(sql)
def query_all_ledger_task_crosses_info(self, nodeid, area_id):
sql = """
select * from task.ledger_task_detail where nodeid = %s and area_id = %s
""" % (nodeid, area_id)
res = self.do_select(sql)
task_cross_dict = {}
for row in res:
if row['task_no'] not in task_cross_dict.keys():
task_cross_dict[row['task_no']] = [row]
else:
task_cross_dict[row['task_no']].append(row)
return task_cross_dict
def query_cross_entering_status(self, crossid_list):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
select crossid, ledger_status, phase_status, update_time from task.ledger_task_detail where crossid in (%s) order by update_time asc
""" % crossids
return self.do_select(sql)
def query_greenwave_task_requirement_validation_info_sql(self, task_no):
sql = """
select * from task.greenwave_task_detail where task_no = %s
""" % (task_no)
res = self.do_select(sql)
if res:
return res[0]
return None
def query_wave_task_additional_info_sql(self, task_no, nodeid, area_id):
sql = """
select * from task.greenwave_task_additional_detail where task_no = %s and nodeid = %s and area_id = %s
""" % (task_no, nodeid, area_id)
res = self.do_select(sql)
if res:
return res[0]
return None
def query_wave_task_crosses(self, task_no):
sql = """
select * from task.greenwave_task_cross_detail where task_no = %s
""" % (task_no)
return self.do_select(sql)
def query_wave_task_tp_info(self, task_no, nodeid, area_id):
sql = """
select * from task.greenwave_task_tp_detail where task_no = %s and nodeid = %s and area_id = %s
""" % (task_no, nodeid, area_id)
return self.do_select(sql)
def query_task_check_res(self, task_no, nodeid, area_id):
sql = """
select * from task.greenwave_task_result_addirm_detail where task_no = %s and nodeid = %s and area_id = %s
""" % (task_no, nodeid, area_id)
res = self.do_select(sql)
tp_info_dict = {}
for row in res:
if row['tp_info_id'] not in tp_info_dict.keys():
tp_info_dict[row['tp_info_id']] = []
tp_info_dict[row['tp_info_id']].append(row)
return tp_info_dict
def query_greenwave_task_adjustment_record(self, task_no):
sql = """
select * from task.greenwave_task_tiny_adjustment_record where task_no = %s
""" % (task_no)
return self.do_select(sql)
def save_task_require_confirm_info(self, task_no, nodeid, area_id, require_confirm_info, creator_id, creator):
base_info = require_confirm_info['base_info']
cross_list = require_confirm_info['cross_list']
tp_info = require_confirm_info['tp_info']
insert_base_info_sql = """
insert into task.greenwave_task_detail(task_no, wave_name, waveid, start_cross_name, end_cross_name, cross_num,
task_src, plan_start_time, plan_end_time, predict_issue_time, is_urgency, wave_status, slc_company,
max_phase_position, left_coor, comment, creator_id, creator, executor_id, executor) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
task_no,
base_info['wave_name'],
base_info['waveid'],
base_info['start_cross_name'],
base_info['end_cross_name'],
base_info['cross_num'],
base_info['task_src'],
base_info['plan_start_time'],
base_info['plan_end_time'],
base_info['predict_issue_time'],
base_info['is_urgency'],
base_info['wave_status'],
base_info['slc_company'],
base_info['max_phase_position'],
base_info['left_coor'],
base_info['comment'],
creator_id,
creator,
base_info['executor_id'],
base_info['executor']
)
insert_cross_list = []
for item in cross_list:
influence_factor = item['influence_factor']
phase_info_json = json.dumps(item['phase_info'], ensure_ascii=False)
insert_cross_list.append((task_no, item['crossid'], item['cross_name'], item['location'], influence_factor['intersect_coor'],
influence_factor['phase_pos_adjust'], influence_factor['phase_seq_adjust'], influence_factor['phase_adjust'],
influence_factor['borrow_left'], influence_factor['waiting_left'], influence_factor['waiting_straight'],
influence_factor['waiting_bicycle'], phase_info_json))
insert_cross_info_sql = """
insert into task.greenwave_task_cross_detail(task_no, crossid, cross_name, location, intersect_coor,
phase_pos_adjust, phase_seq_adjust, phase_adjust, borrow_left, waiting_left, waiting_straight,
waiting_bicycle, phase_info) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_tp_info = []
for item in tp_info:
insert_tp_info.append((task_no, item['wave_tp_id'], item['tp_start'], item['tp_end'], item['coor_dir'], item['priority_coor_dir'], item['weekday'], nodeid, area_id))
insert_tp_info_sql = """
insert into task.greenwave_task_tp_detail(task_no, wave_tp_id, tp_start, tp_end, coor_dir, priority_coor_dir, weekday,
nodeid, area_id) values (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_additional_sql = "insert into task.greenwave_task_additional_detail (task_no, nodeid, area_id) values (%s, %s, %s)"
insert_additional_params = (task_no, nodeid, area_id)
conn, cursor = self.connect()
try:
# logging.warning(cursor.mogrify(insert_base_info_sql, params))
insert_greenwave_task_ret = cursor.execute(insert_base_info_sql, params)
# logging.warning(cursor.mogrify(insert_cross_info_sql, insert_cross_list))
insert_cross_ret = cursor.executemany(insert_cross_info_sql, insert_cross_list)
# logging.warning(cursor.mogrify(insert_tp_info_sql, insert_tp_info))
insert_tp_ret = cursor.executemany(insert_tp_info_sql, insert_tp_info)
insert_additional_ret = cursor.execute(insert_additional_sql, insert_additional_params)
if insert_greenwave_task_ret == 1 and insert_cross_ret == len(cross_list) and insert_tp_ret == len(tp_info) and insert_additional_ret == 1:
conn.commit()
return True
else:
logging.error(insert_greenwave_task_ret, insert_cross_ret, len(cross_list), insert_tp_ret, len(tp_info))
conn.rollback()
return False
except Exception as e:
conn.rollback()
print(e)
logging.error(e)
return False
def init_additional_info(self, task_no, nodeid, area_id, cross_infos):
cross_values = []
for cross_info in cross_infos:
cross_values.append((task_no, cross_info['crossid'], cross_info['name'], cross_info['location']))
insert_additional_sql = "insert into task.greenwave_task_additional_detail (task_no, nodeid, area_id) values (%s, %s, %s)" % (task_no, nodeid, area_id)
insert_cross_sql = """
insert into task.greenwave_task_cross_detail(task_no, crossid, cross_name, location) values (%s, %s, %s, %s)
"""
conn, cursor = self.connect()
try:
insert_additional_ret = cursor.execute(insert_additional_sql)
insert_cross_ret = cursor.executemany(insert_cross_sql, cross_values)
if insert_additional_ret == 1 and insert_cross_ret == len(cross_infos):
conn.commit()
return True
else:
conn.rollback()
return False
except Exception as e:
conn.rollback()
print(e)
logging.error(e)
return False
def async_wave_tp_id(self, wave_tp_info, task_tp_info, task_no, nodeid, area_id):
# 补充同步绿波时段id到任务时段信息表中的逻辑
wave_tp_info_dict = {row['tp_start'] + '-' + row['tp_end'] + '-' + row['weekday']: row['wave_tp_id'] for row in wave_tp_info}
task_tp_info_dict = {row['tp_start'] + '-' + row['tp_end'] + '-' + row['weekday']: row for row in task_tp_info}
for tp_key in task_tp_info_dict:
if tp_key in wave_tp_info_dict:
task_tp_info_dict[tp_key]['wave_tp_id'] = wave_tp_info_dict[tp_key]
else:
return False
# 补充跳过需求确认单没有任务时段信息时需要将
if not task_tp_info_dict:
insert_list = []
for tp_info in wave_tp_info:
insert_list.append((task_no, tp_info['wave_tp_id'], tp_info['tp_start'], tp_info['tp_end'], tp_info['type_str'], tp_info['type_str'], tp_info['weekday'], nodeid, area_id))
insert_sql = """
insert into task.greenwave_task_tp_detail(task_no, wave_tp_id, tp_start, tp_end, coor_dir, priority_coor_dir, weekday, nodeid, area_id) values (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
ret = self.do_executemany(insert_sql, insert_list)
return ret == len(insert_list)
else:
update_sql = """
update task.greenwave_task_tp_detail set wave_tp_id = %s where tp_start = %s and tp_end = %s and weekday = %s and task_no = %s and nodeid = %s and area_id = %s
"""
conn, cursor = self.connect()
try:
for tp_info in task_tp_info_dict.values():
cursor.execute(update_sql, (tp_info['wave_tp_id'], tp_info['tp_start'], tp_info['tp_end'], tp_info['weekday'], task_no, nodeid, area_id))
conn.commit()
return True
except Exception as e:
conn.rollback()
print(e)
return False
def update_stage1_info(self, task_no, waveid, wave_name, task_stage, task_info, wave_crosses):
conn, cursor = self.connect()
try:
if task_info['waveid'] and task_info['waveid'] != '' and task_info['wave_name'] and task_info['wave_name'] != '':
# 表示任务已经绑定绿波则该次操作仅需要更新task_stage
update_sql = """
update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s
""" % (task_stage, task_no)
cursor.execute(update_sql)
conn.commit()
return True
else:
crossids = ','.join(wave_crosses)
update_sql1 = """
update task.task set waveid = %s, wave_name = %s, crossids = %s where taskno = %s
"""
update_sql2 = """
update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s
"""
cursor.execute(update_sql1, (waveid, wave_name, crossids, task_no))
cursor.execute(update_sql2, (task_stage, task_no))
conn.commit()
return True
except Exception as e:
conn.rollback()
print(e)
return False
def update_stage2_info(self, task_no, stage_info):
sql = """
update task.greenwave_task_additional_detail set task_stage = '%s' where task_no = %s
""" % (stage_info, task_no)
return self.do_execute(sql)
def update_stage3_info(self, actually_issue_time, task_no, task_stage, delay_reason):
sql = """
update task.greenwave_task_additional_detail set actually_issue_time = '%s', task_stage = '%s', delay_issue_reason = '%s' where task_no = %s
""" % (actually_issue_time, task_stage, delay_reason, task_no)
return self.do_execute(sql)
def update_stage4_info(self, task_no, task_stage, tp_info, nodeid, area_id, task_additional_info):
old_tp_info = self.query_wave_task_tp_info(task_no, nodeid, area_id)
old_tp_info_dict = {row['tp_start'] + '-' + row['tp_end']: row for row in old_tp_info}
update_tp_check_res_set, update_tp_json_set, update_tp_info_ids, has_bad = set(), set(), [], False
for tp_item in tp_info:
tp_info_id = tp_item['tp_info_id']
result_check_res = tp_item['result_check_res']
tp_key = tp_item['tp_start'] + '-' + tp_item['tp_end']
if result_check_res == 1:
has_bad = True
not_pass_detail = tp_item['not_pass_detail']
# 处理 not_pass_detail可能是字符串、列表或 None
if not_pass_detail is None or not not_pass_detail:
not_pass_detail_json = ''
else:
# 如果是字符串,需要先处理转义字符(包括中文)
# 先替换转义的双引号和换行符等
not_pass_detail_json = json.dumps(not_pass_detail, ensure_ascii=False)
# cleaned_str = parsed.replace('\\"', '"').replace('\\\\n', '\\n')
# not_pass_detail_json = json.dumps(parsed, ensure_ascii=False)
if result_check_res != old_tp_info_dict[tp_key]['result_check_res'] or not_pass_detail_json != old_tp_info_dict[tp_key]['not_pass_detail']:
update_tp_check_res_set.add(f"when {tp_info_id} then {result_check_res} ")
update_tp_json_set.add(f"when {tp_info_id} then '{not_pass_detail_json}' ")
update_tp_info_ids.append(tp_info_id)
conn, cursor = self.connect()
try:
update_tp_info_sql = f"""
update task.greenwave_task_tp_detail
set result_check_res =
case id
{' '.join(list(update_tp_check_res_set))}
else result_check_res
end,
not_pass_detail =
case id
{' '.join(list(update_tp_json_set))}
else not_pass_detail
end
where id in ({','.join(str(id) for id in update_tp_info_ids)})
and nodeid = {nodeid}
and area_id = {area_id}
and task_no = {task_no}
"""
cursor.execute(update_tp_info_sql)
if has_bad:
update_stage_sql = """
update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s
"""
cursor.execute(update_stage_sql, (task_stage, task_no))
else:
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
update_state_sql = """
update task.greenwave_task_additional_detail set task_stage = %s, affirm_time = %s where task_no = %s
"""
cursor.execute(update_state_sql, (task_stage, now_time, task_no))
conn.commit()
return True, has_bad
except Exception as e:
conn.rollback()
logging.error(e)
print(e)
return False, has_bad
def update_stage5_info(self, task_no, task_stage, affirm_time):
sql = """
update task.greenwave_task_additional_detail set task_stage = '%s', affirm_time = '%s' where task_no = %s
""" % (task_stage, affirm_time, task_no)
return self.do_execute(sql)
def update_stage7_info(self, task_no, task_stage, open_wave_monitor_job, belong_wave_monitor_job_id, belong_wave_monitor_job_name, task_additional_info, phase_update_time_record):
update_phase_list = []
crossids = "'" + "', '".join(item for item in phase_update_time_record) + "'"
for crossid in phase_update_time_record:
update_phase_list.append(f"when '{crossid}' then 1")
update_phase_sql = f"""
update task.greenwave_task_cross_detail set phase_update_status = case crossid {' '.join(update_phase_list)} else phase_update_status end where task_no = {task_no} and crossid in ({crossids})
"""
conn, cursor = self.connect()
try:
cursor.execute(update_phase_sql)
if open_wave_monitor_job != task_additional_info['open_wave_monitor_job'] or belong_wave_monitor_job_id != task_additional_info['belong_wave_monitor_job_id'] or belong_wave_monitor_job_name != task_additional_info['belong_wave_monitor_job_name']:
update_sql = """
update task.greenwave_task_additional_detail set task_stage = %s, open_wave_monitor_job = %s, belong_wave_monitor_job_id = %s, belong_wave_monitor_job_name = %s where task_no = %s
"""
cursor.execute(update_sql, (task_stage, open_wave_monitor_job, belong_wave_monitor_job_id, belong_wave_monitor_job_name, task_no))
conn.commit()
return True
else:
update_sql = """
update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s
"""
cursor.execute(update_sql, (task_stage, task_no))
conn.commit()
return True
except Exception as e:
conn.rollback()
logging.error(e)
print(e)
return False
def update_additional_detail_file_info(self, stage_no, download_url, task_no, nodeid, area_id, upload_time):
if stage_no == 2:
update_sql = """
update task.greenwave_task_additional_detail set wave_optimize_url = '%s', upload_wave_optimize_time = '%s' where task_no = %s and nodeid = %s and area_id = %s
""" % (download_url, upload_time, task_no, nodeid, area_id)
else:
update_sql = """
update task.greenwave_task_additional_detail set compare_report_url = '%s', upload_compare_report_time = '%s' where task_no = %s and nodeid = %s and area_id = %s
""" % (download_url, upload_time, task_no, nodeid, area_id)
return self.do_execute(update_sql)
def insert_greenwave_task_tiny_adjustment_record(self, values):
sql = """
insert into task.greenwave_task_tiny_adjustment_record(task_no, upload_file_path, creator_id, creator) values (%s, %s, %s, %s)
"""
return self.do_executemany(sql, values)
def del_greenwave_task_tiny_adjustment_record_sql(self, id):
sql = """
delete from task.greenwave_task_tiny_adjustment_record where id = %s
""" % id
return self.do_execute(sql)
def update_task_progress(self, task_no, nodeid, area_id, progress):
sql = """
update task.task set progress = %s where taskno = %s and nodeid= %s and area_id = %s
""" % (progress, task_no, nodeid, area_id)
return self.do_execute(sql)
#
# if __name__ == '__main__':
# tt_5min = get_latest_5min_timestamp()
# print(tt_5min)
# print(get_today_str())