2025-11-11 10:07:25 +08:00
# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/10 17:59
# @Description:
# -*- coding:utf-8 -*-
#import logging
2026-04-07 09:48:43 +08:00
import json
2026-03-12 18:01:22 +08:00
import logging
2025-11-11 10:07:25 +08:00
import pymysql
import pymysql . cursors
from datetime import datetime
from flask import g
from app . db_func_base import *
2026-03-12 18:01:22 +08:00
2025-11-11 10:07:25 +08:00
class TaskDbHelper ( TableDbHelperBase ) :
def __init__(self, pool):
    """Bind the helper to a connection pool and remember the schema name.

    Args:
        pool: Connection pool object; stored as ``self.db_pool``.
    """
    self.db_pool = pool
    # Schema name without the padding spaces introduced by the mangled source.
    self.DB_Name = 'task'
def query_task(self, taskno, nodeid, area_id):
    """Fetch exactly one task row by task number, node and area.

    Args:
        taskno: Task number to look up.
        nodeid: Node identifier of the task.
        area_id: Area identifier; interpolated unquoted into the SQL.

    Returns:
        The single matching row from ``do_select``, or ``None`` when zero or
        more than one row matches (the query is logged as an error).
    """
    # NOTE(review): values are interpolated into the SQL text unescaped.
    sql_query = "select * from task.`task` where nodeid='%s' and taskno='%s' and area_id = %s" % (nodeid, taskno, area_id)
    tasks = self.do_select(sql_query)
    if len(tasks) != 1:
        # The label previously said 'query_ledger', which pointed at the wrong method.
        logging.error('query_task error! %s', sql_query)
        return None
    return tasks[0]
def query_task_executor(self, nodeid, area_id):
    """List the users of department '信号调优团队' that belong to an area.

    Args:
        nodeid: Node identifier used to filter ``user.area_user``.
        area_id: Area identifier used to filter ``user.area_user``.

    Returns:
        Rows holding ``user_name`` and ``userno``; an empty list when nothing
        matches (the miss is logged as an error).
    """
    sql_query = (
        "select user_name, userno from user.user where userno in "
        "(select userno from user.area_user where nodeid = '%s' and area_id = %s) "
        "and department = '信号调优团队'" % (nodeid, area_id)
    )
    executors = self.do_select(sql_query)
    if not executors:
        logging.error('query_task_executor is null! %s', sql_query)
        return []
    return executors
def query_task_src(self, nodeid, area_id):
    """Return the distinct task sources configured for a node and area.

    Args:
        nodeid: Node identifier.
        area_id: Area identifier; interpolated unquoted into the SQL.

    Returns:
        Rows with a single ``src`` column; an empty list when nothing matches
        (the miss is logged as an error).
    """
    sql_query = "select distinct src from task.`task_src` where nodeid='%s' and area_id = %s" % (nodeid, area_id)
    srcs = self.do_select(sql_query)
    if not srcs:
        logging.error('query_task_src is null! %s', sql_query)
        return []
    return srcs
def query_task_type(self, nodeid, area_id):
    """Return task types for a node/area, including rows stored with id 0.

    Args:
        nodeid: Node identifier; rows with ``nodeid = 0`` are also matched.
        area_id: Area identifier; rows with ``area_id = 0`` are also matched.

    Returns:
        Matching ``task_type`` rows; an empty list when nothing matches
        (the miss is logged as an error).
    """
    sql_query = (
        "select * from task.`task_type` where (nodeid='%s' or nodeid = 0) "
        "and (area_id = %s or area_id = 0)" % (nodeid, area_id)
    )
    types_found = self.do_select(sql_query)
    if not types_found:
        logging.error('query_task_type is null! %s', sql_query)
        return []
    return types_found
def query_all_tak_list(self, nodeid, area_id):
    """Return all non-deleted tasks of a node/area with ``update_time`` as text.

    Rows are ordered by ``task_state``, ``progress`` and descending
    ``plan_end_time``. Each non-null ``update_time`` is replaced in place by
    its ``YYYY-MM-DD HH:MM:SS`` string form.

    Args:
        nodeid: Node identifier.
        area_id: Area identifier; interpolated unquoted into the SQL.

    Returns:
        The rows returned by ``do_select`` after the in-place conversion.
    """
    sql_query = (
        "select * from task.`task` where nodeid='%s' and area_id = %s and record_state=0 "
        "order by task_state asc, progress asc, plan_end_time desc" % (nodeid, area_id)
    )
    res_list = self.do_select(sql_query)
    for res in res_list:
        update_time = res['update_time']
        logging.info(update_time)
        if update_time is not None:
            res['update_time'] = update_time.strftime('%Y-%m-%d %H:%M:%S')
    return res_list
def query_task_list(self, task_name, task_type, data_type, task_class, plan_begin_time, plan_end_time, publish_time, task_state, executor, task_src, nodeid):
    """Search non-deleted tasks of a node using optional filters.

    Every filter argument is skipped when it is ``None`` or empty.
    ``task_name`` is matched with ``LIKE '%name%'``; all other filters are
    exact matches on the column of the same name.

    Args:
        task_name: Substring to look for in the task name.
        task_type, data_type, task_class, plan_begin_time, plan_end_time,
        publish_time, task_state, executor, task_src: Optional exact-match
            filters (sized values such as strings).
        nodeid: Node identifier (always applied).

    Returns:
        Matching rows; each non-null ``update_time`` is replaced in place by
        its ``YYYY-MM-DD HH:MM:SS`` string form.
    """
    sql_query = "select * from task.`task` where nodeid='%s' and record_state=0" % (nodeid)
    if task_name is not None and len(task_name) > 0:
        sql_query += " and task_name like '%%%s%%'" % (task_name)
    # The data_type filter used to be appended to the data_type argument itself
    # (and named the wrong column), so it never reached the query.
    exact_filters = (
        ('task_type', task_type),
        ('data_type', data_type),
        ('task_class', task_class),
        ('plan_begin_time', plan_begin_time),
        ('plan_end_time', plan_end_time),
        ('publish_time', publish_time),
        ('task_state', task_state),
        ('executor', executor),
        ('task_src', task_src),
    )
    for column, value in exact_filters:
        if value is not None and len(value) > 0:
            sql_query += " and %s='%s'" % (column, value)
    res_list = self.do_select(sql_query)
    for res in res_list:
        update_time = res['update_time']
        logging.info(update_time)
        if update_time is not None:
            res['update_time'] = update_time.strftime('%Y-%m-%d %H:%M:%S')
    return res_list
