# cross_doctor/app/tmnet_db_func.py
# Repository viewer metadata: 214 lines, 10 KiB, Python; revision dated 2025-10-10 14:38:22 +08:00.
from app.db_func_base import TableDbHelperBase
class TmnetDbHelper(TableDbHelperBase):
def __init__(self, pool):
self.db_pool = pool
self.DB_Name = 'tmnet'
def reload_cross_info(self, nodeid, last_reload_time):
    """Collect ledger rows of a node that changed after ``last_reload_time``.

    Both ledger tables are read in full for the node through
    ``self.do_select`` and then filtered in Python on each row's
    ``update_time`` value.

    Args:
        nodeid: Node identifier; interpolated directly into the SQL text.
        last_reload_time: Exclusive lower bound compared against
            ``row['update_time']``.

    Returns:
        A pair ``(cross_rows, road_rows)`` of lists holding the rows whose
        ``update_time`` is strictly greater than ``last_reload_time``.
    """
    # NOTE(review): nodeid is formatted into the statement, not bound as a
    # parameter; callers must pass a trusted value.
    cross_query = "select * from cross_ledger_update_info where nodeid = %s and crossid is not null" % nodeid
    road_query = "select * from road_ledger_update_info where nodeid = %s" % nodeid

    changed_crosses = [
        row for row in self.do_select(cross_query)
        if row['update_time'] > last_reload_time
    ]
    changed_roads = [
        row for row in self.do_select(road_query)
        if row['update_time'] > last_reload_time
    ]
    return changed_crosses, changed_roads
def query_phase_by_cross_list(self, citycode, cross_list):
    """Return the ``xl_crossid`` values from ``cross_list`` that have a day schedule.

    Args:
        citycode: Value bound to ``cm.nodeid``.
        cross_list: Sequence of ``xl_crossid`` values bound to the ``in``
            clause.

    Returns:
        ``(rows, None)`` on success or ``(None, error)`` when the query
        raises. An empty ``cross_list`` yields ``([], None)`` without
        touching the database, because an empty ``in ()`` list is not a
        valid statement.
    """
    if not cross_list:
        return [], None

    # NOTE(review): the join condition ``cm.nodeid = cm.nodeid`` compares a
    # column with itself; it was probably meant to reference ``ds``. Left
    # unchanged because the day_schedule schema is not visible here.
    sql = '''select
            cm.xl_crossid
        from phasetable.cross_mapping as cm
        join phasetable.day_schedule as ds on cm.jj_crossid = ds.crossid and cm.nodeid = cm.nodeid
        where
            cm.nodeid = %s and cm.xl_crossid in %s
        group by cm.xl_crossid'''
    params = (citycode, cross_list)

    conn, cursor = self.connect()
    try:
        print(cursor.mogrify(sql, params))
        cursor.execute(sql, params)
        return cursor.fetchall(), None
    except Exception as error:
        return None, error
    finally:
        # Single cleanup point: runs exactly once on every path.
        self.close(conn, cursor)
def query_cross_info(self, nodeid):
    """Select every column of the ``cross`` table for one node.

    Args:
        nodeid: Node identifier; interpolated directly into the SQL text.

    Returns:
        Whatever ``self.do_select`` returns for the statement.
    """
    statement = "select * from `cross` where nodeid = %s" % nodeid
    rows = self.do_select(statement)
    return rows
def query_cross_info_new(self, citycode):
    """Select every column of the ``cross`` table for one node, with bound parameters.

    Args:
        citycode: Value bound to the ``nodeid`` placeholder.

    Returns:
        ``(rows, None)`` on success or ``(None, error)`` when the query
        raises.
    """
    sql = '''select * from `cross` where nodeid = %s'''
    # A one-element tuple: ``(citycode)`` without the trailing comma is just
    # the bare scalar, which not every DB-API driver accepts as ``args``.
    params = (citycode,)

    conn, cursor = self.connect()
    try:
        print(cursor.mogrify(sql, params))
        cursor.execute(sql, params)
        return cursor.fetchall(), None
    except Exception as error:
        return None, error
    finally:
        # Single cleanup point: runs exactly once on every path.
        self.close(conn, cursor)
