# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 16:17
# @Description: SQL helpers for the cross delay / inspect / problem-record tables.
import logging

from app.db_func_base import *


class CrossDbHelper(TableDbHelperBase):
    """Query helpers for per-node ``traffic_<nodeid>`` tables and ``cross_doctor_matedata``.

    Most methods build a SQL string and hand it to ``self.do_select`` /
    ``self.do_execute`` (inherited; not visible in this file), returning
    whatever those helpers return.

    NOTE(review): almost every statement interpolates its arguments directly
    into the SQL text. Callers must only pass trusted values; converting to
    bound parameters requires knowing the ``do_select`` / ``do_execute``
    signatures -- TODO confirm and migrate.
    """

    def __init__(self, pool):
        """Store the connection pool and the default database name."""
        self.db_pool = pool
        self.DB_Name = 'ledger'

    @staticmethod
    def _quote_csv(values):
        """Render *values* as a comma-separated list of single-quoted SQL literals."""
        return ','.join("'" + str(item) + "'" for item in values)

    def query_cross_usable_date_sql(self, crossid, nodeid):
        """Select the distinct ``day`` values present in ``cross_delay`` for one cross."""
        sql = f"""
            select distinct day
            from traffic_{nodeid}.cross_delay
            where crossid = '{crossid}'
        """
        return self.do_select(sql)

    def query_cross_delay_info(self, crossid, nodeid, date_list, tp_start):
        """Select ``cross_delay`` rows of one cross for the given days and ``tp_start``.

        NOTE(review): an empty ``date_list`` produces ``day in ()``, which is
        not valid SQL -- callers presumably always pass at least one date.
        """
        date_list = self._quote_csv(date_list)
        sql = f"""
            select *
            from traffic_{nodeid}.cross_delay
            where crossid = '{crossid}'
              and day in ({date_list})
              and tp_start = '{tp_start}'
            order by day
        """
        return self.do_select(sql)

    def query_cross_delay_whole_day_hours(self, crossid, nodeid, query_date):
        """Select the ``h*`` ``tp_start`` rows of one cross on one day.

        Rows are ordered by the first run of digits found in ``tp_start``,
        cast to an unsigned integer.
        """
        sql = f"""
            select *
            from traffic_{nodeid}.cross_delay
            where crossid = '{crossid}'
              and day = '{query_date}'
              and tp_start like 'h%'
            order by CAST(REGEXP_SUBSTR(tp_start, '[0-9]+') AS UNSIGNED)
        """
        return self.do_select(sql)

    def query_cross_examine_records(self, start_hm, first_date, crossid, end_date):
        """Select problem records of one cross and ``start_hm`` overlapping a date range.

        A record matches when ``first_date <= first_date`` argument and its
        ``end_date`` is null or ``>= end_date`` argument.
        """
        # NOTE(review): first_date is interpolated unquoted here but quoted in
        # sibling queries -- presumably a numeric yyyymmdd value; confirm.
        sql = """
            select *
            from cross_doctor_matedata.cross_phase_problems_record
            where crossid = '%s'
              and start_hm = '%s'
              and first_date <= %s
              and (end_date >= '%s' or end_date is null)
        """ % (crossid, start_hm, first_date, end_date)
        return self.do_select(sql)

    def query_cross_examine_record_detail(self, start_hm, first_date, crossid):
        """Select the problem record(s) matching cross, ``start_hm`` and exact ``first_date``."""
        sql = """
            select *
            from cross_doctor_matedata.cross_phase_problems_record
            where crossid = '%s'
              and start_hm = '%s'
              and first_date = %s
        """ % (crossid, start_hm, first_date)
        return self.do_select(sql)

    def query_crosses_examine_records(self, crossid_list, first_date, min_date):
        """Select problem records for several crosses overlapping a date range."""
        crossids = self._quote_csv(crossid_list)
        sql = """
            select *
            from cross_doctor_matedata.cross_phase_problems_record
            where crossid in (%s)
              and first_date <= '%s'
              and (end_date >= '%s' or end_date is null)
        """ % (crossids, first_date, min_date)
        return self.do_select(sql)

    def query_cross_examine_records_nostarthm(self, first_date, crossid, end_date):
        """Same as ``query_cross_examine_records`` but without the ``start_hm`` filter."""
        sql = """
            select *
            from cross_doctor_matedata.cross_phase_problems_record
            where crossid = '%s'
              and first_date <= %s
              and (end_date >= '%s' or end_date is null)
        """ % (crossid, first_date, end_date)
        return self.do_select(sql)

    def update_cross_examine_record_state_sql(self, crossid, first_date, end_date, start_hm, state):
        """Set ``final_state`` and ``end_date`` on one problem record."""
        sql = """
            update cross_doctor_matedata.cross_phase_problems_record
            set final_state = '%s', end_date = '%s'
            where crossid = '%s'
              and first_date = '%s'
              and start_hm = %s
        """ % (state, end_date, crossid, first_date, start_hm)
        return self.do_execute(sql)

    def query_monitor_task_dates(self, nodeid, area_id):
        """Return ``(day_list, week_list)``: the distinct ``day`` values in
        ``cross_inspect`` for ``type = 'day'`` and ``type = 'week'``.

        Assumes ``do_select`` yields rows indexable by column name.
        """
        dates_by_type = {}
        for inspect_type in ('day', 'week'):
            sql = (
                f"select distinct day from traffic_{nodeid}.cross_inspect "
                f"where citycode = {nodeid} and area_id = {area_id} and type = '{inspect_type}'"
            )
            dates_by_type[inspect_type] = [row['day'] for row in self.do_select(sql)]
        return dates_by_type['day'], dates_by_type['week']

    def query_monitor_data_sql(self, nodeid, area_id, date_type, query_date):
        """Select ``cross_inspect`` rows for one area, type and day.

        ``'workday'`` and ``'weekend'`` are stored under ``type = 'week'``.
        """
        if date_type in ['week', 'workday', 'weekend']:
            date_type = 'week'
        sql = f"""
            select *
            from traffic_{nodeid}.cross_inspect
            where citycode = {nodeid}
              and area_id = {area_id}
              and type = '{date_type}'
              and day = '{query_date}'
        """
        return self.do_select(sql)

    def query_monitor_data_trend_sql(self, nodeid, area_id, date_type):
        """Select the most recent ``cross_inspect`` rows for a trend view.

        Returns up to 30 rows, or up to 10 for week-like types
        (``'week'``, ``'workday'``, ``'weekend'``), newest ``day`` first.
        """
        limit_num = 30
        if date_type in ['week', 'workday', 'weekend']:
            date_type = 'week'
            limit_num = 10
        # NOTE(review): unlike the sibling queries this one has no citycode
        # filter -- confirm that is intentional.
        sql = f"""
            select *
            from traffic_{nodeid}.cross_inspect
            where area_id = {area_id}
              and type = '{date_type}'
            order by day desc
            limit {limit_num}
        """
        return self.do_select(sql)

    def query_cross_tp_problem_record(self, yesterday, date_type, area_id):
        """Select problem records updated on *yesterday* that have a non-empty ``time_range``."""
        sql = f"""
            select *
            from cross_doctor_matedata.cross_tp_problem_record
            where area_id = {area_id}
              and date_type = '{date_type}'
              and date(update_time) = '{yesterday}'
              and time_range is not null
              and time_range != ''
        """
        return self.do_select(sql)

    def query_tide_cross_records(self, yesterday, area_id):
        """Select all problem records of an area updated on *yesterday*."""
        sql = f"""
            select *
            from cross_doctor_matedata.cross_tp_problem_record
            where area_id = {area_id}
              and date(update_time) = '{yesterday}'
        """
        return self.do_select(sql)

    def insert_cross_tp_problem_record(self, insert_list):
        """Bulk-insert rows into ``cross_tp_problem_record`` in one transaction.

        Args:
            insert_list: sequence of 11-item rows in the column order
                (area_id, crossid, weekday, time_range, date_type,
                high_park_problem, too_many_stop_times, imbalance_inroad,
                turn_imbalance, tide_cross, inroad_turnlane_mismatch).

        Returns:
            ``(1, "success")`` when every row was inserted (or there was
            nothing to insert), ``(0, "fail")`` when the affected-row count
            does not match, ``(0, "error")`` when the driver raised.
        """
        if not insert_list:
            # Nothing to write: avoid opening a connection for an empty batch.
            return 1, "success"

        # Values are bound by the driver, so the placeholders are bare %s
        # (no manual quoting, no %d).
        sql = """
            insert into cross_doctor_matedata.cross_tp_problem_record
                (area_id, crossid, weekday, time_range, date_type, high_park_problem, too_many_stop_times, imbalance_inroad, turn_imbalance, tide_cross, inroad_turnlane_mismatch)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        conn, cursor = self.connect()
        try:
            conn.begin()
            affected = cursor.executemany(sql, insert_list)
            if affected == len(insert_list):
                conn.commit()
                return 1, "success"
            conn.rollback()
            return 0, "fail"
        except Exception as e:
            conn.rollback()
            logging.error(e)
            return 0, "error"
        finally:
            # Always hand the connection back, whatever the outcome.
            self.close(conn, cursor)

    def query_inroad_lanes(self, nodeid_list):
        """Select ``roadid`` / ``lane_turn_info`` for non-supplementary roads of the given nodes."""
        nodeids = self._quote_csv(nodeid_list)
        # NOTE(review): the if() compares t1.lane_turn_info with itself, which
        # is never true, so t2.lane_turn_info is never selected and the left
        # join has no effect. Probably meant to compare against t2 -- left
        # unchanged until the intended precedence is confirmed.
        sql = """
            select t1.roadid,
                   if(t1.lane_turn_info != t1.lane_turn_info, t2.lane_turn_info, t1.lane_turn_info) as lane_turn_info
            from (select roadid,lane_turn_info from tmnet.road where nodeid in (%s) and is_sup_road <> 1) t1
            left join (select roadid, lane_turn_info from tmnet.road_ledegr_update_info where nodeid in (%s) and is_sup_road <> 1) t2
                on t1.roadid = t2.roadid
        """ % (nodeids, nodeids)
        return self.do_select(sql)

    def query_cross_problem_record(self, area_id, update_date, query_type):
        """Select problem records of an area for one ``day`` and ``date_type``."""
        sql = """
            select *
            from cross_doctor_matedata.cross_tp_problem_record
            where area_id = %s
              and day = '%s'
              and date_type = '%s'
        """ % (area_id, update_date, query_type)
        return self.do_select(sql)

    def query_shield_records(self, area_id):
        """Select every shield record of an area."""
        sql = """
            select *
            from cross_doctor_matedata.cross_problem_shield
            where area_id = %s
        """ % (area_id,)
        return self.do_select(sql)

    def add_shield_records(self, crossid, area_id, weekday, time_range, problem_key, date_type, first_date):
        """Insert one row into ``cross_problem_shield``."""
        sql = """
            insert into cross_doctor_matedata.cross_problem_shield
                (crossid, area_id, weekdays, time_range, problem_key, date_type, first_date)
            values ('%s', %s, '%s', '%s', '%s', '%s', '%s')
        """ % (crossid, area_id, weekday, time_range, problem_key, date_type, first_date)
        return self.do_execute(sql)

    def delete_shield_records(self, crossid, area_id, weekday, time_range, problem_key, date_type, first_date):
        """Delete the ``cross_problem_shield`` row(s) matching every given field."""
        sql = """
            delete from cross_doctor_matedata.cross_problem_shield
            where crossid = '%s'
              and area_id = %s
              and weekdays = '%s'
              and time_range = '%s'
              and problem_key = '%s'
              and date_type = '%s'
              and first_date = '%s'
        """ % (crossid, area_id, weekday, time_range, problem_key, date_type, first_date)
        return self.do_execute(sql)

    def query_csr_data(self, nodeid, key, crossid):
        """Select ``csr_data`` rows for one key and cross."""
        # `key` is a reserved word in MySQL and must be backtick-quoted to be
        # used as a column name.
        sql = f"""
            select *
            from traffic_{nodeid}.csr_data
            where `key` = '{key}'
              and crossid = '{crossid}'
        """
        return self.do_select(sql)

    def check_err_data(self):
        """Select every row of ``cross_phase_problems_record``."""
        sql = """
            select *
            from cross_doctor_matedata.cross_phase_problems_record
        """
        return self.do_select(sql)

    def update_err_data(self, crossid, start_hm, end_hm, cont_times, final_state, level_color, first_date,
                        new_first_date):
        """Rewrite ``first_date`` on the record(s) matching all the other given fields."""
        # NOTE(review): every value except crossid is interpolated unquoted,
        # so they are presumably numeric -- confirm against the table schema.
        sql = """
            update cross_doctor_matedata.cross_phase_problems_record
            set first_date = %s
            where crossid = '%s'
              and start_hm = %s
              and end_hm = %s
              and cont_times = %s
              and final_state = %s
              and level_color = %s
              and first_date = %s
        """ % (new_first_date, crossid, start_hm, end_hm, cont_times, final_state, level_color, first_date)
        return self.do_execute(sql)

    def query_cross_flow_usable_date_sql(self, nodeid, crossid):
        """Select the distinct ``day`` values in ``cross_flowdata`` for one cross.

        Returns:
            ``(rows, None)`` on success, ``(None, error)`` when the query raised.
        """
        conn, cursor = self.connect()
        try:
            sql = f"""select distinct day from traffic_{nodeid}.cross_flowdata where crossid = %s """
            params = (crossid,)  # one-element tuple; a bare (crossid) is just a string
            print(cursor.mogrify(sql, params))
            cursor.execute(sql, params)
            result = cursor.fetchall()
            return result, None
        except Exception as error:
            return None, error
        finally:
            self.close(conn, cursor)

    def query_cross_flowdata(self, nodeid, crossid, date_list):
        """Select ``crossid``, ``day`` and ``data`` from ``cross_flowdata`` for the given days.

        ``date_list`` is bound as a single parameter; the driver is relied on
        to expand a list/tuple into a parenthesised SQL list.

        Returns:
            ``(rows, None)`` on success, ``(None, error)`` when the query raised.
        """
        conn, cursor = self.connect()
        try:
            sql = f"""select crossid,`day`,data from traffic_{nodeid}.cross_flowdata where crossid = %s and `day` in %s"""
            params = (crossid, date_list)
            print(cursor.mogrify(sql, params))
            cursor.execute(sql, params)
            result = cursor.fetchall()
            return result, None
        except Exception as error:
            return None, error
        finally:
            self.close(conn, cursor)