# -*- coding: utf-8 -*- # @Author: Owl # @Date: 2025/10/13 10:26 # @Description: 用于本地测试的执行文件 import configparser from google.protobuf.json_format import MessageToJson from app.common_worker import generate_date_range, convert_time from app.global_source import * import proto.xlcomm_pb2 as pb def init(): config = configparser.ConfigParser() global g_db_pool, g_db_gw config.read('cross_doctor.ini', encoding='utf-8') net_filename = 'data/xl_roadnet_350300.txt' roadinfo_filename = 'data/xl_roadinfo_350300.txt' roadlinks_filename = 'data/xl_roadlinks_350300.txt' g_roadnet.load_from_file(net_filename) g_roadinfo_manager.load_from_file(roadinfo_filename) g_roadlinks_manager.load_from_file(roadlinks_filename) if config.has_option('cloud_db', 'host'): g_cloud_db['host'] = config.get('cloud_db', 'host') if config.has_option('cloud_db', 'port'): g_cloud_db['port'] = int(config.get('cloud_db', 'port')) g_cloud_pool.init_pool(g_cloud_db) if config.has_option('db', 'host'): g_dbinfo['host'] = config.get('db', 'host') g_db_pool.init_pool(g_dbinfo) if config.has_option('cross_delay_db', 'host'): g_cross_delay_db['host'] = config.get('cross_delay_db', 'host') if config.has_option('cross_delay_db', 'port'): g_cross_delay_db['port'] = int(config.get('cross_delay_db', 'port')) if config.has_option('cross_delay_db', 'dbname'): g_cross_delay_db['db'] = config.get('cross_delay_db', 'dbname') if config.has_option('cross_delay_db', 'password'): g_cross_delay_db['password'] = config.get('cross_delay_db', 'password') print(g_cross_delay_db) g_cross_delay_pool.init_pool(g_cross_delay_db) def test_get_cross_delay_data(): row_list = db_cross.query_cross_delay_info('CR_11987179_2632645', 350100, '20251013', 't830') data = row_list[0]['data'] cross_delay = pb.xl_cross_delayinfo_t() cross_delay.ParseFromString(data) print(MessageToJson(cross_delay, indent=None, always_print_fields_with_no_presence=True)) def check_err_data(): row_list = db_cross.check_err_data() prev_date = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d') for row in row_list: if row['final_state'] == 6 or row['level_color'] == 4: continue first_date = row['first_date'] end_date = row['end_date'] if end_date: date_list = generate_date_range(first_date, end_date) else: date_list = generate_date_range(first_date, prev_date) phase_types = row['phase_type'] phase_details = row['phase_detail'] phase_type_list = phase_types.split('^') phase_detail_list = phase_details.split('^') if len(date_list) < len(phase_type_list): # new_first_date = (datetime.strptime(first_date, '%Y%m%d') - timedelta(days=1)).strftime('%Y%m%d') # ret = db_cross.update_err_data(row['crossid'], row['start_hm'], row['end_hm'], row['cont_times'], row['final_state'], row['level_color'], first_date, new_first_date) # if ret != 1: # print(row) print(len(date_list), len(phase_type_list), row['cont_times']) if __name__ == '__main__': init() check_err_data()