cross_doctor/app/common_worker.py

443 lines
16 KiB
Python
Raw Normal View History

2025-10-10 14:38:22 +08:00
# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 11:22
# @Description: 初始化加载逻辑及公共属性赋值位置
import argparse
import configparser
import logging
import os
import sys
from datetime import datetime
import yaml
from app.comm import comparable_rank
from app.global_source import *
def load_roadnet_by_city(citycode: int):
logging.info('start load roadnet for %d' % citycode)
net_filename = 'data/xl_roadnet_%d.txt' % citycode
roadinfo_filename = 'data/xl_roadinfo_%d.txt' % citycode
# roadlinks_filename = 'data/xl_roadlinks_%d.txt' % citycode
g_roadnet.load_from_file(net_filename, citycode)
g_roadinfo_manager.load_from_file(roadinfo_filename)
# g_roadlinks_manager.load_from_file(roadlinks_filename)
logging.info('finish load roadnet for %d' % citycode)
def reload_base_info(citycode: int):
global last_reload_time_info
crosses_list, roads_list = db_tmnet.reload_cross_info(citycode, last_reload_time_info[citycode])
for cross_info in crosses_list:
crossid = cross_info['crossid']
if crossid:
g_roadnet.update_cross_info(crossid, cross_info)
if cross_info['isdeleted'] == 1:
del g_roadnet.cross_map[crossid]
for road_info in roads_list:
roadid = road_info['roadid']
if roadid:
g_roadnet.update_road_net(roadid, road_info)
g_roadinfo_manager.update_road_info(roadid, road_info)
last_reload_time_info[citycode] = datetime.now()
def pick_citycode(filename: str):
tail_part = filename.split('_')[-1]
citycode = int(tail_part.split('.')[0])
return citycode
def list_all_citycode(path: str):
"""
找出所有的citycode
"""
citycode_set = set()
files = os.listdir(path)
for file in files:
if not file.endswith(".txt"):
continue
if not file.startswith("xl_roadnet_"):
continue
citycode = pick_citycode(file)
citycode_set.add(citycode)
return citycode_set
def load_all_roadnet():
global g_citycode_set
need_filter_by_city = len(g_citycode_set) > 0
if need_filter_by_city:
# 按照citycode的列表加载各个城市的roadnet
for citycode in g_citycode_set:
last_reload_time_info[citycode] = datetime.strptime('1970-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
load_roadnet_by_city(citycode)
reload_base_info(citycode)
else:
# 加载data目录下的所有城市的roadnet
citycode_set = list_all_citycode('./data')
for citycode in citycode_set:
load_roadnet_by_city(citycode)
global g_roadnet, g_roadinfo_manager
g_roadnet.do_cross_classify(g_roadinfo_manager)
g_roadnet.collect_midroad_crosses()
# 设置辅路属性
set_all_side_inroadid()
def set_all_side_inroadid():
sup_roadid_set = set()
main_side_pair_list = g_roadnet.query_main_side_road_pair()
for pair in main_side_pair_list:
r1 = pair[0]
r2 = pair[1]
r1_info = g_roadinfo_manager.query_roadinfo(r1)
r1_name = g_roadnet.query_road(r1).name
r2_info = g_roadinfo_manager.query_roadinfo(r2)
r1_rc, r2_rc = additional_sup_road_comparable_rank(r1_info.roadclass, r2_info.roadclass)
r2_name = g_roadnet.query_road(r2).name
if r1_name.__contains__('辅路'):
sup_roadid_set.add(r1)
elif r2_name.__contains__('辅路'):
sup_roadid_set.add(r2)
else:
if r1_rc > r2_rc:
sup_roadid_set.add(r1)
elif r1_rc < r2_rc:
sup_roadid_set.add(r2)
elif r1_rc == r2_rc:
if r1_info.lane_num < r2_info.lane_num:
sup_roadid_set.add(r1)
elif r1_info.lane_num > r2_info.lane_num:
sup_roadid_set.add(r2)
else:
if r1_info.ffs < r2_info.ffs:
sup_roadid_set.add(r1)
elif r1_info.ffs > r2_info.ffs:
sup_roadid_set.add(r2)
else:
pass
# got sup_road_list
side_roadid_list = list(sup_roadid_set)
for rid in side_roadid_list:
g_roadnet.query_road(rid).sideway = True
return side_roadid_list
def additional_sup_road_comparable_rank(r1_roadclass, r2_roadclass):
suburb_rc = [1, 2, 3, 4, 5] # 城市外道路等级分区
city_rc = [6, 7, 8, 9, 10] # 城市内道路等级分区
# 如果既不在城市内分区,又不在城市外分区,说明该道路等级为未知,不做该判断
if (r1_roadclass not in suburb_rc and r1_roadclass not in city_rc) or (r2_roadclass not in suburb_rc and r2_roadclass not in city_rc):
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
# 如果待比较的两个道路等级为同区内的 则进行常规辅路判定逻辑
if not ((r1_roadclass in suburb_rc and r2_roadclass in suburb_rc) or (r1_roadclass in city_rc and r2_roadclass in city_rc)):
# 如果r1是城市外道路r2为城市内道路
if r1_roadclass in suburb_rc and r2_roadclass in city_rc:
# 如果r1不属于县道及其他低等级道路 则认为r1的道路等级更高
if r1_roadclass not in [4, 5]:
return 1, 2
# 县道大于城市支路或郊区小路 其他低等级道路 大于郊区小路
elif (r1_roadclass == 4 and r2_roadclass in [9, 10]) or (r1_roadclass == 5 and r2_roadclass == 10):
return 1, 2
else:
return 2, 1
else:
if r1_roadclass in city_rc and r2_roadclass in suburb_rc:
if r2_roadclass not in [4, 5]:
return 2, 1
elif (r2_roadclass == 4 and r1_roadclass in [9, 10]) or (r2_roadclass == 5 and r1_roadclass == 10):
return 2, 1
else:
return 1, 2
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
def init_with_config():
config = configparser.ConfigParser()
config.read('cross_doctor.ini', encoding='utf-8')
2025-10-10 14:38:22 +08:00
# 数据库相关配置
if not config.has_section("db"):
logging.error('missing db section in router.ini')
sys.exit(1)
dev = dev_args()
# 根据配置,修改密码和数据库名称
if config.has_option('db', 'host'):
host = config.get('db', 'host')
g_dbinfo['host'] = host
if config.has_option('db', 'user'):
user = config.get('db', 'user')
g_dbinfo['user'] = user
if config.has_option('db', 'password'):
password = config.get('db', 'password')
g_dbinfo['password'] = password
if config.has_option('db', 'dbname'):
dbname = config.get('db', 'dbname')
g_dbinfo['db'] = dbname
if config.has_option('roadnet_db', 'host'):
host = config.get('roadnet_db', 'host')
g_roadnet_db['host'] = host
if config.has_option('roadnet_db', 'user'):
user = config.get('roadnet_db', 'user')
g_roadnet_db['user'] = user
if config.has_option('roadnet_db', 'password'):
password = config.get('roadnet_db', 'password')
g_roadnet_db['password'] = password
if config.has_option('roadnet_db', 'dbname'):
dbname = config.get('roadnet_db', 'dbname')
g_roadnet_db['db'] = dbname
if config.has_option('cloud_db', 'host'):
host = config.get('cloud_db', 'host')
g_cloud_db['host'] = host
if config.has_option('cloud_db', 'user'):
user = config.get('cloud_db', 'user')
g_cloud_db['user'] = user
if config.has_option('cloud_db', 'password'):
password = config.get('cloud_db', 'password')
g_cloud_db['password'] = password
if config.has_option('cloud_db', 'dbname'):
dbname = config.get('cloud_db', 'dbname')
g_cloud_db['db'] = dbname
if config.has_option('user_dbinfo', 'port'):
port = config.get('user_dbinfo', 'port')
g_user_db['port'] = port
if config.has_option('user_dbinfo', 'dbname'):
dbname = config.get('user_dbinfo', 'dbname')
g_user_db['db'] = dbname
if config.has_option('user_dbinfo', 'user'):
user = config.get('user_dbinfo', 'user')
g_user_db['user'] = user
if config.has_option('user_dbinfo', 'password'):
password = config.get('user_dbinfo', 'password')
g_user_db['password'] = password
if config.has_option('cross_delay_db', 'port'):
port = int(config.get('cross_delay_db', 'port'))
g_cross_delay_db['port'] = port
if config.has_option('cross_delay_db', 'dbname'):
dbname = config.get('cross_delay_db', 'dbname')
g_cross_delay_db['db'] = dbname
if config.has_option('cross_delay_db', 'user'):
user = config.get('cross_delay_db', 'user')
g_cross_delay_db['user'] = user
if config.has_option('cross_delay_db', 'password'):
password = config.get('cross_delay_db', 'password')
g_cross_delay_db['password'] = password
2025-10-10 14:38:22 +08:00
# redis
if config.has_section('redis'):
if config.has_option('redis', 'ip'):
g_redisinfo['ip'] = config.get('redis', 'ip')
if config.has_option('redis', 'port'):
g_redisinfo['port'] = int(config.get('redis', 'port'))
if config.has_option('redis', 'password'):
g_redisinfo['password'] = config.get('redis', 'password')
if config.has_option('redis', 'db'):
g_redisinfo['db'] = config.get('redis', 'db')
# rpc
if config.has_section('rpc'):
g_config['rpc_host'] = '172.21.32.21'
g_config['rpc_port'] = '50051'
if config.has_option('rpc', 'host'):
g_config['rpc_host'] = config.get('rpc', 'host')
if config.has_option('rpc', 'port'):
g_config['rpc_port'] = int(config.get('rpc', 'port'))
# 重新初始化 延误数据库连接
if dev == 0:
if config.has_option('db', 'host_inner'):
g_dbinfo['host'] = config.get('db', 'host_inner')
if config.has_option('roadnet_db', 'host_inner'):
g_roadnet_db['host'] = config.get('roadnet_db', 'host_inner')
if config.has_option('cloud_db', 'host_inner'):
g_cloud_db['host'] = config.get('cloud_db', 'host_inner')
if config.has_option('cloud_db', 'port_inner'):
g_cloud_db['port'] = int(config.get('cloud_db', 'port_inner'))
if config.has_option('redis', 'ip_inner'):
g_redisinfo['ip'] = config.get('redis', 'ip_inner')
if config.has_option('rpc', 'host_inner'):
g_config['rpc_host'] = config.get('rpc', 'host_inner')
print(g_dbinfo)
print(g_roadnet_db)
print(g_cloud_db)
print(g_user_db)
print(g_cross_delay_db)
2025-10-10 14:38:22 +08:00
g_db_pool.init_pool(g_dbinfo)
g_roadnet_pool.init_pool(g_roadnet_db)
g_cloud_pool.init_pool(g_cloud_db)
g_user_pool.init_pool(g_user_db)
g_cross_delay_pool.init_pool(g_cross_delay_db)
2025-10-10 14:38:22 +08:00
# 本机服务配置
if config.has_section('server'):
if config.has_option('server', 'host'):
g_config['host'] = config.get('server', 'host')
else:
g_config['host'] = '0.0.0.0'
if config.has_option('server', 'port'):
g_config['port'] = int(config.get('server', 'port'))
else:
g_config['port'] = 6666
logging.info(g_config)
# 路网相关配置
if not config.has_section("roadnet"):
logging.error('missing roadnet section in router.ini')
sys.exit(1)
if config.has_option('roadnet', 'citylist'):
citylist_str = config.get('roadnet', 'citylist')
# cityinfo_dict = ast.literal_eval(citylist_str)
# g_config['cityinfo_dict'] = cityinfo_dict
# print(cityinfo_dict)
ss = citylist_str.split(',')
for item in ss:
citycode = int(item)
g_citycode_set.add(citycode)
# 加载所有城市的路网
load_all_roadnet()
cityinfo_dict = read_city_info('city_info.yaml')
print(cityinfo_dict)
g_config['cityinfo_dict'] = cityinfo_dict
# 20250422 新增业务主管独有功能相关内容
if config.has_section('executive'):
if config.has_option('executive', 'userid'):
g_config['executives'] = config.get('executive', 'userid').split(',')
# 20250514 探索分词索引查询路口名称
# full_crossname_inverted_index()
# gen_cross_info_index(cityinfo_dict)
# print('finished load config file, server started')
def dev_args():
try:
parser = argparse.ArgumentParser()
parser.add_argument('--dev', default=0, help='开发环境参数0否1是')
args = parser.parse_args()
return args.dev
except SystemExit:
pass
return 0
def read_city_info(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
return config['city_list']
def check_param(params, key):
value = None
if key in params:
value = params[key]
return value
def convert_time(time_int):
time_int = int(time_int)
hours = time_int // 100
minutes = time_int % 100
time_str = f"{hours:02d}:{minutes:02d}"
return time_str
def generate_date_range(start_date_str, end_date_str):
"""生成两个日期之间的所有日期(包括开始和结束日期)"""
start_date = datetime.strptime(start_date_str, "%Y%m%d")
end_date = datetime.strptime(end_date_str, "%Y%m%d")
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date.strftime("%Y%m%d"))
current_date += timedelta(days=1)
return date_list
def gen_ten_weeks_ago_data_list():
fmt = "%Y%m%d"
end = datetime.now() - timedelta(days=1) # 昨天
start = end - timedelta(days=69) # 70 天跨度 = 10 个完整周
week_buckets = {}
for d in range((end - start).days + 1):
day = start + timedelta(days=d)
monday = day - timedelta(days=day.weekday()) # 本周一
week_buckets.setdefault(monday, []).append(day.strftime(fmt))
# 大 list从远到近最早一周在最前面
result = [week_buckets[k] for k in sorted(week_buckets, reverse=False)]
return result
def count_lsr(turn_str):
"""
统计车道转向中左转(l)直行(s)右转(r)的数量
Args:
turn_str: 形如 "2|1|1|1|5" 的字符串每个数字对应 g_turn2str 中的键
Returns:
形如 "1/3/1" 的字符串顺序为左转直行右转的数量
"""
# 初始化计数器
left_count = 0
straight_count = 0
right_count = 0
# 分割输入字符串
turn_keys = turn_str.split('|')
# 遍历每个车道
for key in turn_keys:
if key in g_turn2str:
turn_value = g_turn2str[key]
# 忽略掉头和特殊车道
if turn_value in ['t', 'bus', 'reversible', '-']:
continue
# 统计每个转向
if 'l' in turn_value:
left_count += 1
if 's' in turn_value:
straight_count += 1
if 'r' in turn_value:
right_count += 1
# 返回格式化的结果
return f"{left_count}/{straight_count}/{right_count}"
def time_overlap(a: str, b: List[str]) -> bool:
def to_minutes(t: str) -> int:
h, m = map(int, t.split(':'))
return h * 60 + m
a_start, a_end = map(to_minutes, a.split('-'))
for interval in b:
b_start, b_end = map(to_minutes, interval.split('-'))
# 检查是否有重叠
if not (a_end <= b_start or a_start >= b_end):
return True
return False
def parse_week_str(s: str):
start, end = [datetime.strptime(d, "%Y%m%d") for d in s.split('-')]
date_list = [(start + timedelta(days=i)).strftime("%Y%m%d")
for i in range((end - start).days + 1)]
return date_list
def sort_dict_by_clockwise(input_dict: Dict[str, any]) -> dict:
"""
将字典按照顺时针方向从北开始排序键的顺序
参数:
input_dict: 键为方向字母的字典
返回:
按照顺时针顺序排列的有序字典
"""
clockwise_order = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
return {key: input_dict[key] for key in clockwise_order if key in input_dict}