def query_cross_list_sql(self, nodeid, area_id):
    """List the non-edge, non-deleted crosses of an area, preferring ledger overrides.

    ``name`` and ``location`` come from ``cross_ledger_update_info`` when
    that table has a non-null value for the same ``crossid``; otherwise the
    value from ``cross`` is used.

    Args:
        nodeid: Node identifier; interpolated directly into the SQL text.
        area_id: Area identifier; interpolated directly into the SQL text.

    Returns:
        Whatever ``self.do_select`` returns for the statement.
    """
    # The statement contains only SQL; a stray revision timestamp that had
    # ended up between the select list and the ``from`` clause is removed.
    sql = """
        select
            if(t2.name is not null, t2.name, t1.name) as name,
            t1.crossid,
            if (t2.location is not null, t2.location, t1.location) as location,
            t1.nodeid,
            t1.area_id
        from (select name,crossid, location,nodeid, area_id from `cross` where nodeid = %s and area_id = %s and at_edge=0 and isdeleted=0 ) as t1
        left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where nodeid = %s and area_id = %s and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid
    """ % (nodeid, area_id, nodeid, area_id)
    return self.do_select(sql)
def query_cross_inroads(self, crossid, nodeid):
    """Fetch the roads that end at ``crossid``, preferring ledger overrides.

    Rows of ``road`` are left-joined to ``road_ledger_update_info`` on
    ``roadid``; each selected column except ``roadid`` takes the ledger
    value when it is non-null.

    Args:
        crossid: Cross matched against ``to_crossid``; interpolated into the
            SQL text inside single quotes.
        nodeid: Node identifier; interpolated into the SQL text inside
            single quotes.

    Returns:
        Whatever ``self.do_select`` returns for the statement.
    """
    template = (
        "\n"
        "select\n"
        "t1.roadid,\n"
        "if(t2.from_crossid is not null, t2.from_crossid, t1.from_crossid) as from_crossid,\n"
        "if(t2.to_crossid is not null, t2.to_crossid, t1.to_crossid) as to_crossid,\n"
        "if(t2.name is not null, t2.name, t1.name) as name,\n"
        "if(t2.src_direct is not null, t2.src_direct, t1.src_direct) as src_direct,\n"
        "if(t2.lane_turn_info is not null, t2.lane_turn_info, t1.lane_turn_info) as lane_turn_info\n"
        "from\n"
        "(select * from road where nodeid = '%s' and recordstate=0 and to_crossid='%s' and (is_sup_road is null or is_sup_road<>1)) t1\n"
        "left join\n"
        "(select * from road_ledger_update_info where nodeid = '%s' and recordstate=0 and to_crossid='%s' and (is_sup_road is null or is_sup_road<>1)) t2\n"
        "on t1.roadid = t2.roadid;\n"
    )
    # The same (nodeid, crossid) pair filters both sides of the join.
    filter_values = (nodeid, crossid) * 2
    statement = template % filter_values
    return self.do_select(statement)
def query_base_info(self, nodeid, area_id):
    """Select the ledger base-info rows for a node and area.

    Args:
        nodeid: Node identifier; converted with ``int()`` before use.
        area_id: Area identifier; converted with ``int()`` before use.

    Returns:
        Whatever ``self.do_select`` returns for the statement.

    Raises:
        ValueError: If either argument cannot be converted by ``int()``.
    """
    node_number, area_number = int(nodeid), int(area_id)
    statement = "select * from ledger.leger_base_info where nodeid= %d and area_id = %s" % (node_number, area_number)
    return self.do_select(statement)
def query_city_tp_info(self, nodeid, area_id):
    """Select ``tp_desc`` and ``peak_tp`` from the area time-period config.

    Args:
        nodeid: Node identifier; interpolated directly into the SQL text.
        area_id: Area identifier; interpolated directly into the SQL text.

    Returns:
        Whatever ``self.do_select`` returns for the statement.
    """
    template = "select tp_desc, peak_tp from cross_doctor_config.area_tp_config where nodeid = {nodeid} and area_id = {area_id}"
    statement = template.format(nodeid=nodeid, area_id=area_id)
    return self.do_select(statement)
def query_all_cross_examine_records(self, yes_str):
    """Select the still-open problem records whose ``update_time`` falls on a given day.

    A record is selected when ``date(update_time)`` equals ``yes_str`` and
    its ``end_date`` is null.

    Args:
        yes_str: Date value; interpolated into the SQL text inside single
            quotes.

    Returns:
        Whatever ``self.do_select`` returns for the statement.
    """
    statement = "select * from cross_doctor_matedata.cross_phase_problems_record where date(update_time) = '%s' and end_date is null" % yes_str
    records = self.do_select(statement)
    return records