def query_completed_task_cross_list(self, nodeid, area_id):
    """Return crosses referenced by completed tasks (``task_state = 4``).

    Args:
        nodeid: Node identifier applied to both the task and the cross table.
        area_id: Accepted for interface compatibility; not used by the query.

    Returns:
        Rows with ``crossid`` and ``name`` from ``tmnet.cross``.
    """
    # NOTE(review): the sub-select compares crossid against the whole
    # ``crossids`` column value — confirm that matches how it is stored.
    # The task table is schema-qualified like every other query in this class.
    sql_query = (
        "select crossid,name from tmnet.cross where crossid in "
        "(select crossids from task.task where task_state=4 and nodeid='%s') and nodeid='%s';" % (nodeid, nodeid)
    )
    return self.do_select(sql_query)
def query_completed_task_by_cross(self, crossid, nodeid, area_id):
    """Return completed tasks (``task_state = 4``) whose ``crossids`` equals ``crossid``.

    Args:
        crossid: Value compared for equality with the ``crossids`` column.
        nodeid: Node identifier.
        area_id: Area identifier; interpolated unquoted into the SQL.

    Returns:
        Whatever ``do_select`` returns for the query.
    """
    sql_query = (
        "select * from task.task where task_state=4 and crossids='%s' "
        "and nodeid='%s' and area_id = %s;" % (crossid, nodeid, area_id)
    )
    return self.do_select(sql_query)
def get_last_taskno(self, creatorid, task_name, nodeid):
    """Return the highest task number for a creator, task name and node.

    Args:
        creatorid: Creator identifier.
        task_name: Exact task name.
        nodeid: Node identifier.

    Returns:
        The ``max(taskno)`` value of the single result row, or ``None`` when
        the query does not return exactly one row (logged as an error).
    """
    sql_query = (
        "select max(taskno) as taskno from task.task where creatorid='%s' "
        "and task_name='%s' and nodeid='%s';" % (creatorid, task_name, nodeid)
    )
    tasknos = self.do_select(sql_query)
    if len(tasknos) != 1:
        logging.error('get_last_taskno error! %s', sql_query)
        return None
    return tasknos[0]['taskno']
def get_task_no(self, nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment):
    """Look up the task number of a task identified by its full attribute set.

    ``nodeid``, ``task_name``, ``executor``, ``task_src`` and ``comment`` are
    compared as quoted strings; the remaining arguments are interpolated
    unquoted.

    Returns:
        ``taskno`` of the first matching row, or ``None`` when nothing matches.
    """
    # NOTE(review): values are interpolated into the SQL text unescaped.
    sql = (
        f"select taskno from task.task where nodeid = '{nodeid}' and area_id = {area_id} "
        f"and task_name = '{task_name}' and task_type = {task_type} "
        f"and task_class = {task_class} and data_type = {data_type} and plan_begin_time = {plan_begin_time} "
        f"and plan_end_time = {plan_end_time} and executor = '{executor}' and task_src = '{task_src}' "
        f"and comment = '{comment}'"
    )
    res = self.do_select(sql)
    if len(res) > 0:
        return res[0]['taskno']
    return None
def delete_task(self, taskno, nodeid, area_id):
    """Soft-delete tasks by setting ``record_state = 1``.

    Args:
        taskno: Iterable of task numbers to mark as deleted.
        nodeid: Node identifier.
        area_id: Area identifier; interpolated unquoted into the SQL.

    Returns:
        The result of ``do_execute``; ``0`` when ``taskno`` is empty, because
        an empty ``in ()`` list is not valid SQL and nothing needs updating.
    """
    taskno_str = ','.join(str(num) for num in taskno)
    if not taskno_str:
        return 0
    sql = "update task.`task` set record_state=1 where taskno in (%s) and nodeid='%s' and area_id = %s;" % (taskno_str, nodeid, area_id)
    return self.do_execute(sql)
def query_task_history(self, taskno, nodeid, area_id):
    """Return the history records of one task ordered by ``history_date``.

    Args:
        taskno: Task number.
        nodeid: Node identifier.
        area_id: Area identifier; interpolated unquoted into the SQL.

    Returns:
        Whatever ``do_select`` returns for the query.
    """
    sql_query = (
        "select * from task.task_history where taskno='%s' and nodeid='%s' "
        "and area_id = %s order by history_date;" % (taskno, nodeid, area_id)
    )
    return self.do_select(sql_query)
def query_task_progress_history(self, yesterday_date, nodeid, area_id):
    """Return progress-related history entries recorded on a given date.

    History rows of the date whose ``content`` matches the pattern
    ``operation ... 任务进度 ... content`` are left-joined to the task table to
    obtain the task name.

    Args:
        yesterday_date: Date compared with ``date(history_date)``.
        nodeid: Node identifier.
        area_id: Area identifier; interpolated unquoted into the SQL.

    Returns:
        Rows with ``task_name`` and ``content``.
    """
    # The doubled '%' is kept exactly as in the original f-string; as LIKE
    # wildcards '%%' and '%' match the same thing.
    sql_query = (
        "select t2.task_name, content from "
        f"(select * from task.task_history where date(history_date) = '{yesterday_date}' "
        f"and nodeid = '{nodeid}' and area_id = {area_id} "
        "and content like '%%operation%%任务进度%%content%%' order by history_date) t1 "
        "left join (select task_name, taskno from task.task) t2 "
        "on t1.taskno = t2.taskno;"
    )
    return self.do_select(sql_query)
def add_task_history(self, taskno, nodeid, area_id, operator, remark):
    """Append one history record, stamped with the current local time.

    Args:
        taskno: Task number the record belongs to.
        nodeid: Node identifier; interpolated unquoted into the SQL.
        area_id: Area identifier.
        operator: Name/identifier of the acting user.
        remark: Text stored in the ``content`` column.

    Returns:
        ``1`` on success; ``0`` when the insert did not affect exactly one
        row (the statement is logged as an error).
    """
    time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # NOTE(review): a single quote inside ``remark`` breaks this statement.
    sql_insert = (
        "insert into task.task_history (taskno,nodeid, area_id, operator,content, history_date) "
        "values('%s', %s, '%s', '%s', '%s', '%s')" % (taskno, nodeid, area_id, operator, remark, time_str)
    )
    count = self.do_execute(sql_insert)
    if count != 1:
        logging.error('add_task_history error! %s', sql_insert)
        return 0
    return count
