cross_doctor/test.py

78 lines
3.2 KiB
Python

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/13 10:26
# @Description: 用于本地测试的执行文件
import configparser
from google.protobuf.json_format import MessageToJson
from app.common_worker import generate_date_range, convert_time
from app.global_source import *
import proto.xlcomm_pb2 as pb
def init():
config = configparser.ConfigParser()
global g_db_pool, g_db_gw
config.read('cross_doctor.ini', encoding='utf-8')
net_filename = 'data/xl_roadnet_350300.txt'
roadinfo_filename = 'data/xl_roadinfo_350300.txt'
roadlinks_filename = 'data/xl_roadlinks_350300.txt'
g_roadnet.load_from_file(net_filename)
g_roadinfo_manager.load_from_file(roadinfo_filename)
g_roadlinks_manager.load_from_file(roadlinks_filename)
if config.has_option('cloud_db', 'host'):
g_cloud_db['host'] = config.get('cloud_db', 'host')
if config.has_option('cloud_db', 'port'):
g_cloud_db['port'] = int(config.get('cloud_db', 'port'))
g_cloud_pool.init_pool(g_cloud_db)
if config.has_option('db', 'host'):
g_dbinfo['host'] = config.get('db', 'host')
g_db_pool.init_pool(g_dbinfo)
if config.has_option('cross_delay_db', 'host'):
g_cross_delay_db['host'] = config.get('cross_delay_db', 'host')
if config.has_option('cross_delay_db', 'port'):
g_cross_delay_db['port'] = int(config.get('cross_delay_db', 'port'))
if config.has_option('cross_delay_db', 'dbname'):
g_cross_delay_db['db'] = config.get('cross_delay_db', 'dbname')
if config.has_option('cross_delay_db', 'password'):
g_cross_delay_db['password'] = config.get('cross_delay_db', 'password')
print(g_cross_delay_db)
g_cross_delay_pool.init_pool(g_cross_delay_db)
def test_get_cross_delay_data():
row_list = db_cross.query_cross_delay_info('CR_11987179_2632645', 350100, '20251013', 't830')
data = row_list[0]['data']
cross_delay = pb.xl_cross_delayinfo_t()
cross_delay.ParseFromString(data)
print(MessageToJson(cross_delay, indent=None, always_print_fields_with_no_presence=True))
def check_err_data():
row_list = db_cross.check_err_data()
prev_date = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
for row in row_list:
if row['final_state'] == 6 or row['level_color'] == 4:
continue
first_date = row['first_date']
end_date = row['end_date']
if end_date:
date_list = generate_date_range(first_date, end_date)
else:
date_list = generate_date_range(first_date, prev_date)
phase_types = row['phase_type']
phase_details = row['phase_detail']
phase_type_list = phase_types.split('^')
phase_detail_list = phase_details.split('^')
if len(date_list) < len(phase_type_list):
# new_first_date = (datetime.strptime(first_date, '%Y%m%d') - timedelta(days=1)).strftime('%Y%m%d')
# ret = db_cross.update_err_data(row['crossid'], row['start_hm'], row['end_hm'], row['cont_times'], row['final_state'], row['level_color'], first_date, new_first_date)
# if ret != 1:
# print(row)
print(len(date_list), len(phase_type_list), row['cont_times'])
if __name__ == '__main__':
init()
check_err_data()