def insert_cross_examine_records(self, insert_list):
    """Insert a batch of problem records in one transaction.

    The batch is committed only when the row count reported by
    ``cursor.executemany`` equals ``len(insert_list)``; otherwise it is
    rolled back.

    Args:
        insert_list: Sequence of dicts, each providing the keys
            ``start_hm``, ``end_hm``, ``crossid``, ``phase_type``,
            ``phase_detail``, ``cont_times``, ``final_state``,
            ``level_color``, ``first_date`` and ``change_red_daynums``.

    Returns:
        ``(row_count, None)`` after a commit, ``(0, message_or_exception)``
        after a rollback, and ``(0, None)`` for an empty batch (nothing to
        do, the database is not contacted).
    """
    if not insert_list:
        return 0, None

    columns = (
        'start_hm', 'end_hm', 'crossid', 'phase_type', 'phase_detail',
        'cont_times', 'final_state', 'level_color', 'first_date',
        'change_red_daynums',
    )
    sql = """insert into cross_doctor_matedata.cross_phase_problems_record(start_hm,end_hm,crossid,phase_type, phase_detail,cont_times,final_state,level_color,first_date,change_red_daynums)
        values(%s, %s, %s, %s,%s, %s, %s, %s,%s, %s)
        """

    conn, cursor = self.connect()
    try:
        # Built inside the try so a missing key is reported through the
        # error return like any other failure.
        values = [tuple(d[col] for col in columns) for d in insert_list]
        ret = cursor.executemany(sql, values)
        if ret == len(insert_list):
            conn.commit()
            return ret, None
        conn.rollback()
        return 0, "insert_cross_examine_records error"
    except Exception as e:
        conn.rollback()
        return 0, e
    finally:
        # Runs even when rollback() itself raises, so the connection is
        # always released.
        self.close(conn, cursor)
def update_cross_examine_records(self, update_list):
    """Update a batch of problem records in one transaction.

    Records are matched on ``crossid``, ``start_hm`` and ``first_date``.
    The batch is committed only when the row count reported by
    ``cursor.executemany`` equals ``len(update_list)``; otherwise it is
    rolled back and an error is returned, so a caller never sees a
    rolled-back batch reported as a success.

    Args:
        update_list: Sequence of dicts, each providing the keys
            ``phase_type``, ``phase_detail``, ``cont_times``,
            ``final_state``, ``level_color``, ``change_red_daynums``,
            ``end_date``, ``crossid``, ``start_hm`` and ``first_date``.

    Returns:
        ``(row_count, None)`` after a commit,
        ``(row_count, "update_cross_examine_records error")`` when the count
        does not match and the batch is rolled back, ``(0, exception)`` when
        the statement raises, and ``(0, None)`` for an empty batch (the
        database is not contacted).
    """
    if not update_list:
        return 0, None

    columns = (
        'phase_type', 'phase_detail', 'cont_times', 'final_state',
        'level_color', 'change_red_daynums', 'end_date', 'crossid',
        'start_hm', 'first_date',
    )
    sql = """
        update cross_doctor_matedata.cross_phase_problems_record
        set phase_type = %s, phase_detail = %s, cont_times = %s, final_state = %s, level_color = %s, change_red_daynums = %s,
        end_date = %s where crossid = %s and start_hm = %s and first_date = %s
        """

    conn, cursor = self.connect()
    try:
        # Built inside the try so a missing key is reported through the
        # error return like any other failure.
        values = [tuple(d[col] for col in columns) for d in update_list]
        ret = cursor.executemany(sql, values)
        if ret == len(update_list):
            conn.commit()
            return ret, None
        # NOTE(review): if the driver reports only rows whose values
        # actually changed, a matched-but-identical record makes the whole
        # batch roll back here; confirm that this is intended.
        conn.rollback()
        return ret, "update_cross_examine_records error"
    except Exception as e:
        conn.rollback()
        return 0, e
    finally:
        # Runs even when rollback() itself raises, so the connection is
        # always released.
        self.close(conn, cursor)
def query_cross_examine_records(self, start_hm, first_date, crossid, end_date):
    """Select the problem records of one cross and start time that overlap a date range.

    A record matches when its ``first_date`` is not after the given
    ``first_date`` and its ``end_date`` is either null or not before the
    given ``end_date``.

    Args:
        start_hm: Start time key; interpolated inside single quotes.
        first_date: Upper bound for the record's ``first_date``;
            interpolated inside single quotes.
        crossid: Cross identifier; interpolated inside single quotes.
        end_date: Lower bound for the record's ``end_date``; interpolated
            inside single quotes.

    Returns:
        Whatever ``self.do_select`` returns for the statement.
    """
    # first_date is quoted like the other three values: unquoted, a value
    # such as 2025-10-01 would be evaluated by the server as arithmetic.
    sql = """
        select * from cross_doctor_matedata.cross_phase_problems_record where crossid = '%s' and start_hm = '%s' and first_date <= '%s' and (end_date >= '%s' or end_date is null)
    """ % (crossid, start_hm, first_date, end_date)
    return self.do_select(sql)