def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
             executor, progress, task_state, description, crossids, waveid, wave_name, comment,
             record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review):
    """Insert a new row into ``task.task``.

    The first twenty values (``timestamp`` through ``nodeid``) are written as
    quoted strings; ``area_id``, ``task_type_class`` and ``full_review`` are
    interpolated unquoted.

    Returns:
        ``1`` on success; ``0`` when the insert did not affect exactly one
        row (the statement is logged as an error).
    """
    columns = (
        "timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, "
        "executor, progress, task_state, description, crossids, waveid, wave_name, comment, "
        "record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review"
    )
    value_slots = ", ".join(["'%s'"] * 20 + ["%s"] * 3)
    # NOTE(review): values are interpolated into the SQL text unescaped.
    sql_insert = ("insert into task.task (" + columns + ") values(" + value_slots + ")") % (
        timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
        executor, progress, task_state, description, crossids, waveid, wave_name, comment,
        record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
    count = self.do_execute(sql_insert)
    if count != 1:
        logging.error('add_task error! %s', sql_insert)
        return 0
    return count
def update_task_info(self, taskno, modify_data, nodeid, area_id):
    """Apply a caller-built ``SET`` fragment to one task and refresh ``update_time``.

    Args:
        taskno: Task number of the row to update.
        modify_data: Raw SQL assignment list (e.g. ``"a = 1, b = 'x'"``),
            inserted verbatim after ``set``.
        nodeid: Node identifier; interpolated unquoted.
        area_id: Area identifier; interpolated unquoted.

    Returns:
        The result of ``do_execute``.
    """
    update_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # A plain string: the former f-prefix had no placeholders to fill.
    sql = "update task.task set %s, update_time = '%s' where taskno = '%s' and nodeid = %s and area_id = %s" % (
        modify_data, update_time, taskno, nodeid, area_id)
    return self.do_execute(sql)
##########################################################################
2025-12-22 17:05:55 +08:00
def insert_upload_file_record(self, taskno, nodeid, download_url, area_id):
    """Record an uploaded file for a task.

    Args:
        taskno: Task number; converted with ``int()``.
        nodeid: Node identifier; converted with ``int()``.
        download_url: URL stored as a quoted string.
        area_id: Area identifier; interpolated unquoted.

    Returns:
        The result of ``do_execute``.

    Raises:
        ValueError/TypeError: If ``taskno`` or ``nodeid`` is not int-convertible.
    """
    sql = (
        "insert into task.task_upload_file_record (taskno, nodeid, download_url, area_id) "
        "values (%d, %d, '%s', %s)" % (int(taskno), int(nodeid), download_url, area_id)
    )
    return self.do_execute(sql)
def query_task_file(self, nodeid, area_id):
    """Return the uploaded files of a node/area grouped by task number.

    Args:
        nodeid: Node identifier; converted with ``int()``.
        area_id: Area identifier; interpolated unquoted.

    Returns:
        Dict mapping ``taskno`` to a list of ``{'id', 'download_url'}`` dicts,
        in the order the rows were returned.
    """
    sql = "select * from task.task_upload_file_record where nodeid = %d and area_id = %s" % (int(nodeid), area_id)
    files_by_task = {}
    for row in self.do_select(sql):
        files_by_task.setdefault(row['taskno'], []).append({
            'id': row['id'],
            'download_url': row['download_url'],
        })
    return files_by_task
def query_task_file_by_taskno(self, nodeid, taskno, area_id):
    """Return the uploaded files of a single task.

    Args:
        nodeid: Node identifier; converted with ``int()``.
        taskno: Task number; converted with ``int()``.
        area_id: Area identifier; converted with ``int()``.

    Returns:
        List of ``{'id', 'download_url'}`` dicts, one per stored file.
    """
    sql = (
        "select id, download_url from task.task_upload_file_record "
        "where nodeid = %d and taskno = %d and area_id = %d" % (int(nodeid), int(taskno), int(area_id))
    )
    return [
        {'id': row['id'], 'download_url': row['download_url']}
        for row in self.do_select(sql)
    ]
2026-01-13 15:44:17 +08:00
def del_task_file(self, nodeid, taskno, file_id):
    """Delete one uploaded-file record of a task.

    Args:
        nodeid: Node identifier; converted with ``int()``.
        taskno: Task number; converted with ``int()``.
        file_id: Primary key of the record; converted with ``int()``.

    Returns:
        The result of ``do_execute``.
    """
    sql = (
        "delete from task.task_upload_file_record "
        "where nodeid = %d and taskno = %d and id = %d" % (int(nodeid), int(taskno), int(file_id))
    )
    return self.do_execute(sql)
def update_task_state(self, nodeid, area_id, task_no, state):
    """Set ``task_state`` of one task.

    Args:
        nodeid: Node identifier.
        area_id: Area identifier.
        task_no: Task number.
        state: New value for ``task_state``.

    Returns:
        The result of ``do_execute``.
    """
    # The table is schema-qualified like the other statements in this class.
    sql = "update task.task set task_state='%s' where nodeid='%s' and area_id='%s' and taskno='%s'" % (
        state, nodeid, area_id, task_no)
    return self.do_execute(sql)
2026-03-12 18:01:22 +08:00
def query_ledger_task_list(self, nodeid, area_id):
    """Return crosses that still belong to open ledger tasks of an area.

    A cross is returned when it is attached to a task with
    ``task_type_class = 1`` that is neither deleted (``record_state != 1``)
    nor completed (``task_state != 4``), and its own ``submit_status`` is
    null or different from 2.

    Args:
        nodeid: Node identifier; converted with ``int()``.
        area_id: Area identifier; converted with ``int()``.

    Returns:
        Rows with a single distinct ``crossid`` column.
    """
    sql = (
        "select distinct crossid from task.ledger_task_detail where task_no in "
        "(select taskno from task.task where nodeid = %d and area_id = %d and task_type_class = 1 "
        "and record_state != 1 and task_state != 4) "
        "and (submit_status is null or submit_status != 2)" % (int(nodeid), int(area_id))
    )
    return self.do_select(sql)
def query_ledger_task_crosses_info(self, task_no):
    """Return all ledger detail rows attached to one task.

    Args:
        task_no: Task number; interpolated unquoted into the SQL.

    Returns:
        Whatever ``do_select`` returns for the query.
    """
    # The table is schema-qualified like the other ledger queries in this class.
    sql = "select * from task.ledger_task_detail where task_no = %s" % task_no
    return self.do_select(sql)
2026-03-19 14:26:36 +08:00
def query_ledger_task_crosses_info_by_area_id(self, nodeid, area_id):
    """Summarise ledger entry and approval progress per task for an area.

    A cross counts as "entered" when ``ledger_status == 2`` and
    ``phase_status`` is 1 or 3, and as "approved" when ``submit_status == 2``.
    Percentages are truncated to integers.

    Args:
        nodeid: Node identifier; interpolated unquoted.
        area_id: Area identifier; interpolated unquoted.

    Returns:
        Dict mapping ``task_no`` to a dict with ``task_no``,
        ``task_cross_num``, ``entered_cross_num``, ``entered_percent``,
        ``approve_cross_num`` and ``approve_percent``.
    """
    sql = "select * from task.ledger_task_detail where nodeid = %s and area_id = %s" % (nodeid, area_id)
    crosses_by_task = {}
    for row in self.do_select(sql):
        crosses_by_task.setdefault(row['task_no'], []).append(row)

    task_res_dict = {}
    for task_no, crosses in crosses_by_task.items():
        task_cross_num = len(crosses)
        entered_cross_num = sum(
            1 for cross in crosses
            if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)
        )
        approve_cross_num = sum(1 for cross in crosses if cross['submit_status'] == 2)
        task_res_dict[task_no] = {
            'task_no': task_no,
            'task_cross_num': task_cross_num,
            'entered_cross_num': entered_cross_num,
            'entered_percent': int(entered_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0,
            'approve_cross_num': approve_cross_num,
            'approve_percent': int(approve_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0,
        }
    return task_res_dict
2026-03-12 18:01:22 +08:00
def query_ledger_task_crosses_pics(self, crossid_list):
    """Return the user-uploaded pictures of the given crosses.

    Args:
        crossid_list: Iterable of cross id strings; each is single-quoted
            and placed in an ``in (...)`` list.

    Returns:
        Rows from ``tmnet.user_upload_cross_pics``.
    """
    crossids = "'" + "','".join(crossid_list) + "'"
    sql = "select * from tmnet.user_upload_cross_pics where crossid in (%s)" % crossids
    return self.do_select(sql)
def drop_old_task_cross(self, crossid_list, old_task_no, nodeid, area_id):
    """Detach crosses from a ledger task in one transaction.

    The crosses are deleted from ``task.ledger_task_detail`` and the task's
    comma-separated ``crossids`` column is rewritten without them. The
    transaction commits only when exactly one task row was updated and one
    detail row was deleted per requested cross.

    Args:
        crossid_list: Cross id strings to detach.
        old_task_no: Task number the crosses currently belong to.
        nodeid: Node identifier of the task.
        area_id: Area identifier of the task.

    Returns:
        ``True`` when both statements succeeded and were committed; ``False``
        when the task cannot be found, the row counts differ, or an error
        occurred (the transaction is rolled back).
    """
    old_task_info = self.query_task(old_task_no, nodeid, area_id)
    if old_task_info is None:
        # query_task returns None on a miss; bail out instead of raising TypeError.
        logging.error('drop_old_task_cross: task %s not found', old_task_no)
        return False

    crossids = "'" + "','".join(crossid_list) + "'"
    sql = "delete from task.ledger_task_detail where task_no = %s and crossid in (%s)" % (old_task_no, crossids)

    # Keep the stored order of the remaining ids (a set difference scrambles it).
    removed = set(crossid_list)
    old_crossids = (old_task_info['crossids'] or '').split(',')
    remaining_crossids = [cid for cid in dict.fromkeys(old_crossids) if cid not in removed]
    update_task_info_sql = "update task.task set crossids = '%s' where taskno = %s and nodeid = %s and area_id = %s" % (
        ','.join(remaining_crossids), old_task_no, nodeid, area_id)

    conn, cursor = self.connect()
    try:
        conn.begin()
        logging.info('drop_old_task_cross: %s', sql)
        logging.info('drop_old_task_cross: %s', update_task_info_sql)
        update_ret = cursor.execute(update_task_info_sql)
        del_ret = cursor.execute(sql)
        if update_ret == 1 and del_ret == len(crossid_list):
            conn.commit()
            return True
        conn.rollback()
        return False
    except Exception as e:
        logging.error(e)
        conn.rollback()
        return False
def insert_ledger_task_cross(self, insert_list, crossid_list, task_no, nodeid):
    """Attach crosses to a ledger task and seed their ``ledger_status``.

    Rows are inserted into ``task.ledger_task_detail`` and ``ledger_status``
    is then copied from ``tmnet.ledger_entering_status`` (0 for crosses
    without a status row). Everything runs in one transaction.

    Args:
        insert_list: Sequence of ``(task_no, crossid, nodeid, area_id)``
            tuples passed to ``executemany``.
        crossid_list: Cross id strings being attached.
        task_no: Task number the crosses are attached to.
        nodeid: Node identifier used in the status update.

    Returns:
        ``True`` when the insert affected ``len(insert_list)`` rows and the
        status update affected one row per distinct cross; otherwise
        ``False`` after a rollback.
    """
    crossids = "'" + "','".join(crossid_list) + "'"
    sql = "insert into task.ledger_task_detail (task_no, crossid, nodeid, area_id) values (%s, %s, %s, %s)"
    query_cross_ledger_status_sql = (
        "select crossid, nodeid, status from tmnet.ledger_entering_status where crossid in (%s)" % crossids
    )
    ledger_status_info = self.do_select(query_cross_ledger_status_sql)
    status_by_cross = {row['crossid']: row['status'] for row in ledger_status_info}
    # One WHEN clause per distinct cross, in input order.
    when_clauses = [
        f"when '{crossid}' then {status_by_cross.get(crossid, 0)}"
        for crossid in dict.fromkeys(crossid_list)
    ]
    update_ledger_task_cross_status_sql = (
        "update task.ledger_task_detail set ledger_status = case crossid "
        + ' '.join(when_clauses)
        + f" end where crossid in ({crossids}) and nodeid = {nodeid} and task_no = {task_no}"
    )
    logging.info(update_ledger_task_cross_status_sql)

    conn, cursor = self.connect()
    try:
        conn.begin()
        ret = cursor.executemany(sql, insert_list)
        if ret != len(insert_list):
            conn.rollback()
            return False
        # NOTE(review): this compares the affected-row count with the number of
        # crosses; confirm the driver counts unchanged rows as affected.
        update_ledger_status_ret = cursor.execute(update_ledger_task_cross_status_sql)
        if update_ledger_status_ret == len(when_clauses):
            conn.commit()
            return True
        conn.rollback()
        logging.error("update ledger_entering_status fail")
        return False
    except Exception as e:
        logging.error(e)
        conn.rollback()
        return False
def query_ledger_task_cross_record(self, task_no, crossid):
    """Return the ledger detail row of one cross within one task.

    Args:
        task_no: Task number; interpolated unquoted.
        crossid: Cross id; compared as a quoted string.

    Returns:
        The first matching row, or ``None`` when there is none.
    """
    sql = "select * from task.ledger_task_detail where task_no = %s and crossid = '%s'" % (task_no, crossid)
    res = self.do_select(sql)
    return res[0] if res else None
def update_ledger_task_cross_record(self, task_no, crossid, field_name, value):
    """Set a single column of one cross's ledger detail row.

    Args:
        task_no: Task number.
        crossid: Cross id.
        field_name: Column name, inserted verbatim into the statement.
        value: New value, inserted verbatim (callers must quote strings).

    Returns:
        The result of ``do_execute``.
    """
    # NOTE(review): field_name and value are spliced in raw; only pass trusted input.
    sql = "update task.ledger_task_detail set %s = %s where task_no = '%s' and crossid = '%s'" % (
        field_name, value, task_no, crossid)
    return self.do_execute(sql)
def approval_ledger_task_cross(self, task_no, crossid, status, approver, approver_id, now_time):
    """Store the approval outcome of one cross within a ledger task.

    Args:
        task_no: Task number.
        crossid: Cross id.
        status: New ``submit_status``; interpolated unquoted.
        approver: Approver name.
        approver_id: Approver identifier.
        now_time: Timestamp text written to ``approver_time``.

    Returns:
        The result of ``do_execute``.
    """
    sql = (
        "update task.ledger_task_detail set submit_status = %s, approver = '%s', approver_id = '%s', "
        "approver_time = '%s' where task_no = '%s' and crossid = '%s'" % (
            status, approver, approver_id, now_time, task_no, crossid)
    )
    return self.do_execute(sql)
2025-11-11 10:07:25 +08:00
2026-03-12 18:01:22 +08:00
def query_ledger_task_crosses(self, taskno):
    """Return all ledger detail rows of one task.

    Args:
        taskno: Task number; interpolated unquoted into the SQL.

    Returns:
        Whatever ``do_select`` returns for the query.
    """
    sql = "select * from task.ledger_task_detail where task_no = %s" % taskno
    return self.do_select(sql)
2025-11-11 10:07:25 +08:00
2026-04-07 09:48:43 +08:00
def query_all_ledger_task_crosses_info(self, nodeid, area_id):
    """Return every ledger detail row of an area grouped by task number.

    Args:
        nodeid: Node identifier; interpolated unquoted.
        area_id: Area identifier; interpolated unquoted.

    Returns:
        Dict mapping ``task_no`` to the list of its detail rows, in the order
        the rows were returned.
    """
    sql = "select * from task.ledger_task_detail where nodeid = %s and area_id = %s" % (nodeid, area_id)
    task_cross_dict = {}
    for row in self.do_select(sql):
        task_cross_dict.setdefault(row['task_no'], []).append(row)
    return task_cross_dict
2026-03-19 14:26:36 +08:00
def query_cross_entering_status(self, crossid_list):
    """Return ledger/phase status rows of the given crosses, oldest update first.

    Args:
        crossid_list: Iterable of cross id strings; each is single-quoted
            and placed in an ``in (...)`` list.

    Returns:
        Rows with ``crossid``, ``ledger_status``, ``phase_status`` and
        ``update_time`` ordered by ``update_time`` ascending.
    """
    crossids = "'" + "','".join(crossid_list) + "'"
    sql = (
        "select crossid, ledger_status, phase_status, update_time from task.ledger_task_detail "
        "where crossid in (%s) order by update_time asc" % crossids
    )
    return self.do_select(sql)
2026-04-07 09:48:43 +08:00
def query_greenwave_task_requirement_validation_info_sql(self, task_no):
    """Return the ``greenwave_task_detail`` row of a task.

    Args:
        task_no: Task number; interpolated unquoted into the SQL.

    Returns:
        The first matching row, or ``None`` when there is none.
    """
    sql = "select * from task.greenwave_task_detail where task_no = %s" % (task_no)
    res = self.do_select(sql)
    return res[0] if res else None
def query_wave_task_additional_info_sql(self, task_no, nodeid, area_id):
    """Return the ``greenwave_task_additional_detail`` row of a task.

    Args:
        task_no: Task number; interpolated unquoted.
        nodeid: Node identifier; interpolated unquoted.
        area_id: Area identifier; interpolated unquoted.

    Returns:
        The first matching row, or ``None`` when there is none.
    """
    sql = (
        "select * from task.greenwave_task_additional_detail "
        "where task_no = %s and nodeid = %s and area_id = %s" % (task_no, nodeid, area_id)
    )
    res = self.do_select(sql)
    return res[0] if res else None
2026-04-07 09:48:43 +08:00
def query_wave_task_crosses(self, task_no):
    """Return the ``greenwave_task_cross_detail`` rows of a task.

    Args:
        task_no: Task number; interpolated unquoted into the SQL.

    Returns:
        Whatever ``do_select`` returns for the query.
    """
    sql = "select * from task.greenwave_task_cross_detail where task_no = %s" % (task_no)
    return self.do_select(sql)
def query_wave_task_tp_info(self, task_no, nodeid, area_id):
    """Return the ``greenwave_task_tp_detail`` rows of a task.

    Args:
        task_no: Task number; interpolated unquoted.
        nodeid: Node identifier; interpolated unquoted.
        area_id: Area identifier; interpolated unquoted.

    Returns:
        Whatever ``do_select`` returns for the query.
    """
    sql = (
        "select * from task.greenwave_task_tp_detail "
        "where task_no = %s and nodeid = %s and area_id = %s" % (task_no, nodeid, area_id)
    )
    return self.do_select(sql)
def query_task_check_res(self, task_no, nodeid, area_id):
    """Return a task's result-confirmation rows grouped by ``tp_info_id``.

    Args:
        task_no: Task number; interpolated unquoted.
        nodeid: Node identifier; interpolated unquoted.
        area_id: Area identifier; interpolated unquoted.

    Returns:
        Dict mapping ``tp_info_id`` to the list of its rows, in the order the
        rows were returned.
    """
    sql = (
        "select * from task.greenwave_task_result_addirm_detail "
        "where task_no = %s and nodeid = %s and area_id = %s" % (task_no, nodeid, area_id)
    )
    tp_info_dict = {}
    for row in self.do_select(sql):
        tp_info_dict.setdefault(row['tp_info_id'], []).append(row)
    return tp_info_dict
def query_greenwave_task_adjustment_record(self, task_no):
    """Return the tiny-adjustment records of a task.

    Args:
        task_no: Task number; interpolated unquoted into the SQL.

    Returns:
        Whatever ``do_select`` returns for the query.
    """
    sql = "select * from task.greenwave_task_tiny_adjustment_record where task_no = %s" % (task_no)
    return self.do_select(sql)
def save_task_require_confirm_info(self, task_no, nodeid, area_id, require_confirm_info, creator_id, creator):
    """Persist the requirement-confirmation data of a green-wave task.

    Inserts, in one transaction: the base row (``greenwave_task_detail``),
    one row per cross (``greenwave_task_cross_detail``), one row per time
    slot (``greenwave_task_tp_detail``) and the additional-detail row.

    Args:
        task_no: Task number all rows are attached to.
        nodeid: Node identifier stored with time-slot and additional rows.
        area_id: Area identifier stored with time-slot and additional rows.
        require_confirm_info: Dict with keys ``base_info`` (dict),
            ``cross_list`` (list of dicts with ``influence_factor`` and
            ``phase_info``) and ``tp_info`` (list of dicts).
        creator_id: Identifier of the creating user.
        creator: Name of the creating user.

    Returns:
        ``True`` when every statement affected the expected number of rows
        and the transaction was committed; otherwise ``False`` after a
        rollback.

    Raises:
        KeyError: If a required key is missing from ``require_confirm_info``
            (raised before any statement is executed).
    """
    base_info = require_confirm_info['base_info']
    cross_list = require_confirm_info['cross_list']
    tp_info = require_confirm_info['tp_info']

    insert_base_info_sql = (
        "insert into task.greenwave_task_detail (task_no, wave_name, waveid, start_cross_name, end_cross_name, cross_num, "
        "task_src, plan_start_time, plan_end_time, predict_issue_time, is_urgency, wave_status, slc_company, "
        "max_phase_position, left_coor, comment, creator_id, creator, executor_id, executor) "
        "values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    )
    base_fields = (
        'wave_name', 'waveid', 'start_cross_name', 'end_cross_name', 'cross_num',
        'task_src', 'plan_start_time', 'plan_end_time', 'predict_issue_time', 'is_urgency',
        'wave_status', 'slc_company', 'max_phase_position', 'left_coor', 'comment',
    )
    params = (
        (task_no,)
        + tuple(base_info[field] for field in base_fields)
        + (creator_id, creator, base_info['executor_id'], base_info['executor'])
    )

    insert_cross_info_sql = (
        "insert into task.greenwave_task_cross_detail (task_no, crossid, cross_name, location, intersect_coor, "
        "phase_pos_adjust, phase_seq_adjust, phase_adjust, borrow_left, waiting_left, waiting_straight, "
        "waiting_bicycle, phase_info) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    )
    factor_fields = (
        'intersect_coor', 'phase_pos_adjust', 'phase_seq_adjust', 'phase_adjust',
        'borrow_left', 'waiting_left', 'waiting_straight', 'waiting_bicycle',
    )
    insert_cross_list = []
    for item in cross_list:
        influence_factor = item['influence_factor']
        phase_info_json = json.dumps(item['phase_info'], ensure_ascii=False)
        insert_cross_list.append(
            (task_no, item['crossid'], item['cross_name'], item['location'])
            + tuple(influence_factor[field] for field in factor_fields)
            + (phase_info_json,)
        )

    insert_tp_info_sql = (
        "insert into task.greenwave_task_tp_detail (task_no, wave_tp_id, tp_start, tp_end, coor_dir, priority_coor_dir, weekday, "
        "nodeid, area_id) values (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
    )
    insert_tp_info = [
        (task_no, item['wave_tp_id'], item['tp_start'], item['tp_end'], item['coor_dir'],
         item['priority_coor_dir'], item['weekday'], nodeid, area_id)
        for item in tp_info
    ]

    insert_additional_sql = "insert into task.greenwave_task_additional_detail (task_no, nodeid, area_id) values (%s, %s, %s)"
    insert_additional_params = (task_no, nodeid, area_id)

    conn, cursor = self.connect()
    try:
        insert_greenwave_task_ret = cursor.execute(insert_base_info_sql, params)
        insert_cross_ret = cursor.executemany(insert_cross_info_sql, insert_cross_list)
        insert_tp_ret = cursor.executemany(insert_tp_info_sql, insert_tp_info)
        insert_additional_ret = cursor.execute(insert_additional_sql, insert_additional_params)
        if (insert_greenwave_task_ret == 1 and insert_cross_ret == len(cross_list)
                and insert_tp_ret == len(tp_info) and insert_additional_ret == 1):
            conn.commit()
            return True
        # A real format string: passing bare ints as msg/args made logging
        # fail to format the record, so the counts were never logged.
        logging.error(
            'save_task_require_confirm_info row count mismatch: base=%s cross=%s/%s tp=%s/%s additional=%s',
            insert_greenwave_task_ret, insert_cross_ret, len(cross_list),
            insert_tp_ret, len(tp_info), insert_additional_ret)
        conn.rollback()
        return False
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False
2026-04-07 15:54:14 +08:00
def init_additional_info(self, task_no, nodeid, area_id, cross_infos):
    """Create the additional-detail row and the cross rows of a green-wave task.

    Args:
        task_no: Task number the rows are attached to.
        nodeid: Node identifier stored in the additional-detail row.
        area_id: Area identifier stored in the additional-detail row.
        cross_infos: List of dicts with ``crossid``, ``name`` and ``location``.

    Returns:
        ``True`` when one additional-detail row and one row per cross were
        inserted and committed; otherwise ``False`` after a rollback.
    """
    cross_values = [
        (task_no, cross_info['crossid'], cross_info['name'], cross_info['location'])
        for cross_info in cross_infos
    ]
    # Driver-side parameters, matching the same insert in save_task_require_confirm_info.
    insert_additional_sql = "insert into task.greenwave_task_additional_detail (task_no, nodeid, area_id) values (%s, %s, %s)"
    insert_cross_sql = (
        "insert into task.greenwave_task_cross_detail (task_no, crossid, cross_name, location) "
        "values (%s, %s, %s, %s)"
    )
    conn, cursor = self.connect()
    try:
        insert_additional_ret = cursor.execute(insert_additional_sql, (task_no, nodeid, area_id))
        insert_cross_ret = cursor.executemany(insert_cross_sql, cross_values)
        if insert_additional_ret == 1 and insert_cross_ret == len(cross_infos):
            conn.commit()
            return True
        conn.rollback()
        return False
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False
2026-04-07 14:53:58 +08:00
2026-04-07 09:48:43 +08:00
def async_wave_tp_id(self, wave_tp_info, task_tp_info, task_no, nodeid, area_id):
    """Copy green-wave time-slot ids onto the matching task time slots.

    Slots are matched on ``tp_start``, ``tp_end`` and ``weekday``. The rows
    of ``task_tp_info`` are updated in place with the matched ``wave_tp_id``
    before the database rows are written.

    Args:
        wave_tp_info: Rows holding ``tp_start``, ``tp_end``, ``weekday`` and
            ``wave_tp_id`` (string-valued slot fields).
        task_tp_info: Task slot rows holding ``tp_start``, ``tp_end`` and
            ``weekday``.
        task_no: Task number of the slots to update.
        nodeid: Node identifier of the slots to update.
        area_id: Area identifier of the slots to update.

    Returns:
        ``True`` when all updates were committed; ``False`` when any task
        slot has no matching wave slot (nothing is written) or when a
        database error occurred (rolled back).
    """
    def slot_key(row):
        return row['tp_start'] + '-' + row['tp_end'] + '-' + row['weekday']

    wave_tp_info_dict = {slot_key(row): row['wave_tp_id'] for row in wave_tp_info}
    task_tp_info_dict = {slot_key(row): row for row in task_tp_info}
    for tp_key, task_row in task_tp_info_dict.items():
        if tp_key not in wave_tp_info_dict:
            return False
        task_row['wave_tp_id'] = wave_tp_info_dict[tp_key]

    update_sql = (
        "update task.greenwave_task_tp_detail set wave_tp_id = %s where tp_start = %s and tp_end = %s "
        "and weekday = %s and task_no = %s and nodeid = %s and area_id = %s"
    )
    conn, cursor = self.connect()
    try:
        for tp_info in task_tp_info_dict.values():
            cursor.execute(update_sql, (tp_info['wave_tp_id'], tp_info['tp_start'], tp_info['tp_end'],
                                        tp_info['weekday'], task_no, nodeid, area_id))
        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        # Log the failure like the other transactional helpers instead of printing it.
        logging.error(e)
        return False
def update_stage1_info(self, task_no, waveid, wave_name, task_stage, task_info, wave_crosses):
    """Bind a green wave to a task if needed and store the new task stage.

    When ``task_info`` already carries a non-empty ``waveid`` and
    ``wave_name``, only ``task_stage`` is updated. Otherwise the wave id,
    wave name and comma-joined cross ids are written to the task first.

    Args:
        task_no: Task number.
        waveid: Wave id to bind when the task has none yet.
        wave_name: Wave name to bind when the task has none yet.
        task_stage: New stage value.
        task_info: Current task row; ``waveid`` and ``wave_name`` are read.
        wave_crosses: Iterable of cross id strings of the wave.

    Returns:
        ``True`` on commit; ``False`` after a rollback on any error.
    """
    conn, cursor = self.connect()
    try:
        already_bound = task_info['waveid'] and task_info['wave_name']
        if not already_bound:
            cursor.execute(
                "update task.task set waveid = %s, wave_name = %s, crossids = %s where taskno = %s",
                (waveid, wave_name, ','.join(wave_crosses), task_no))
        # Both paths use driver-side parameters for the stage update.
        cursor.execute(
            "update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s",
            (task_stage, task_no))
        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False
def update_stage2_info(self, task_no, stage_info):
    """Store the task stage of a green-wave task.

    Args:
        task_no: Task number; interpolated unquoted.
        stage_info: New ``task_stage`` value, written as a quoted string.

    Returns:
        The result of ``do_execute``.
    """
    sql = "update task.greenwave_task_additional_detail set task_stage = '%s' where task_no = %s" % (stage_info, task_no)
    return self.do_execute(sql)
def update_stage3_info(self, actually_issue_time, task_no, task_stage, delay_reason):
    """Store the actual issue time, stage and delay reason of a green-wave task.

    Args:
        actually_issue_time: Text written to ``actually_issue_time``.
        task_no: Task number; interpolated unquoted.
        task_stage: New ``task_stage`` value, written as a quoted string.
        delay_reason: Text written to ``delay_issue_reason``.

    Returns:
        The result of ``do_execute``.
    """
    sql = (
        "update task.greenwave_task_additional_detail set actually_issue_time = '%s', task_stage = '%s', "
        "delay_issue_reason = '%s' where task_no = %s" % (actually_issue_time, task_stage, delay_reason, task_no)
    )
    return self.do_execute(sql)
def update_stage4_info(self, task_no, task_stage, tp_info, nodeid, area_id, task_additional_info):
    """Store per-slot result-check outcomes and advance the task stage.

    Each item of ``tp_info`` is compared with the stored slot that has the
    same ``tp_start``/``tp_end``; only slots whose ``result_check_res`` or
    serialised ``not_pass_detail`` changed are written. When no slot has
    ``result_check_res == 1`` the ``affirm_time`` is set to the current time
    together with the stage.

    Args:
        task_no: Task number.
        task_stage: New stage value.
        tp_info: List of dicts with ``tp_info_id``, ``tp_start``, ``tp_end``,
            ``result_check_res`` and ``not_pass_detail``.
        nodeid: Node identifier.
        area_id: Area identifier.
        task_additional_info: Accepted for interface compatibility; unused.

    Returns:
        Tuple ``(ok, has_bad)``: ``ok`` is ``True`` on commit and ``False``
        after a rollback; ``has_bad`` is ``True`` when any slot has
        ``result_check_res == 1``.

    Raises:
        KeyError: If an item of ``tp_info`` has no stored slot with the same
            start/end, or lacks a required key.
    """
    old_tp_info = self.query_wave_task_tp_info(task_no, nodeid, area_id)
    old_tp_info_dict = {row['tp_start'] + '-' + row['tp_end']: row for row in old_tp_info}
    changed_rows, has_bad = [], False
    for tp_item in tp_info:
        tp_info_id = tp_item['tp_info_id']
        result_check_res = tp_item['result_check_res']
        if result_check_res == 1:
            has_bad = True
        not_pass_detail = tp_item['not_pass_detail']
        # None / empty detail is stored as an empty string, anything else as JSON.
        not_pass_detail_json = json.dumps(not_pass_detail, ensure_ascii=False) if not_pass_detail else ''
        old_row = old_tp_info_dict[tp_item['tp_start'] + '-' + tp_item['tp_end']]
        if result_check_res != old_row['result_check_res'] or not_pass_detail_json != old_row['not_pass_detail']:
            changed_rows.append((result_check_res, not_pass_detail_json, tp_info_id, nodeid, area_id, task_no))

    # One parameterized UPDATE per changed slot. The former single CASE
    # statement was invalid SQL when nothing had changed and broke on quotes
    # inside the JSON text.
    update_tp_info_sql = (
        "update task.greenwave_task_tp_detail set result_check_res = %s, not_pass_detail = %s "
        "where id = %s and nodeid = %s and area_id = %s and task_no = %s"
    )
    conn, cursor = self.connect()
    try:
        for row_params in changed_rows:
            cursor.execute(update_tp_info_sql, row_params)
        if has_bad:
            cursor.execute(
                "update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s",
                (task_stage, task_no))
        else:
            now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            cursor.execute(
                "update task.greenwave_task_additional_detail set task_stage = %s, affirm_time = %s where task_no = %s",
                (task_stage, now_time, task_no))
        conn.commit()
        return True, has_bad
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False, has_bad
def update_stage5_info(self, task_no, task_stage, affirm_time):
    """Store the stage and confirmation time of a green-wave task.

    Args:
        task_no: Task number; interpolated unquoted.
        task_stage: New ``task_stage`` value, written as a quoted string.
        affirm_time: Text written to ``affirm_time``.

    Returns:
        The result of ``do_execute``.
    """
    sql = (
        "update task.greenwave_task_additional_detail set task_stage = '%s', affirm_time = '%s' "
        "where task_no = %s" % (task_stage, affirm_time, task_no)
    )
    return self.do_execute(sql)
def update_stage7_info(self, task_no, task_stage, open_wave_monitor_job, belong_wave_monitor_job_id, belong_wave_monitor_job_name, task_additional_info, phase_update_time_record):
    """Flag updated crosses and store the stage and monitor-job settings.

    Every cross id found when iterating ``phase_update_time_record`` gets
    ``phase_update_status = 1``. The monitor-job columns are written only
    when they differ from ``task_additional_info``; otherwise just the stage
    is updated.

    Args:
        task_no: Task number.
        task_stage: New stage value.
        open_wave_monitor_job: New ``open_wave_monitor_job`` value.
        belong_wave_monitor_job_id: New ``belong_wave_monitor_job_id`` value.
        belong_wave_monitor_job_name: New ``belong_wave_monitor_job_name`` value.
        task_additional_info: Current additional-detail row used for the
            change comparison.
        phase_update_time_record: Iterable whose items are cross id strings.

    Returns:
        ``True`` on commit; ``False`` after a rollback on any error.
    """
    crossid_list = list(phase_update_time_record)
    conn, cursor = self.connect()
    try:
        # Skip the cross update when there is nothing to flag; the former
        # CASE statement without any WHEN clause was invalid SQL.
        if crossid_list:
            placeholders = ', '.join(['%s'] * len(crossid_list))
            cursor.execute(
                "update task.greenwave_task_cross_detail set phase_update_status = 1 "
                "where task_no = %s and crossid in (" + placeholders + ")",
                (task_no, *crossid_list))
        monitor_changed = (
            open_wave_monitor_job != task_additional_info['open_wave_monitor_job']
            or belong_wave_monitor_job_id != task_additional_info['belong_wave_monitor_job_id']
            or belong_wave_monitor_job_name != task_additional_info['belong_wave_monitor_job_name']
        )
        if monitor_changed:
            cursor.execute(
                "update task.greenwave_task_additional_detail set task_stage = %s, open_wave_monitor_job = %s, "
                "belong_wave_monitor_job_id = %s, belong_wave_monitor_job_name = %s where task_no = %s",
                (task_stage, open_wave_monitor_job, belong_wave_monitor_job_id, belong_wave_monitor_job_name, task_no))
        else:
            cursor.execute(
                "update task.greenwave_task_additional_detail set task_stage = %s where task_no = %s",
                (task_stage, task_no))
        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False
def update_additional_detail_file_info(self, stage_no, download_url, task_no, nodeid, area_id, upload_time):
    """Store the URL and upload time of a stage document.

    For ``stage_no == 2`` the wave-optimisation columns are written; for any
    other value the compare-report columns are written.

    Args:
        stage_no: Stage number selecting the target columns.
        download_url: URL written as a quoted string.
        task_no: Task number; interpolated unquoted.
        nodeid: Node identifier; interpolated unquoted.
        area_id: Area identifier; interpolated unquoted.
        upload_time: Text written to the matching upload-time column.

    Returns:
        The result of ``do_execute``.
    """
    if stage_no == 2:
        url_column, time_column = 'wave_optimize_url', 'upload_wave_optimize_time'
    else:
        url_column, time_column = 'compare_report_url', 'upload_compare_report_time'
    update_sql = (
        "update task.greenwave_task_additional_detail set %s = '%s', %s = '%s' "
        "where task_no = %s and nodeid = %s and area_id = %s" % (
            url_column, download_url, time_column, upload_time, task_no, nodeid, area_id)
    )
    return self.do_execute(update_sql)
def insert_greenwave_task_tiny_adjustment_record(self, values):
    """Insert tiny-adjustment records in bulk.

    Args:
        values: Sequence of ``(task_no, upload_file_path, creator_id,
            creator)`` tuples, passed as parameters to ``do_executemany``.

    Returns:
        The result of ``do_executemany``.
    """
    sql = (
        "insert into task.greenwave_task_tiny_adjustment_record (task_no, upload_file_path, creator_id, creator) "
        "values (%s, %s, %s, %s)"
    )
    return self.do_executemany(sql, values)
def del_greenwave_task_tiny_adjustment_record_sql(self, id):
    """Delete one tiny-adjustment record by primary key.

    Args:
        id: Primary key of the record; interpolated unquoted. The name
            shadows the builtin but is kept for keyword-argument callers.

    Returns:
        The result of ``do_execute``.
    """
    sql = "delete from task.greenwave_task_tiny_adjustment_record where id = %s" % id
    return self.do_execute(sql)
def update_task_progress(self, task_no, nodeid, area_id, progress):
    """Set the ``progress`` column of one task.

    Args:
        task_no: Task number; interpolated unquoted.
        nodeid: Node identifier; interpolated unquoted.
        area_id: Area identifier; interpolated unquoted.
        progress: New progress value; interpolated unquoted.

    Returns:
        The result of ``do_execute``.
    """
    sql = "update task.task set progress = %s where taskno = %s and nodeid = %s and area_id = %s" % (
        progress, task_no, nodeid, area_id)
    return self.do_execute(sql)
2025-11-11 10:07:25 +08:00
#
# if __name__ == '__main__':
# tt_5min = get_latest_5min_timestamp()
# print(tt_5min)
# print(get_today_str())