742 lines
35 KiB
Python
742 lines
35 KiB
Python
# -*- coding:utf-8 -*-
|
|
|
|
import pymysql
|
|
import pymysql.cursors
|
|
|
|
from flask import g
|
|
from app.db_func_base import *
|
|
|
|
|
|
class PhaseTableDbHelper(TableDbHelperBase):
|
|
|
|
def __init__(self, pool):
    """Remember the connection pool and the schema name used in SQL.

    :param pool: pool object stored unchanged on ``self.db_pool``.
    """
    # Schema prefix interpolated into the statements built by this helper.
    self.DB_Name = 'phasetable'
    self.db_pool = pool
|
|
|
|
def query_all_wave_cross(self, orgid_list):
    """Return aggregated ``greenwave.wave_cross`` rows for a set of org ids.

    Rows are grouped by (crossid, orgid), restricted to orgs that have a
    ``greenwave.greenwave`` row with ``status < 2``, ordered by orgid.

    :param orgid_list: org ids used for the ``IN`` filter.
    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
        An empty/None ``orgid_list`` returns ``([], None)`` without a query.
    """
    if not orgid_list:
        # ``IN ()`` is a MySQL syntax error; no ids can only match no rows.
        return [], None
    conn, cursor = self.connect()
    try:
        sql = '''select max(wc.waveid) waveid,
                        wc.crossid,
                        wc.orgid,
                        max(wc.location) location,
                        max(wc.cross_name) cross_name
                 from greenwave.wave_cross as wc
                 join greenwave.greenwave as g on wc.orgid = g.orgid and g.status < 2
                 where wc.orgid in %s
                 group by wc.crossid, wc.orgid
                 order by wc.orgid asc'''
        # The id collection is one parameter; the driver expands it to (a, b, ...).
        cursor.execute(sql, (tuple(orgid_list),))
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result, None
    except Exception as error:
        self.close(conn, cursor)
        return None, error