def query_out_cross(self, outroadid):
    """Look up the ``to_crossid`` of a road.

    Args:
        outroadid: Road identifier; interpolated into the SQL text inside
            single quotes.

    Returns:
        Whatever ``self.do_select`` returns for the statement.
    """
    statement = "select to_crossid from road where roadid = '%s'" % outroadid
    matches = self.do_select(statement)
    return matches
def query_next_cross_info(self, nodeid, area_id, crossid):
    """Fetch one non-edge, non-deleted cross of an area, preferring ledger overrides.

    ``name`` and ``location`` come from ``cross_ledger_update_info`` when
    that table has a non-null value for the same ``crossid``; otherwise the
    value from ``cross`` is used.

    Args:
        nodeid: Node identifier; interpolated directly into the SQL text.
        area_id: Area identifier; interpolated directly into the SQL text.
        crossid: Cross identifier; interpolated inside single quotes.

    Returns:
        Whatever ``self.do_select`` returns for the statement.
    """
    # The statement contains only SQL; a stray revision timestamp that had
    # ended up between the select list and the ``from`` clause is removed.
    sql = """
        select
            if(t2.name is not null, t2.name, t1.name) as name,
            t1.crossid,
            if (t2.location is not null, t2.location, t1.location) as location,
            t1.nodeid,
            t1.area_id
        from (select name,crossid, location,nodeid, area_id from `cross` where nodeid = %s and area_id = %s and at_edge=0 and isdeleted=0 and crossid = '%s') as t1
        left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where nodeid = %s and area_id = %s and at_edge=0 and isdeleted=0 and crossid = '%s') as t2 on t1.crossid=t2.crossid
    """ % (nodeid, area_id, crossid, nodeid, area_id, crossid)
    return self.do_select(sql)
def query_cross_list_road(self, nodeid:str, area_id:str):
    """List the crosses of an area together with their incoming roads.

    Each non-edge, non-deleted cross of the area is left-joined to the
    roads whose ``to_crossid`` is that cross. Cross ``name``/``location``
    and road ``from_crossid``/``to_crossid``/``name``/``src_direct`` take
    the ledger value when it is non-null.

    Args:
        nodeid: Node identifier; bound to the road and cross filters.
        area_id: Area identifier; bound to the cross filter.

    Returns:
        ``(rows, None)`` on success or ``(None, error)`` when the query
        raises.
    """
    # The road ledger is matched per road (``rlui.roadid = r.roadid``), the
    # same key query_cross_inroads joins on. Matching on to_crossid alone
    # paired every road of a cross with every ledger row of that cross.
    sql = """select if(clui.name is not null, clui.name, c.name) as name,
            c.crossid,
            if(clui.location is not null, clui.location, c.location) as location,
            c.nodeid,
            c.area_id,
            t2.to_crossid,
            t2.name road_name,
            t2.src_direct
        from `cross` as c
        left join `cross_ledger_update_info` as clui on clui.crossid = c.crossid and clui.nodeid = c.nodeid and clui.area_id = c.area_id
        left join (
            select r.roadid,
                if(rlui.from_crossid is not null, rlui.from_crossid, r.from_crossid) as from_crossid,
                if(rlui.to_crossid is not null, rlui.to_crossid, r.to_crossid) as to_crossid,
                if(rlui.name is not null, rlui.name, r.name) as name,
                if(rlui.src_direct is not null, rlui.src_direct, r.src_direct) as src_direct
            from road as r
            left join road_ledger_update_info as rlui on rlui.roadid = r.roadid and rlui.nodeid = r.nodeid and rlui.recordstate = r.recordstate and rlui.to_crossid = r.to_crossid and (rlui.is_sup_road is null or rlui.is_sup_road <> 1)
            where r.nodeid = %s
            and r.recordstate = 0
            and (r.is_sup_road is null or r.is_sup_road <> 1)
        ) as t2 on t2.to_crossid = c.crossid
        where c.nodeid = %s
        and c.area_id = %s
        and c.at_edge = 0
        and c.isdeleted = 0"""
    # Placeholder order: road filter nodeid, cross filter nodeid, area_id.
    params = (nodeid, nodeid, area_id)

    conn, cursor = self.connect()
    try:
        print(cursor.mogrify(sql, params))
        cursor.execute(sql, params)
        return cursor.fetchall(), None
    except Exception as error:
        return None, error
    finally:
        # Single cleanup point: runs exactly once on every path.
        self.close(conn, cursor)