2025-10-10 14:38:22 +08:00
# -*- coding:utf-8 -*-
import pymysql
import pymysql . cursors
from flask import g
from app . db_func_base import *
class PhaseTableDbHelper ( TableDbHelperBase ) :
def __init__ ( self , pool ) :
self . db_pool = pool
self . DB_Name = ' phasetable '
def query_all_wave_cross ( self , orgid_list ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select max(wc.waveid) waveid,
wc . crossid ,
wc . orgid ,
max ( wc . location ) location ,
max ( wc . cross_name ) cross_name
from greenwave . wave_cross as wc
join greenwave . greenwave as g on wc . orgid = g . orgid and g . status < 2
where wc . orgid in % s
group by wc . crossid , wc . orgid
order by wc . orgid asc '''
#print(cursor.mogrify(sql, (tuple(orgid_list),)))
cursor . execute ( sql , ( tuple ( orgid_list ) , ) )
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as error :
self . close ( conn , cursor )
return None , error
def query_line_wave_cross ( self , crossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select max(wc.waveid) waveid,
wc . crossid ,
wc . orgid ,
max ( wc . location ) location ,
max ( wc . cross_name ) cross_name
from greenwave . wave_cross as wc
join greenwave . greenwave as g on wc . orgid = g . orgid
where wc . crossid = % s
group by wc . crossid , wc . orgid
order by wc . orgid asc '''
cursor . execute ( sql , ( crossid ) )
result = cursor . fetchone ( )
self . close ( conn , cursor )
return result , None
except Exception as error :
self . close ( conn , cursor )
return None , error
def query_cross_mapping ( self , nodeid , xl_crossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select *
from { self . DB_Name } . cross_mapping
where nodeid = % s and xl_crossid = % s '''
print ( cursor . mogrify ( sql , ( nodeid , xl_crossid ) ) )
cursor . execute ( sql , ( nodeid , xl_crossid ) )
result = cursor . fetchone ( )
self . close ( conn , cursor )
return result , None
except Exception as error :
self . close ( conn , cursor )
return None , error
def query_jjcrossid_all ( self , nodeid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select
jj_crossid
from
{ self . DB_Name } . cross_mapping
where nodeid = % s group by jj_crossid '''
print ( cursor . mogrify ( sql , ( nodeid ) ) )
cursor . execute ( sql , ( nodeid ) )
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as error :
self . close ( conn , cursor )
return None , error
def insert_onkey_update ( self , citycode , xl_crossid , jj_crossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' INSERT INTO { self . DB_Name } .cross_mapping
( ` xl_crossid ` , ` jj_crossid ` , ` nodeid ` ) VALUES
( % s , % s , % s ) ON DUPLICATE KEY UPDATE
` nodeid ` = % s '''
print ( cursor . mogrify ( sql , ( xl_crossid , jj_crossid , citycode , citycode ) ) )
cursor . execute ( sql , ( xl_crossid , jj_crossid , citycode , citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def query_db_phase ( self , citycode , crossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' SELECT * FROM { self . DB_Name } .phase WHERE nodeid = %s and crossid = %s '''
print ( cursor . mogrify ( sql , ( citycode , crossid ) ) )
cursor . execute ( sql , ( citycode , crossid ) )
row_list = cursor . fetchall ( )
res = [ ]
for row in row_list :
res . append ( {
' phaseid ' : row [ ' phaseid ' ] ,
' name ' : row [ ' name ' ]
} )
self . close ( conn , cursor )
return res , None
except Exception as error :
self . close ( conn , cursor )
return None , error
def query_db_arrangement_all ( self , citycode , crossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select
a . crossid ,
IFNULL ( a . ` day ` , 0 ) ` day ` ,
IFNULL ( a . ` month ` , 0 ) ` month ` ,
IFNULL ( a . ` weekday ` , 0 ) ` weekday ` ,
a . type ,
a . scheduleid ,
a . priority ,
IFNULL ( ps . planid , 0 ) planid ,
ds . control_mode ,
IFNULL ( ds . tp_start , ' ' ) tp_start ,
ps . name as plan_name ,
ps . cycle ,
ps . coord_phaseid ,
ps . offset plan_stage ,
IFNULL ( s . stageid , 0 ) stageid ,
ps . stage_duration ,
ps . stage_seq ,
IFNULL ( s . name , ' ' ) stage_name ,
IFNULL ( s . green , 0 ) green ,
IFNULL ( s . redyellow , 0 ) redyellow ,
IFNULL ( s . yellow , 0 ) yellow ,
IFNULL ( s . allred , 0 ) allred ,
IFNULL ( s . phases , ' ' ) phases ,
IFNULL ( max ( p . min_green ) , 0 ) min_green ,
IFNULL ( max ( p . max_green ) , 0 ) max_green
from
{ self . DB_Name } . arrangement as a
left join { self . DB_Name } . day_schedule as ds on ds . scheduleid = a . scheduleid and ds . crossid = a . crossid and ds . nodeid = a . nodeid
left join { self . DB_Name } . plan_stage as ps on ps . planid = ds . planid and ds . crossid = ps . crossid and ps . nodeid = a . nodeid
left join { self . DB_Name } . stage as s on s . stageid = ps . stageid and s . crossid = ps . crossid and s . nodeid = a . nodeid
left join { self . DB_Name } . phase as p ON FIND_IN_SET ( p . phaseid , s . phases ) > 0 and p . crossid = s . crossid and p . nodeid = a . nodeid
WHERE
a . nodeid = % s and a . crossid = % s
GROUP BY
a . crossid , a . ` day ` , a . ` month ` , a . weekday , a . type , a . scheduleid , a . priority ,
ds . planid , ds . control_mode , ds . tp_start , ps . name , ps . cycle , ps . coord_phaseid ,
ps . offset , ps . stageid , ps . stage_duration , ps . stage_seq , s . name , s . green , s . redyellow ,
s . yellow , s . allred , s . phases
ORDER BY a . day asc , a . month asc , a . weekday asc , a . priority asc , ds . tp_start asc , ps . stage_seq asc '''
print ( cursor . mogrify ( sql , ( citycode , crossid ) ) )
cursor . execute ( sql , ( citycode , crossid ) )
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as error :
self . close ( conn , cursor )
return None , error
def query_phase_record_db ( self , citycode , crossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' SELECT type,max(`update_time`) update_time FROM { self . DB_Name } .phase_record WHERE nodeid = %s and crossid = %s group by type '''
print ( cursor . mogrify ( sql , ( citycode , crossid ) ) )
cursor . execute ( sql , ( citycode , crossid ) )
row_list = cursor . fetchall ( )
res = { }
for row in row_list :
res [ row [ ' type ' ] ] = str ( row [ ' update_time ' ] )
self . close ( conn , cursor )
return res , None
except Exception as error :
self . close ( conn , cursor )
return None , error
def query_exist_plan_id ( self , citycode , crossid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select planid from { self . DB_Name } .plan_stage where nodeid = %s and crossid = %s group by planid '''
print ( cursor . mogrify ( sql_query , ( citycode , crossid ) ) )
cursor . execute ( sql_query , ( citycode , crossid ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def query_day_schedule_by_jjcrossid ( self , citycode , jjcrossid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select * from { self . DB_Name } .day_schedule where nodeid = %s and crossid = %s order by tp_start asc '''
cursor . execute ( sql_query , ( citycode , jjcrossid ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def query_arrangement_by_jjcrossid ( self , citycode , jjcrossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select * from { self . DB_Name } .arrangement where nodeid= %s and crossid = %s order by month,day,weekday asc '''
cursor . execute ( sql , ( citycode , jjcrossid ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def query_phase_by_jjcrossid ( self , citycode , jjcrossid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select * from { self . DB_Name } .phase where nodeid = %s and crossid = %s order by phaseid asc '''
cursor . execute ( sql_query , ( citycode , jjcrossid ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def query_plan_stage_by_jjcrossid ( self , citycode , jjcrossid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select ps.planid,
ps . name ,
ps . crossid ,
ps . cycle ,
ps . coord_phaseid ,
ps . offset ,
ps . stageid ,
ps . stage_duration ,
ps . stage_seq ,
s . name stage_name ,
s . green ,
s . redyellow ,
s . yellow ,
s . allred ,
s . phases ,
s . min_green ,
s . max_green
from { self . DB_Name } . plan_stage as ps
join { self . DB_Name } . stage as s on s . crossid = ps . crossid and s . stageid = ps . stageid and ps . nodeid = s . nodeid
where ps . crossid = % s and ps . nodeid = % s
order by ps . planid asc , ps . stage_seq asc '''
cursor . execute ( sql_query , ( jjcrossid , citycode ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def query_schedule_line ( self , pid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select * from { self . DB_Name } .day_schedule where id = %s '''
print ( cursor . mogrify ( sql , pid ) )
cursor . execute ( sql , pid )
# 获取所有查询结果
result = cursor . fetchone ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def query_schedule_tps ( self , citycode , cross_no , scheduleid , tp_start ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select * from { self . DB_Name } .day_schedule where nodeid = %s and crossid = %s and scheduleid = %s and tp_start > %s limit 1 '''
cursor . execute ( sql_query , ( citycode , cross_no , scheduleid , tp_start ) )
# 获取所有查询结果
result = cursor . fetchone ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def query_schedule_arrangement ( self , citycode , scheduleid , crossid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select * from { self . DB_Name } .arrangement where nodeid = %s and crossid = %s and scheduleid = %s '''
cursor . execute ( sql_query , ( citycode , crossid , scheduleid ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
# 根据路口获取干线配时方案
def query_green_waves_by_crossid ( self , crossid , nodeid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select ac.arteryid,
a . name artery_name ,
gw . waveid ,
gw . name ,
gw . type ,
gw . start_cross_seq ,
gw . cross_num ,
gw . launch_mode ,
gw . week_days ,
gw . tp ,
gw . tp_start ,
gw . tp_end ,
gw . cycle ,
gw . planid_list ,
group_concat ( t1 . crossid ) crossids
from tmnet . artery_cross as ac
join tmnet . artery as a on ac . arteryid = a . arteryid
join tmnet . green_waves as gw on ac . arteryid = gw . arteryid
left join ( select arteryid , crossid
from tmnet . artery_cross
where arteryid in ( select arteryid
from tmnet . artery_cross
where crossid = % s
and nodeid = % s )
order by cross_seq asc
) as t1 on t1 . arteryid = ac . arteryid
where ac . crossid = % s
and ac . nodeid = % s
group by ac . arteryid , a . name , gw . waveid , gw . name , gw . type ,
gw . start_cross_seq , gw . cross_num , gw . launch_mode ,
gw . week_days , gw . tp , gw . tp_start , gw . tp_end , gw . cycle , gw . planid_list '''
cursor . execute ( sql_query , ( crossid , nodeid , crossid , nodeid ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def update_phase ( self , citycode , crossid , phaseid , min_green , max_green ) :
conn , cursor = self . connect ( )
try :
sql = f " UPDATE { self . DB_Name } .`phase` SET `min_green` = %s,`max_green` = %s where phaseid = %s and crossid = %s and nodeid = %s "
cursor . execute ( sql , ( min_green , max_green , phaseid , crossid , citycode ) )
self . close ( conn , cursor )
except Exception as error :
self . close ( conn , cursor )
return error
return None
def insert_phase_db ( self , citycode , crossid , name , lanes , min_green , max_green ) :
conn , cursor = self . connect ( )
try :
sql = f " delete from { self . DB_Name } .phase where crossid = %s and name = %s and nodeid = %s "
cursor . execute ( sql , ( crossid , name , citycode ) )
sql = f " select ifnull(max(phaseid),0) max_phaseid from { self . DB_Name } .phase where crossid = %s and nodeid = %s "
cursor . execute ( sql , ( crossid , citycode ) )
line = cursor . fetchone ( )
maxid = line [ ' max_phaseid ' ] + 1
# 准备插入数据的 SQL 语句
sql = f " INSERT INTO { self . DB_Name } .phase (`phaseid`, `name`,`crossid`,`lanes`,`min_green`,`max_green`,`nodeid`) VALUES (%s,%s,%s,%s,%s,%s,%s) "
cursor . execute ( sql , ( maxid , name , crossid , lanes , min_green , max_green , citycode ) )
self . close ( conn , cursor )
except Exception as error :
self . close ( conn , cursor )
return error
return None
def update_stage_line ( self , citycode , max_stageid , stage_name , crossid , green , redyellow , yellow , allred , phase ,
min_green ,
max_green ) :
conn , cursor = self . connect ( )
try :
# 如果数据不存在,则执行插入操作
insert_sql = f " update { self . DB_Name } .`stage` set `name`=%s, `green`=%s, `redyellow`=%s, `yellow`=%s,`allred`=%s,`phases`=%s,`min_green`=%s,`max_green`=%s where stageid = %s and crossid=%s and nodeid = %s "
cursor . execute ( insert_sql , (
stage_name , green , redyellow , yellow , allred , phase , min_green , max_green , max_stageid , crossid ,
citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def insert_stage_line ( self , citycode , max_stageid , stage_name , crossid , green , redyellow , yellow , allred , phase ,
min_green ,
max_green ) :
conn , cursor = self . connect ( )
try :
# 如果数据不存在,则执行插入操作
insert_sql = f " INSERT INTO { self . DB_Name } .`stage` (`stageid`,`name`, `crossid`, `green`, `redyellow`, `yellow`,`allred`,`phases`,`min_green`,`max_green`,`nodeid`) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) "
cursor . execute ( insert_sql , (
max_stageid , stage_name , crossid , green , redyellow , yellow , allred , phase , min_green , max_green ,
citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def insert_plan_stage_db ( self , citycode , planid , name , crossid , cycle , coord_phaseid , offset , stageid ,
stage_duration ,
stage_seq ) :
conn , cursor = self . connect ( )
try :
conn , cursor = self . connect ( )
insert_sql = f " INSERT INTO { self . DB_Name } .`plan_stage` (`planid`, `name`, `crossid`, `cycle`, `coord_phaseid`,`offset`,`stageid`,`stage_duration`,`stage_seq`,`nodeid`) VALUES (%s, %s, %s, %s, %s,%s,%s,%s,%s,%s) "
cursor . execute ( insert_sql ,
( planid , name , crossid , cycle , coord_phaseid , offset , stageid , stage_duration , stage_seq ,
citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def delete_stage_line ( self , citycode , stageid , jjcrossid ) :
conn , cursor = self . connect ( )
try :
# 如果数据不存在,则执行插入操作
sql = f " delete from { self . DB_Name } .`stage` where stageid = %s and crossid = %s and nodeid = %s "
cursor . execute ( sql , ( stageid , jjcrossid , citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def delete_plan_stage_db ( self , citycode , crossid , planid ) :
conn , cursor = self . connect ( )
try :
sql = f " delete FROM { self . DB_Name } .`plan_stage` WHERE `planid` = %s and `crossid` = %s and nodeid = %s "
cursor . execute ( sql , ( planid , crossid , citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def update_schedule ( self , id , control_mode , tp_start , planid ) :
conn , cursor = self . connect ( )
try :
sql = f " UPDATE { self . DB_Name } .`day_schedule` SET `control_mode` = %s,`tp_start` = %s,`planid` = %s where id = %s "
cursor . execute ( sql , ( control_mode , tp_start , planid , id ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def insert_schedule ( self , citycode , crossid , control_mode , tp_start , planid , scheduleid ) :
conn , cursor = self . connect ( )
try :
sql = f " insert into { self . DB_Name } .`day_schedule`(`crossid`,`control_mode`,`tp_start`,`planid`,`scheduleid`,`nodeid`) values(%s,%s,%s,%s,%s,%s) "
cursor . execute ( sql , ( crossid , control_mode , tp_start , planid , scheduleid , citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def query_plan_stage_by_planid ( self , citycode , jjcrossid , planid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select * from { self . DB_Name } .plan_stage where crossid = %s and planid = %s and nodeid = %s '''
cursor . execute ( sql_query , ( jjcrossid , planid , citycode ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def query_day_schedule_by_planid ( self , citycode , jjcrossid , planid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select * from { self . DB_Name } .day_schedule where crossid = %s and planid = %s and nodeid = %s '''
cursor . execute ( sql_query , ( jjcrossid , planid , citycode ) )
# 获取所有查询结果
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as e :
self . close ( conn , cursor )
return None , e
def delete_day_schedule ( self , id ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' delete from { self . DB_Name } .day_schedule where id = %s '''
cursor . execute ( sql_query , id )
# 获取所有查询结果
self . close ( conn , cursor )
return None
except Exception as e :
self . close ( conn , cursor )
return e
def inset_onkey_update_info ( self , param ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' INSERT INTO { self . DB_Name } .update_info
( ` value ` , ` param ` ) VALUES
( UNIX_TIMESTAMP ( ) , % s ) ON DUPLICATE KEY UPDATE
` value ` = UNIX_TIMESTAMP ( ) '''
cursor . execute ( sql_query , param )
# 获取所有查询结果
self . close ( conn , cursor )
return None
except Exception as e :
self . close ( conn , cursor )
return e
def multi_insert_arrangement_db ( self , citycode , data , jj_crossid ) :
conn , cursor = self . connect ( )
try :
conn . begin ( )
sql = f " DELETE FROM { self . DB_Name } .arrangement WHERE crossid = %s and nodeid = %s "
cursor . execute ( sql , ( jj_crossid , citycode ) )
# 准备插入数据的 SQL 语句
sql = f " INSERT INTO { self . DB_Name } .arrangement (`crossid`, `type`,`month`,`day`,`weekday`,`scheduleid`,`priority`,`nodeid`) VALUES (%s,%s,%s,%s,%s,%s,%s,%s) "
inserts = [ ]
for value in data :
inserts . append ( (
jj_crossid ,
value [ ' type ' ] if value [ ' type ' ] != ' ' else 0 ,
value [ ' month ' ] if value [ ' month ' ] != ' ' else None ,
value [ ' day ' ] if value [ ' day ' ] != ' ' else None ,
value [ ' weekday ' ] if value [ ' weekday ' ] != ' ' else None ,
value [ ' scheduleid ' ] if value [ ' scheduleid ' ] != ' ' else 0 ,
value [ ' priority ' ] if value [ ' priority ' ] != ' ' else 0 ,
citycode ,
) )
# 执行批量插入操作
cursor . executemany ( sql , inserts )
conn . commit ( )
self . close ( conn , cursor )
return None
except Exception as error :
conn . rollback ( )
self . close ( conn , cursor )
return error
def insert_phase_record_db ( self , citycode , jj_crossid , type , userid = None ) :
conn , cursor = self . connect ( )
try :
sql = f " INSERT INTO { self . DB_Name } .`phase_record` (`crossid`,`type`,`nodeid`,`user`) VALUES (%s, %s, %s,%s) "
cursor . execute ( sql , ( jj_crossid , type , citycode , userid ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def multi_insert_plan_stage_db ( self , citycode , data , jj_crossid , all = False ) :
conn , cursor = self . connect ( )
try :
if all :
select_sql = f " delete FROM { self . DB_Name } .`plan_stage` WHERE `crossid` = %s and `nodeid` = %s "
cursor . execute ( select_sql , ( jj_crossid , citycode ) )
else :
for value in data :
select_sql = f " delete FROM { self . DB_Name } .`plan_stage` WHERE `planid` = %s and `crossid` = %s and `nodeid` = %s "
cursor . execute ( select_sql , ( value [ ' planid ' ] , jj_crossid , citycode ) )
for value in data :
# select_sql = "SELECT * FROM `plan_stage` WHERE `planid` = %s and `crossid` = '%s'" % (
# value['planid'], value['stageid'], crossid)
# cursor.execute(select_sql)
# result = cursor.fetchone()
# if result:
# update_sql = "UPDATE `plan_stage` SET `name` = '%s', `cycle` = %s, `stageid` = %s, `stage_duration` = %s,`stage_seq` = %s,`coord_phaseid`=%s,`offset`=%s WHERE `planid` = %s AND `stageid` = '%s' and `crossid` = '%s'" % (
# value['name'], value['cycle'], value['stageid'], value['stage_duration'], value['stage_seq'], 0,
# 0, value['planid'], value['stageid'], crossid)
# cursor.execute(update_sql)
# else:
# 如果数据不存在,则执行插入操作
coord_phaseid = value [ ' coord_phaseid ' ] if value . get ( ' coord_phaseid ' ) else 0
offset = value [ ' offset ' ] if value . get ( ' offset ' ) else 0
insert_sql = f " INSERT INTO { self . DB_Name } .`plan_stage` (`planid`, `name`, `crossid`, `cycle`, `coord_phaseid`,`offset`,`stageid`,`stage_duration`,`stage_seq`,`nodeid`) VALUES (%s, %s, %s, %s, %s,%s,%s,%s,%s,%s) "
cursor . execute ( insert_sql , (
value [ ' planid ' ] , value [ ' name ' ] , jj_crossid , value [ ' cycle ' ] , coord_phaseid , offset , value [ ' stageid ' ] ,
value [ ' stage_duration ' ] , value [ ' stage_seq ' ] , citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def multi_insert_stage_db ( self , citycode , data , jj_crossid , all = False ) :
conn , cursor = self . connect ( )
try :
if all :
sql = f " DELETE FROM { self . DB_Name } .stage WHERE crossid = %s and nodeid = %s "
cursor . execute ( sql , ( jj_crossid , citycode ) )
for value in data :
if all :
# 如果数据不存在,则执行插入操作
insert_sql = f " INSERT INTO { self . DB_Name } .`stage` (`stageid`,`name`, `crossid`, `green`, `redyellow`, `yellow`,`allred`,`phases`,`nodeid`) VALUES (%s, %s, %s, %s, 0,%s,%s,%s,%s) "
cursor . execute ( insert_sql , (
value [ ' stageid ' ] , value [ ' name ' ] , jj_crossid , value [ ' green ' ] , value [ ' yellow ' ] , value [ ' allred ' ] ,
value [ ' phases ' ] , citycode ) )
continue
select_sql = f " SELECT * FROM { self . DB_Name } .`stage` WHERE `stageid` = %s AND `crossid` = %s and `nodeid` = %s "
cursor . execute ( select_sql , ( value [ ' stageid ' ] , jj_crossid , citycode ) )
result = cursor . fetchone ( )
if result :
update_sql = f " UPDATE { self . DB_Name } .`stage` SET `name` = %s, `green` = %s, `redyellow` = %s, `allred` = %s,`phases` = %s,`yellow`= %s WHERE `stageid` = %s and `crossid` = %s and `nodeid` = %s "
cursor . execute ( update_sql , (
value [ ' name ' ] , value [ ' green ' ] , 0 , value [ ' allred ' ] , value [ ' phases ' ] , value [ ' yellow ' ] ,
value [ ' stageid ' ] , jj_crossid , citycode ) )
else :
# 如果数据不存在,则执行插入操作
insert_sql = f " INSERT INTO { self . DB_Name } .`stage` (`stageid`,`name`, `crossid`, `green`, `redyellow`, `yellow`,`allred`,`phases`,`nodeid`) VALUES (%s, %s, %s, %s, 0,%s,%s,%s,%s) "
cursor . execute ( insert_sql , (
value [ ' stageid ' ] , value [ ' name ' ] , jj_crossid , value [ ' green ' ] , value [ ' yellow ' ] , value [ ' allred ' ] ,
value [ ' phases ' ] , citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def multi_insert_phase_db ( self , citycode , data , jj_crossid ) :
conn , cursor = self . connect ( )
try :
sql = f " DELETE FROM phasetable.phase WHERE crossid = %s and nodeid = %s "
cursor . execute ( sql , ( jj_crossid , citycode ) )
for value in data :
# 准备插入数据的 SQL 语句
sql = f " INSERT INTO phasetable.phase (`phaseid`, `name`,`crossid`,`lanes`,`min_green`,`max_green`,`nodeid`) VALUES (%s,%s,%s,%s,%s,%s,%s) "
cursor . execute ( sql , (
value [ ' phaseid ' ] , value [ ' name ' ] , jj_crossid , None , value [ ' min_green ' ] , value [ ' max_green ' ] ,
citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def multi_insert_day_schedule_db ( self , citycode , data , jj_crossid , all = False ) :
conn , cursor = self . connect ( )
try :
if all :
sql = f " DELETE FROM { self . DB_Name } .day_schedule WHERE crossid = %s and nodeid = %s "
cursor . execute ( sql , ( jj_crossid , citycode ) )
for value in data :
if all :
insert_sql = f " INSERT INTO { self . DB_Name } .`day_schedule` (`scheduleid`, `crossid`, `control_mode`, `tp_start`, `planid`, `nodeid`) VALUES (%s, %s, %s, %s, %s, %s) "
cursor . execute ( insert_sql , (
value [ ' scheduleid ' ] , jj_crossid , value [ ' control_mode ' ] , value [ ' tp_start ' ] , value [ ' planid ' ] ,
citycode ) )
continue
select_sql = f " SELECT * FROM { self . DB_Name } .`day_schedule` WHERE `scheduleid` = %s AND `tp_start` = %s and `crossid` = %s and `nodeid` = %s "
cursor . execute ( select_sql , ( value [ ' scheduleid ' ] , value [ ' tp_start ' ] , jj_crossid , citycode ) )
result = cursor . fetchone ( )
if result :
update_sql = f " UPDATE { self . DB_Name } .`day_schedule` SET `control_mode` = %s, `planid` = %s WHERE `scheduleid` = %s AND `tp_start` = %s and `crossid` = %s and `nodeid` = %s "
cursor . execute ( update_sql , (
value [ ' control_mode ' ] , value [ ' planid ' ] , value [ ' scheduleid ' ] , value [ ' tp_start ' ] , jj_crossid ,
citycode ) )
else :
# 如果数据不存在,则执行插入操作
insert_sql = f " INSERT INTO { self . DB_Name } .`day_schedule` (`scheduleid`, `crossid`, `control_mode`, `tp_start`, `planid`, `nodeid`) VALUES (%s, %s, %s, %s, %s, %s) "
cursor . execute ( insert_sql , (
value [ ' scheduleid ' ] , jj_crossid , value [ ' control_mode ' ] , value [ ' tp_start ' ] , value [ ' planid ' ] ,
citycode ) )
self . close ( conn , cursor )
return None
except Exception as error :
self . close ( conn , cursor )
return error
def query_phase_by_cross_list ( self , citycode , cross_list ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select
cm . xl_crossid
from cross_mapping as cm
join day_schedule as ds on cm . jj_crossid = ds . crossid and cm . nodeid = cm . nodeid
where
cm . nodeid = % s and cm . xl_crossid in % s
group by cm . xl_crossid '''
print ( cursor . mogrify ( sql , ( citycode , cross_list ) ) )
cursor . execute ( sql , ( citycode , cross_list ) )
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as error :
self . close ( conn , cursor )
return None , error
def query_phase_by_cross_all ( self , citycode ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select
cm . xl_crossid , cm . nodeid
from cross_mapping as cm
join day_schedule as ds on cm . jj_crossid = ds . crossid and cm . nodeid = cm . nodeid
where
cm . nodeid in % s
group by cm . xl_crossid , cm . nodeid '''
print ( cursor . mogrify ( sql , [ citycode ] ) )
cursor . execute ( sql , [ citycode ] )
result = cursor . fetchall ( )
self . close ( conn , cursor )
return result , None
except Exception as error :
self . close ( conn , cursor )
return None , error
2025-12-02 16:25:42 +08:00
def query_cross_tps_by_crossid ( self , citycode , crossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select * from { self . DB_Name } .cross_tps where citycode = %s and crossid = %s order by weekday , tp_start asc '''
cursor . execute ( sql , ( citycode , crossid ) )
# 获取所有查询结果
result = cursor . fetchall ( )
return result , None
except Exception as e :
return None , e
finally :
self . close ( conn , cursor )
def day_schedule_by_xlcrossid ( self , citycode , xlcrossid , scheduleid ) :
conn , cursor = self . connect ( )
try :
sql_query = f ''' select ds.scheduleid,
ds . crossid ,
ds . control_mode ,
ds . tp_start ,
ds . planid ,
ps . name ,
ds . nodeid
from cross_mapping as cm
join day_schedule as ds on ds . crossid = cm . jj_crossid and ds . nodeid = cm . nodeid
left join plan_stage as ps on ps . crossid = cm . jj_crossid and ps . nodeid = cm . nodeid and ps . planid = ds . planid
where cm . xl_crossid = % s
and cm . nodeid = % s
and ds . scheduleid = % s
group by ds . scheduleid , ds . crossid , ds . nodeid , ds . control_mode , ds . tp_start , ds . planid , ps . name
order by ds . tp_start asc '''
print ( cursor . mogrify ( sql_query , ( xlcrossid , citycode , scheduleid ) ) )
cursor . execute ( sql_query , ( xlcrossid , citycode , scheduleid ) )
# 获取所有查询结果
result = cursor . fetchall ( )
return result , None
except Exception as e :
return None , e
finally :
self . close ( conn , cursor )
def query_cross_runing_phasetable ( self , citycode , crossid ) :
conn , cursor = self . connect ( )
try :
sql = f ''' select t1.*,
group_concat ( t2 . name ) as phase_name , - - 相位名称
IFNULL ( max ( t2 . min_green ) , 0 ) min_green , - - 最小绿
IFNULL ( max ( t2 . max_green ) , 0 ) max_green - - 最大绿
from (
select cm . nodeid , - - 城市ID
cm . xl_crossid , - - 路口ID
cm . jj_crossid ,
ct . scheduleid , - - 日计划号
ct . weekday ,
ds . tp_start , - - 开始时段
ds . control_mode , - - 控制模式
ds . planid , - - 方案ID
ps . name , - - 方案名称
ps . cycle , - - 周期
ps . coord_phaseid , - - 协调阶段号
ps . offset , - - 绝对相位差
ps . stageid , - - 阶段号
ps . stage_duration , - - 阶段时长
ps . stage_seq , - - 阶段排序
s . name stage_name , - - 阶段名称
s . green , - - 绿灯时长
s . redyellow , - - 红黄时长
s . yellow , - - 黄灯时长
s . allred , - - 全红时长
s . phases - - 相位ID
from { self . DB_Name } . cross_mapping as cm
join { self . DB_Name } . cross_tps as ct on ct . crossid = cm . xl_crossid and ct . citycode = cm . nodeid
join { self . DB_Name } . day_schedule as ds on ds . crossid = cm . jj_crossid and cm . nodeid = ds . nodeid and ds . scheduleid = ct . scheduleid
left join { self . DB_Name } . plan_stage as ps on ds . crossid = ps . crossid and ps . nodeid = ds . nodeid and ps . planid = ds . planid
left join { self . DB_Name } . stage as s on s . crossid = ps . crossid and s . nodeid = ps . nodeid and s . stageid = ps . stageid
where cm . xl_crossid = % s and cm . nodeid = % s
group by cm . nodeid , cm . xl_crossid , ct . scheduleid , ct . weekday , ds . tp_start , ds . control_mode , ds . planid ,
ps . name , ps . cycle , ps . coord_phaseid , ps . offset , ps . stageid , ps . stage_duration , ps . stage_seq ,
s . name , s . green , s . redyellow , s . yellow , s . allred , s . phases
) as t1 left join { self . DB_Name } . phase as t2 ON FIND_IN_SET ( t2 . phaseid , t1 . phases ) > 0 and t2 . crossid = t1 . jj_crossid and t2 . nodeid = t1 . nodeid
group by t1 . nodeid , t1 . xl_crossid , t1 . scheduleid , t1 . weekday , t1 . tp_start , t1 . control_mode , t1 . planid ,
t1 . name , t1 . cycle , t1 . coord_phaseid , t1 . offset , t1 . stageid , t1 . stage_duration , t1 . stage_seq ,
t1 . stage_name , t1 . green , t1 . redyellow , t1 . yellow , t1 . allred , t1 . phases
order by t1 . weekday , t1 . tp_start , t1 . stage_seq asc '''
print ( cursor . mogrify ( sql , ( crossid , citycode ) ) )
cursor . execute ( sql , ( crossid , citycode ) )
# 获取所有查询结果
result = cursor . fetchall ( )
return result , None
except Exception as e :
return None , e
finally :
self . close ( conn , cursor )
2025-10-10 14:38:22 +08:00
# if __name__ == '__main__':
# tt_5min = get_latest_5min_timestamp()
# print(tt_5min)
# print(get_today_str())