|
|
|
|
def query_line_wave_cross(self, crossid):
    """Fetch the first aggregated ``greenwave.wave_cross`` row for a crossing.

    Rows are grouped by (crossid, orgid) and ordered by orgid ascending;
    only the first row is fetched.

    :param crossid: value matched against ``wc.crossid``.
    :return: ``(row_or_None, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        sql = '''select max(wc.waveid) waveid,
                        wc.crossid,
                        wc.orgid,
                        max(wc.location) location,
                        max(wc.cross_name) cross_name
                 from greenwave.wave_cross as wc
                 join greenwave.greenwave as g on wc.orgid = g.orgid
                 where wc.crossid = %s
                 group by wc.crossid, wc.orgid
                 order by wc.orgid asc'''
        # ``(crossid)`` is just ``crossid``; pass a real 1-tuple so the
        # parameter is always treated as a single value.
        cursor.execute(sql, (crossid,))
        result = cursor.fetchone()
        self.close(conn, cursor)
        return result, None
    except Exception as error:
        self.close(conn, cursor)
        return None, error
|
|
|
|
def query_cross_mapping(self, nodeid, xl_crossid):
    """Look up one ``cross_mapping`` row by node id and ``xl_crossid``.

    :return: ``(row_or_None, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    params = (nodeid, xl_crossid)
    try:
        statement = f'''select *
                 from {self.DB_Name}.cross_mapping
                 where nodeid = %s and xl_crossid = %s'''
        # Echo the fully rendered statement before running it.
        print(cursor.mogrify(statement, params))
        cursor.execute(statement, params)
        mapping = cursor.fetchone()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return mapping, None
|
|
|
|
def query_jjcrossid_all(self, nodeid):
    """List the distinct ``jj_crossid`` values mapped for a node.

    :param nodeid: value matched against ``cross_mapping.nodeid``.
    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        sql = f'''select
                    jj_crossid
                 from
                    {self.DB_Name}.cross_mapping
                 where nodeid = %s group by jj_crossid'''
        # ``(nodeid)`` is not a tuple; use an explicit 1-tuple of parameters.
        params = (nodeid,)
        print(cursor.mogrify(sql, params))
        cursor.execute(sql, params)
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result, None
    except Exception as error:
        self.close(conn, cursor)
        return None, error
|
|
|
|
def insert_onkey_update(self, citycode, xl_crossid, jj_crossid):
    """Upsert a ``cross_mapping`` row, refreshing ``nodeid`` on key conflict.

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    # ``citycode`` is bound twice: once for the insert, once for the update.
    params = (xl_crossid, jj_crossid, citycode, citycode)
    try:
        upsert = f'''INSERT INTO {self.DB_Name}.cross_mapping
                 (`xl_crossid`,`jj_crossid`,`nodeid`) VALUES
                 (%s,%s,%s) ON DUPLICATE KEY UPDATE
                 `nodeid`= %s'''
        print(cursor.mogrify(upsert, params))
        cursor.execute(upsert, params)
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return exc
    return None
|
|
|
|
def query_db_phase(self, citycode, crossid):
    """Return ``phaseid``/``name`` pairs of the phases of one crossing.

    Rows are read with key access, so the cursor must yield mapping rows.

    :return: ``(list_of_dicts, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    params = (citycode, crossid)
    try:
        statement = f'''SELECT * FROM {self.DB_Name}.phase WHERE nodeid = %s and crossid = %s'''
        print(cursor.mogrify(statement, params))
        cursor.execute(statement, params)
        # Keep only the two columns callers receive.
        phases = [
            {'phaseid': record['phaseid'], 'name': record['name']}
            for record in cursor.fetchall()
        ]
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return phases, None
|
|
|
|
def query_db_arrangement_all(self, citycode, crossid):
    """Fetch the flattened arrangement/schedule/plan/stage rows of a crossing.

    ``arrangement`` is left-joined to ``day_schedule``, ``plan_stage``,
    ``stage`` and ``phase``; each row carries the max ``min_green`` and
    ``max_green`` over the phases listed in the stage's ``phases`` column.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    params = (citycode, crossid)
    try:
        statement = f'''select
                    a.crossid,
                    IFNULL(a.`day`, 0) `day`,
                    IFNULL(a.`month`, 0) `month`,
                    IFNULL(a.`weekday`, 0) `weekday`,
                    a.type,
                    a.scheduleid,
                    a.priority,
                    IFNULL(ps.planid, 0) planid,
                    ds.control_mode,
                    IFNULL(ds.tp_start,'') tp_start,
                    ps.name as plan_name,
                    ps.cycle,
                    ps.coord_phaseid,
                    ps.offset plan_stage,
                    IFNULL(s.stageid, 0) stageid,
                    ps.stage_duration,
                    ps.stage_seq,
                    IFNULL(s.name, '') stage_name,
                    IFNULL(s.green, 0) green,
                    IFNULL(s.redyellow, 0) redyellow,
                    IFNULL(s.yellow, 0) yellow,
                    IFNULL(s.allred, 0) allred,
                    IFNULL(s.phases, '') phases,
                    IFNULL(max(p.min_green),0) min_green,
                    IFNULL(max(p.max_green),0) max_green
                from
                    {self.DB_Name}.arrangement as a
                    left join {self.DB_Name}.day_schedule as ds on ds.scheduleid = a.scheduleid and ds.crossid = a.crossid and ds.nodeid = a.nodeid
                    left join {self.DB_Name}.plan_stage as ps on ps.planid = ds.planid and ds.crossid = ps.crossid and ps.nodeid = a.nodeid
                    left join {self.DB_Name}.stage as s on s.stageid = ps.stageid and s.crossid = ps.crossid and s.nodeid = a.nodeid
                    left join {self.DB_Name}.phase as p ON FIND_IN_SET(p.phaseid, s.phases) > 0 and p.crossid = s.crossid and p.nodeid = a.nodeid
                WHERE
                    a.nodeid = %s and a.crossid = %s
                GROUP BY
                    a.crossid, a.`day`, a.`month`, a.weekday, a.type, a.scheduleid, a.priority,
                    ds.planid, ds.control_mode, ds.tp_start, ps.name, ps.cycle, ps.coord_phaseid,
                    ps.offset, ps.stageid, ps.stage_duration, ps.stage_seq, s.name, s.green, s.redyellow,
                    s.yellow, s.allred, s.phases
                ORDER BY a.day asc,a.month asc,a.weekday asc,a.priority asc,ds.tp_start asc,ps.stage_seq asc'''
        print(cursor.mogrify(statement, params))
        cursor.execute(statement, params)
        rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return rows, None
|
|
|
|
def query_phase_record_db(self, citycode, crossid):
    """Map each ``phase_record`` type of a crossing to its latest update time.

    The time is converted with ``str()``.

    :return: ``({type: update_time_str}, None)`` on success,
        ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    params = (citycode, crossid)
    try:
        statement = f'''SELECT type,max(`update_time`) update_time FROM {self.DB_Name}.phase_record WHERE nodeid = %s and crossid = %s group by type'''
        print(cursor.mogrify(statement, params))
        cursor.execute(statement, params)
        latest = {
            record['type']: str(record['update_time'])
            for record in cursor.fetchall()
        }
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return latest, None
|
|
|
|
def query_exist_plan_id(self, citycode, crossid):
    """List the distinct ``planid`` values stored in ``plan_stage`` for a crossing.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    params = (citycode, crossid)
    try:
        statement = f'''select planid from {self.DB_Name}.plan_stage where nodeid = %s and crossid = %s group by planid'''
        print(cursor.mogrify(statement, params))
        cursor.execute(statement, params)
        plan_rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return plan_rows, None
|
|
|
|
def query_day_schedule_by_jjcrossid(self, citycode, jjcrossid):
    """Fetch all ``day_schedule`` rows of a crossing ordered by ``tp_start``.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        statement = f'''select * from {self.DB_Name}.day_schedule where nodeid = %s and crossid = %s order by tp_start asc'''
        cursor.execute(statement, (citycode, jjcrossid))
        schedule_rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return schedule_rows, None
|
|
|
|
def query_arrangement_by_jjcrossid(self, citycode, jjcrossid):
    """Fetch all ``arrangement`` rows of a crossing ordered by month, day, weekday.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        statement = f'''select * from {self.DB_Name}.arrangement where nodeid= %s and crossid = %s order by month,day,weekday asc'''
        cursor.execute(statement, (citycode, jjcrossid))
        arrangement_rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return arrangement_rows, None
|
|
|
|
def query_phase_by_jjcrossid(self, citycode, jjcrossid):
    """Fetch all ``phase`` rows of a crossing ordered by ``phaseid``.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        statement = f'''select * from {self.DB_Name}.phase where nodeid = %s and crossid = %s order by phaseid asc'''
        cursor.execute(statement, (citycode, jjcrossid))
        phase_rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return phase_rows, None
|
|
|
|
def query_plan_stage_by_jjcrossid(self, citycode, jjcrossid):
    """Fetch plan stages of a crossing joined with their stage definitions.

    Rows are ordered by ``planid`` and then ``stage_seq``.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        statement = f'''select ps.planid,
                        ps.name,
                        ps.crossid,
                        ps.cycle,
                        ps.coord_phaseid,
                        ps.offset,
                        ps.stageid,
                        ps.stage_duration,
                        ps.stage_seq,
                        s.name stage_name,
                        s.green,
                        s.redyellow,
                        s.yellow,
                        s.allred,
                        s.phases,
                        s.min_green,
                        s.max_green
                 from {self.DB_Name}.plan_stage as ps
                 join {self.DB_Name}.stage as s on s.crossid = ps.crossid and s.stageid = ps.stageid and ps.nodeid = s.nodeid
                 where ps.crossid = %s and ps.nodeid = %s
                 order by ps.planid asc, ps.stage_seq asc'''
        # Placeholder order here is crossid first, then nodeid.
        cursor.execute(statement, (jjcrossid, citycode))
        stage_rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return stage_rows, None
|
|
|
|
def query_schedule_line(self, pid):
    """Fetch one ``day_schedule`` row by its ``id`` column.

    :param pid: value matched against ``day_schedule.id``.
    :return: ``(row_or_None, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        sql = f'''select * from {self.DB_Name}.day_schedule where id = %s'''
        # Bind parameters as a sequence (1-tuple) rather than a bare scalar.
        params = (pid,)
        print(cursor.mogrify(sql, params))
        cursor.execute(sql, params)
        result = cursor.fetchone()
        self.close(conn, cursor)
        return result, None
    except Exception as e:
        self.close(conn, cursor)
        return None, e
|
|
|
|
def query_schedule_tps(self, citycode, cross_no, scheduleid, tp_start):
    """Fetch the next ``day_schedule`` row that starts after ``tp_start``.

    Matches on nodeid, crossid and scheduleid; of the rows with a greater
    ``tp_start`` the one with the smallest ``tp_start`` is returned.

    :return: ``(row_or_None, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        # ORDER BY makes ``limit 1`` deterministic: without it the server may
        # return any later period, not the immediately following one.
        sql_query = f'''select * from {self.DB_Name}.day_schedule where nodeid = %s and crossid = %s and scheduleid = %s and tp_start > %s order by tp_start asc limit 1'''
        cursor.execute(sql_query, (citycode, cross_no, scheduleid, tp_start))
        result = cursor.fetchone()
        self.close(conn, cursor)
        return result, None
    except Exception as e:
        self.close(conn, cursor)
        return None, e
|
|
|
|
def query_schedule_arrangement(self, citycode, scheduleid, crossid):
    """Fetch the ``arrangement`` rows of a crossing that use a given schedule.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        statement = f'''select * from {self.DB_Name}.arrangement where nodeid = %s and crossid = %s and scheduleid = %s'''
        # Bind order follows the statement, not the signature.
        cursor.execute(statement, (citycode, crossid, scheduleid))
        arrangement_rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return arrangement_rows, None
|
|
|
|
# Fetch the arterial (green-wave) timing plans for an intersection
|
|
def query_green_waves_by_crossid(self, crossid, nodeid):
    """Fetch the green waves of every artery that contains a crossing.

    Each row carries the artery, the green-wave columns and ``crossids``,
    the comma-joined crossing ids of that artery ordered by ``cross_seq``.

    :param crossid: value matched against ``tmnet.artery_cross.crossid``.
    :param nodeid: value matched against ``tmnet.artery_cross.nodeid``.
    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        # The ordering lives inside GROUP_CONCAT: an ORDER BY in the derived
        # table is not guaranteed to survive the join and grouping.
        sql_query = '''select ac.arteryid,
                        a.name artery_name,
                        gw.waveid,
                        gw.name,
                        gw.type,
                        gw.start_cross_seq,
                        gw.cross_num,
                        gw.launch_mode,
                        gw.week_days,
                        gw.tp,
                        gw.tp_start,
                        gw.tp_end,
                        gw.cycle,
                        gw.planid_list,
                        group_concat(t1.crossid order by t1.cross_seq asc) crossids
                 from tmnet.artery_cross as ac
                 join tmnet.artery as a on ac.arteryid = a.arteryid
                 join tmnet.green_waves as gw on ac.arteryid = gw.arteryid
                 left join (select arteryid, crossid, cross_seq
                            from tmnet.artery_cross
                            where arteryid in (select arteryid
                                               from tmnet.artery_cross
                                               where crossid = %s
                                               and nodeid = %s)
                 ) as t1 on t1.arteryid = ac.arteryid
                 where ac.crossid = %s
                 and ac.nodeid = %s
                 group by ac.arteryid,a.name, gw.waveid, gw.name, gw.type,
                 gw.start_cross_seq, gw.cross_num, gw.launch_mode,
                 gw.week_days, gw.tp, gw.tp_start, gw.tp_end, gw.cycle, gw.planid_list'''
        # crossid/nodeid are bound twice: inner sub-select and outer filter.
        cursor.execute(sql_query, (crossid, nodeid, crossid, nodeid))
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result, None
    except Exception as e:
        self.close(conn, cursor)
        return None, e
|
|
|
|
def update_phase(self, citycode, crossid, phaseid, min_green, max_green):
    """Set ``min_green``/``max_green`` of one phase row.

    The row is selected by phaseid, crossid and nodeid (``citycode``).

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        statement = f"UPDATE {self.DB_Name}.`phase` SET `min_green` = %s,`max_green` = %s where phaseid = %s and crossid = %s and nodeid = %s"
        params = (min_green, max_green, phaseid, crossid, citycode)
        cursor.execute(statement, params)
        self.close(conn, cursor)
        return None
    except Exception as exc:
        self.close(conn, cursor)
        return exc
|
|
|
|
def insert_phase_db(self, citycode, crossid, name, lanes, min_green, max_green):
    """Re-create a named phase of a crossing under the next free ``phaseid``.

    Any phase with the same name is deleted first, then the new row is
    inserted with ``phaseid`` = current maximum for the crossing + 1.
    The max-id row is read by key, so the cursor must yield mapping rows.

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        delete_sql = f"delete from {self.DB_Name}.phase where crossid = %s and name = %s and nodeid = %s"
        cursor.execute(delete_sql, (crossid, name, citycode))

        max_sql = f"select ifnull(max(phaseid),0) max_phaseid from {self.DB_Name}.phase where crossid = %s and nodeid = %s"
        cursor.execute(max_sql, (crossid, citycode))
        next_phaseid = cursor.fetchone()['max_phaseid'] + 1

        insert_sql = f"INSERT INTO {self.DB_Name}.phase (`phaseid`, `name`,`crossid`,`lanes`,`min_green`,`max_green`,`nodeid`) VALUES (%s,%s,%s,%s,%s,%s,%s)"
        new_row = (next_phaseid, name, crossid, lanes, min_green, max_green, citycode)
        cursor.execute(insert_sql, new_row)
        self.close(conn, cursor)
        return None
    except Exception as exc:
        self.close(conn, cursor)
        return exc
|
|
|
|
def update_stage_line(self, citycode, max_stageid, stage_name, crossid, green, redyellow, yellow, allred, phase,
                      min_green,
                      max_green):
    """Update every editable column of one ``stage`` row.

    The row is selected by stageid (``max_stageid``), crossid and nodeid
    (``citycode``).

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        statement = f"update {self.DB_Name}.`stage` set `name`=%s, `green`=%s, `redyellow`=%s, `yellow`=%s,`allred`=%s,`phases`=%s,`min_green`=%s,`max_green`=%s where stageid = %s and crossid=%s and nodeid = %s"
        new_values = (stage_name, green, redyellow, yellow, allred, phase, min_green, max_green)
        row_key = (max_stageid, crossid, citycode)
        cursor.execute(statement, new_values + row_key)
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return exc
    return None
|
|
|
|
def insert_stage_line(self, citycode, max_stageid, stage_name, crossid, green, redyellow, yellow, allred, phase,
                      min_green,
                      max_green):
    """Insert one ``stage`` row with the given id, timings and phase list.

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        statement = f"INSERT INTO {self.DB_Name}.`stage` (`stageid`,`name`, `crossid`, `green`, `redyellow`, `yellow`,`allred`,`phases`,`min_green`,`max_green`,`nodeid`) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        new_row = (
            max_stageid, stage_name, crossid,
            green, redyellow, yellow, allred,
            phase, min_green, max_green,
            citycode,
        )
        cursor.execute(statement, new_row)
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return exc
    return None
|
|
|
|
def insert_plan_stage_db(self, citycode, planid, name, crossid, cycle, coord_phaseid, offset, stageid,
                         stage_duration,
                         stage_seq):
    """Insert one ``plan_stage`` row.

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    # Connect exactly once: a second connect() would replace conn/cursor
    # and leave the first pair unclosed.
    conn, cursor = self.connect()
    try:
        insert_sql = f"INSERT INTO {self.DB_Name}.`plan_stage` (`planid`, `name`, `crossid`, `cycle`, `coord_phaseid`,`offset`,`stageid`,`stage_duration`,`stage_seq`,`nodeid`) VALUES (%s, %s, %s, %s, %s,%s,%s,%s,%s,%s)"
        cursor.execute(insert_sql,
                       (planid, name, crossid, cycle, coord_phaseid, offset, stageid, stage_duration, stage_seq,
                        citycode))
        self.close(conn, cursor)
        return None
    except Exception as error:
        self.close(conn, cursor)
        return error
|
|
|
|
def delete_stage_line(self, citycode, stageid, jjcrossid):
    """Delete one ``stage`` row selected by stageid, crossid and nodeid.

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        statement = f"delete from {self.DB_Name}.`stage` where stageid = %s and crossid = %s and nodeid = %s"
        row_key = (stageid, jjcrossid, citycode)
        cursor.execute(statement, row_key)
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return exc
    return None
|
|
|
|
def delete_plan_stage_db(self, citycode, crossid, planid):
    """Delete all ``plan_stage`` rows of one plan of a crossing.

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        statement = f"delete FROM {self.DB_Name}.`plan_stage` WHERE `planid` = %s and `crossid` = %s and nodeid = %s"
        row_key = (planid, crossid, citycode)
        cursor.execute(statement, row_key)
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return exc
    return None
|
|
|
|
def update_schedule(self, id, control_mode, tp_start, planid):
    """Update control mode, start time and plan of one ``day_schedule`` row.

    :param id: value matched against ``day_schedule.id``.
    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        statement = f"UPDATE {self.DB_Name}.`day_schedule` SET `control_mode` = %s,`tp_start` = %s,`planid` = %s where id = %s"
        params = (control_mode, tp_start, planid, id)
        cursor.execute(statement, params)
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return exc
    return None
|
|
|
|
def insert_schedule(self, citycode, crossid, control_mode, tp_start, planid, scheduleid):
    """Insert one ``day_schedule`` row.

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        statement = f"insert into {self.DB_Name}.`day_schedule`(`crossid`,`control_mode`,`tp_start`,`planid`,`scheduleid`,`nodeid`) values(%s,%s,%s,%s,%s,%s)"
        new_row = (crossid, control_mode, tp_start, planid, scheduleid, citycode)
        cursor.execute(statement, new_row)
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return exc
    return None
|
|
|
|
def query_plan_stage_by_planid(self, citycode, jjcrossid, planid):
    """Fetch the ``plan_stage`` rows of one plan of a crossing.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        statement = f'''select * from {self.DB_Name}.plan_stage where crossid = %s and planid = %s and nodeid = %s'''
        cursor.execute(statement, (jjcrossid, planid, citycode))
        stage_rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return stage_rows, None
|
|
|
|
def query_day_schedule_by_planid(self, citycode, jjcrossid, planid):
    """Fetch the ``day_schedule`` rows of a crossing that reference a plan.

    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
    """
    conn, cursor = self.connect()
    try:
        statement = f'''select * from {self.DB_Name}.day_schedule where crossid = %s and planid = %s and nodeid = %s'''
        cursor.execute(statement, (jjcrossid, planid, citycode))
        schedule_rows = cursor.fetchall()
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return None, exc
    return schedule_rows, None
|
|
|
|
def delete_day_schedule(self, id):
    """Delete one ``day_schedule`` row by its ``id`` column.

    :param id: value matched against ``day_schedule.id``.
    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        sql_query = f'''delete from {self.DB_Name}.day_schedule where id = %s'''
        # Bind parameters as a sequence (1-tuple) rather than a bare scalar.
        cursor.execute(sql_query, (id,))
        self.close(conn, cursor)
        return None
    except Exception as e:
        self.close(conn, cursor)
        return e
|
|
|
|
def inset_onkey_update_info(self, param):
    """Upsert an ``update_info`` row, stamping it with the current UNIX time.

    Inserts ``(UNIX_TIMESTAMP(), param)``; on a duplicate key only ``value``
    is refreshed.

    :param param: value stored in the ``param`` column.
    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        sql_query = f'''INSERT INTO {self.DB_Name}.update_info
                 (`value`,`param`) VALUES
                 (UNIX_TIMESTAMP(),%s) ON DUPLICATE KEY UPDATE
                 `value`= UNIX_TIMESTAMP()'''
        # Bind parameters as a sequence (1-tuple) rather than a bare scalar.
        cursor.execute(sql_query, (param,))
        self.close(conn, cursor)
        return None
    except Exception as e:
        self.close(conn, cursor)
        return e
|
|
|
|
def multi_insert_arrangement_db(self, citycode, data, jj_crossid):
    """Replace all ``arrangement`` rows of a crossing inside one transaction.

    Existing rows of the crossing are deleted and one row per item of
    ``data`` is inserted. Empty-string fields are normalised: ``type``,
    ``scheduleid`` and ``priority`` become 0; ``month``, ``day`` and
    ``weekday`` become NULL.

    :param data: iterable of mappings with the keys ``type``, ``month``,
        ``day``, ``weekday``, ``scheduleid`` and ``priority``.
    :return: ``None`` on success, otherwise the exception that was raised
        (after rolling back).
    """
    def _or(value, fallback):
        # Only the empty string is replaced; 0 and None pass through.
        return value if value != '' else fallback

    conn, cursor = self.connect()
    try:
        conn.begin()
        sql = f"DELETE FROM {self.DB_Name}.arrangement WHERE crossid = %s and nodeid = %s"
        cursor.execute(sql, (jj_crossid, citycode))
        sql = f"INSERT INTO {self.DB_Name}.arrangement (`crossid`, `type`,`month`,`day`,`weekday`,`scheduleid`,`priority`,`nodeid`) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
        inserts = [
            (
                jj_crossid,
                _or(value['type'], 0),
                _or(value['month'], None),
                _or(value['day'], None),
                _or(value['weekday'], None),
                _or(value['scheduleid'], 0),
                _or(value['priority'], 0),
                citycode,
            )
            for value in data
        ]
        cursor.executemany(sql, inserts)
        conn.commit()
        self.close(conn, cursor)
        return None
    except Exception as error:
        try:
            conn.rollback()
        finally:
            # Release the connection even when the rollback itself fails.
            self.close(conn, cursor)
        return error
|
|
|
|
def insert_phase_record_db(self, citycode, jj_crossid, type, userid=None):
    """Insert one ``phase_record`` row (crossid, type, nodeid, user).

    :return: ``None`` on success, otherwise the exception that was raised.
    """
    conn, cursor = self.connect()
    try:
        statement = f"INSERT INTO {self.DB_Name}.`phase_record` (`crossid`,`type`,`nodeid`,`user`) VALUES (%s, %s, %s,%s)"
        new_row = (jj_crossid, type, citycode, userid)
        cursor.execute(statement, new_row)
        self.close(conn, cursor)
    except Exception as exc:
        self.close(conn, cursor)
        return exc
    return None
|
|
|
|
def multi_insert_plan_stage_db(self, citycode, data, jj_crossid, all=False):
    """Replace ``plan_stage`` rows of a crossing with the rows in ``data``.

    With ``all`` true every plan_stage row of the crossing is deleted first;
    otherwise only the plans whose ``planid`` occurs in ``data`` are deleted.
    Every item of ``data`` is then inserted; a missing or falsy
    ``coord_phaseid``/``offset`` is stored as 0. Delete and inserts run in
    one transaction so a failure cannot leave the plans half-replaced.

    :param data: sequence of mappings with the keys ``planid``, ``name``,
        ``cycle``, ``stageid``, ``stage_duration`` and ``stage_seq``.
    :return: ``None`` on success, otherwise the exception that was raised
        (after rolling back).
    """
    conn, cursor = self.connect()
    try:
        table = f"{self.DB_Name}.`plan_stage`"
        insert_sql = f"INSERT INTO {table} (`planid`, `name`, `crossid`, `cycle`, `coord_phaseid`,`offset`,`stageid`,`stage_duration`,`stage_seq`,`nodeid`) VALUES (%s, %s, %s, %s, %s,%s,%s,%s,%s,%s)"
        conn.begin()
        if all:
            cursor.execute(
                f"delete FROM {table} WHERE `crossid` = %s and `nodeid` = %s",
                (jj_crossid, citycode))
        else:
            delete_sql = f"delete FROM {table} WHERE `planid` = %s and `crossid` = %s and `nodeid` = %s"
            # One delete per distinct plan; data has one item per stage.
            for planid in dict.fromkeys(value['planid'] for value in data):
                cursor.execute(delete_sql, (planid, jj_crossid, citycode))
        for value in data:
            cursor.execute(insert_sql, (
                value['planid'], value['name'], jj_crossid, value['cycle'],
                value.get('coord_phaseid') or 0, value.get('offset') or 0,
                value['stageid'], value['stage_duration'], value['stage_seq'],
                citycode))
        conn.commit()
        self.close(conn, cursor)
        return None
    except Exception as error:
        try:
            conn.rollback()
        finally:
            # Release the connection even when the rollback itself fails.
            self.close(conn, cursor)
        return error
|
|
|
|
def multi_insert_stage_db(self, citycode, data, jj_crossid, all=False):
    """Write the ``stage`` rows in ``data`` for a crossing.

    With ``all`` true every stage of the crossing is deleted and each item
    is inserted. Otherwise each item updates the existing row with the same
    ``stageid`` or is inserted when no such row exists. ``redyellow`` is
    always written as 0. All statements run in one transaction.

    :param data: iterable of mappings with the keys ``stageid``, ``name``,
        ``green``, ``yellow``, ``allred`` and ``phases``.
    :return: ``None`` on success, otherwise the exception that was raised
        (after rolling back).
    """
    conn, cursor = self.connect()
    try:
        table = f"{self.DB_Name}.`stage`"
        insert_sql = f"INSERT INTO {table} (`stageid`,`name`, `crossid`, `green`, `redyellow`, `yellow`,`allred`,`phases`,`nodeid`) VALUES (%s, %s, %s, %s, 0,%s,%s,%s,%s)"
        select_sql = f"SELECT * FROM {table} WHERE `stageid` = %s AND `crossid` = %s and `nodeid` = %s"
        update_sql = f"UPDATE {table} SET `name` = %s, `green` = %s, `redyellow` = %s, `allred` = %s,`phases` = %s,`yellow`= %s WHERE `stageid` = %s and `crossid` = %s and `nodeid` = %s"
        conn.begin()
        if all:
            cursor.execute(
                f"DELETE FROM {self.DB_Name}.stage WHERE crossid = %s and nodeid = %s",
                (jj_crossid, citycode))
        for value in data:
            exists = False
            if not all:
                # After a full delete nothing can exist, so only probe here.
                cursor.execute(select_sql, (value['stageid'], jj_crossid, citycode))
                exists = bool(cursor.fetchone())
            if exists:
                cursor.execute(update_sql, (
                    value['name'], value['green'], 0, value['allred'], value['phases'], value['yellow'],
                    value['stageid'], jj_crossid, citycode))
            else:
                cursor.execute(insert_sql, (
                    value['stageid'], value['name'], jj_crossid, value['green'], value['yellow'],
                    value['allred'], value['phases'], citycode))
        conn.commit()
        self.close(conn, cursor)
        return None
    except Exception as error:
        try:
            conn.rollback()
        finally:
            # Release the connection even when the rollback itself fails.
            self.close(conn, cursor)
        return error
|
|
|
|
def multi_insert_phase_db(self, citycode, data, jj_crossid):
    """Replace all ``phase`` rows of a crossing inside one transaction.

    Existing rows of the crossing are deleted and one row per item of
    ``data`` is inserted; ``lanes`` is always written as NULL.

    :param data: iterable of mappings with the keys ``phaseid``, ``name``,
        ``min_green`` and ``max_green``.
    :return: ``None`` on success, otherwise the exception that was raised
        (after rolling back).
    """
    conn, cursor = self.connect()
    try:
        # Qualify with self.DB_Name like the other statements of this class
        # instead of a hard-coded schema name.
        delete_sql = f"DELETE FROM {self.DB_Name}.phase WHERE crossid = %s and nodeid = %s"
        insert_sql = f"INSERT INTO {self.DB_Name}.phase (`phaseid`, `name`,`crossid`,`lanes`,`min_green`,`max_green`,`nodeid`) VALUES (%s,%s,%s,%s,%s,%s,%s)"
        rows = [
            (value['phaseid'], value['name'], jj_crossid, None,
             value['min_green'], value['max_green'], citycode)
            for value in data
        ]
        conn.begin()
        cursor.execute(delete_sql, (jj_crossid, citycode))
        cursor.executemany(insert_sql, rows)
        conn.commit()
        self.close(conn, cursor)
        return None
    except Exception as error:
        try:
            conn.rollback()
        finally:
            # Release the connection even when the rollback itself fails.
            self.close(conn, cursor)
        return error
|
|
|
|
def multi_insert_day_schedule_db(self, citycode, data, jj_crossid, all=False):
    """Write the ``day_schedule`` rows in ``data`` for a crossing.

    With ``all`` true every schedule row of the crossing is deleted and each
    item is inserted. Otherwise each item updates the row with the same
    (``scheduleid``, ``tp_start``) or is inserted when no such row exists.
    All statements run in one transaction.

    :param data: iterable of mappings with the keys ``scheduleid``,
        ``control_mode``, ``tp_start`` and ``planid``.
    :return: ``None`` on success, otherwise the exception that was raised
        (after rolling back).
    """
    conn, cursor = self.connect()
    try:
        table = f"{self.DB_Name}.`day_schedule`"
        insert_sql = f"INSERT INTO {table} (`scheduleid`, `crossid`, `control_mode`, `tp_start`, `planid`, `nodeid`) VALUES (%s, %s, %s, %s, %s, %s)"
        select_sql = f"SELECT * FROM {table} WHERE `scheduleid` = %s AND `tp_start` = %s and `crossid` = %s and `nodeid` = %s"
        update_sql = f"UPDATE {table} SET `control_mode` = %s, `planid` = %s WHERE `scheduleid` = %s AND `tp_start` = %s and `crossid` = %s and `nodeid` = %s"
        conn.begin()
        if all:
            cursor.execute(
                f"DELETE FROM {self.DB_Name}.day_schedule WHERE crossid = %s and nodeid = %s",
                (jj_crossid, citycode))
        for value in data:
            exists = False
            if not all:
                # After a full delete nothing can exist, so only probe here.
                cursor.execute(select_sql, (value['scheduleid'], value['tp_start'], jj_crossid, citycode))
                exists = bool(cursor.fetchone())
            if exists:
                cursor.execute(update_sql, (
                    value['control_mode'], value['planid'], value['scheduleid'], value['tp_start'],
                    jj_crossid, citycode))
            else:
                cursor.execute(insert_sql, (
                    value['scheduleid'], jj_crossid, value['control_mode'], value['tp_start'],
                    value['planid'], citycode))
        conn.commit()
        self.close(conn, cursor)
        return None
    except Exception as error:
        try:
            conn.rollback()
        finally:
            # Release the connection even when the rollback itself fails.
            self.close(conn, cursor)
        return error
|
|
|
|
def query_phase_by_cross_list(self, citycode, cross_list):
    """Return which of the given ``xl_crossid`` values have a day schedule.

    A crossing qualifies when its ``cross_mapping`` row joins to at least
    one ``day_schedule`` row on ``jj_crossid`` and the same node id.

    :param citycode: value matched against ``cross_mapping.nodeid``.
    :param cross_list: ``xl_crossid`` values for the ``IN`` filter.
    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
        An empty/None ``cross_list`` returns ``([], None)`` without a query.
    """
    if not cross_list:
        # ``IN ()`` is a MySQL syntax error; no ids can only match no rows.
        return [], None
    conn, cursor = self.connect()
    try:
        # The join compares cm.nodeid with ds.nodeid (comparing it with
        # itself is always true) and tables are schema-qualified like the
        # other statements of this class.
        sql = f'''select
                    cm.xl_crossid
                 from {self.DB_Name}.cross_mapping as cm
                 join {self.DB_Name}.day_schedule as ds on cm.jj_crossid = ds.crossid and cm.nodeid = ds.nodeid
                 where
                    cm.nodeid = %s and cm.xl_crossid in %s
                 group by cm.xl_crossid'''
        params = (citycode, cross_list)
        print(cursor.mogrify(sql, params))
        cursor.execute(sql, params)
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result, None
    except Exception as error:
        self.close(conn, cursor)
        return None, error
|
|
|
|
def query_phase_by_cross_all(self, citycode):
    """Return every mapped crossing of the given nodes that has a day schedule.

    A crossing qualifies when its ``cross_mapping`` row joins to at least
    one ``day_schedule`` row on ``jj_crossid`` and the same node id.

    :param citycode: collection of node ids for the ``IN`` filter.
    :return: ``(rows, None)`` on success, ``(None, exception)`` on failure.
        An empty/None ``citycode`` returns ``([], None)`` without a query.
    """
    if not citycode:
        # ``IN ()`` is a MySQL syntax error; no ids can only match no rows.
        return [], None
    conn, cursor = self.connect()
    try:
        # The join compares cm.nodeid with ds.nodeid (comparing it with
        # itself is always true) and tables are schema-qualified like the
        # other statements of this class.
        sql = f'''select
                    cm.xl_crossid,cm.nodeid
                 from {self.DB_Name}.cross_mapping as cm
                 join {self.DB_Name}.day_schedule as ds on cm.jj_crossid = ds.crossid and cm.nodeid = ds.nodeid
                 where
                    cm.nodeid in %s
                 group by cm.xl_crossid,cm.nodeid'''
        # The whole collection is the single bound parameter.
        params = [citycode]
        print(cursor.mogrify(sql, params))
        cursor.execute(sql, params)
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result, None
    except Exception as error:
        self.close(conn, cursor)
        return None, error
|
|
# if __name__ == '__main__':
|
|
# tt_5min = get_latest_5min_timestamp()
|
|
# print(tt_5min)
|
|
# print(get_today_str())
|