cross_doctor/app/common_worker.py

443 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 11:22
# @Description: 初始化加载逻辑及公共属性赋值位置
import argparse
import configparser
import logging
import os
import sys
from datetime import datetime
from typing import Any

import yaml

from app.comm import comparable_rank
from app.global_source import *
def load_roadnet_by_city(citycode: int):
    """Load the road-network and road-info data files of one city.

    Reads ``data/xl_roadnet_<citycode>.txt`` into ``g_roadnet`` and
    ``data/xl_roadinfo_<citycode>.txt`` into ``g_roadinfo_manager``, logging
    the start and the end of the load.

    Args:
        citycode: Numeric city code used to build both file names.
    """
    logging.info('start load roadnet for %d' % citycode)
    # Road network first, then the per-road attributes.
    g_roadnet.load_from_file('data/xl_roadnet_%d.txt' % citycode, citycode)
    g_roadinfo_manager.load_from_file('data/xl_roadinfo_%d.txt' % citycode)
    # Loading of data/xl_roadlinks_<citycode>.txt is currently disabled.
    logging.info('finish load roadnet for %d' % citycode)
def reload_base_info(citycode: int):
    """Apply the cross/road records returned by the database to the in-memory network.

    Calls ``db_tmnet.reload_cross_info`` with the city's entry in
    ``last_reload_time_info``, pushes every returned cross and road record
    into ``g_roadnet`` / ``g_roadinfo_manager``, and finally stamps the city
    with the current time.

    Args:
        citycode: City whose records are reloaded; it must already have an
            entry in ``last_reload_time_info``.
    """
    changed_crosses, changed_roads = db_tmnet.reload_cross_info(
        citycode, last_reload_time_info[citycode])

    for cross in changed_crosses:
        cross_id = cross['crossid']
        if not cross_id:
            continue
        g_roadnet.update_cross_info(cross_id, cross)
        # A cross flagged as deleted is removed right after the update.
        if cross['isdeleted'] == 1:
            del g_roadnet.cross_map[cross_id]

    for road in changed_roads:
        road_id = road['roadid']
        if not road_id:
            continue
        g_roadnet.update_road_net(road_id, road)
        g_roadinfo_manager.update_road_info(road_id, road)

    # NOTE(review): the timestamp is taken after the query; if the query
    # filters on modification time, rows changed in between may be missed
    # by the next reload -- confirm against db_tmnet.reload_cross_info.
    last_reload_time_info[citycode] = datetime.now()
def pick_citycode(filename: str):
    """Extract the numeric city code from a data file name.

    The code is the text between the last ``_`` and the first ``.`` that
    follows it, e.g. ``xl_roadnet_110000.txt`` gives ``110000``.

    Args:
        filename: File name (or path) ending in ``_<citycode>.<ext>``.

    Returns:
        The city code as an ``int``.

    Raises:
        ValueError: If the extracted text is not an integer.
    """
    suffix = filename.rsplit('_', 1)[-1]
    code_text = suffix.partition('.')[0]
    return int(code_text)
def list_all_citycode(path: str):
    """Collect the city codes of every ``xl_roadnet_*.txt`` file in a directory.

    Args:
        path: Directory to scan (not recursive).

    Returns:
        A set with one city code per matching file name.
    """
    return {
        pick_citycode(entry)
        for entry in os.listdir(path)
        if entry.endswith(".txt") and entry.startswith("xl_roadnet_")
    }
def load_all_roadnet():
    """Load the road network of every relevant city, then post-process it.

    When ``g_citycode_set`` is non-empty only those cities are loaded, and
    each one is additionally refreshed through ``reload_base_info`` starting
    from 1970-01-01. Otherwise every city found under ``./data`` is loaded
    from file only. Afterwards crosses are classified, mid-road crosses are
    collected and side roads are flagged.
    """
    if g_citycode_set:
        # Restrict loading to the configured cities.
        epoch = datetime.strptime('1970-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
        for citycode in g_citycode_set:
            last_reload_time_info[citycode] = epoch
            load_roadnet_by_city(citycode)
            reload_base_info(citycode)
    else:
        # No filter configured: load every city present in the data directory.
        for citycode in list_all_citycode('./data'):
            load_roadnet_by_city(citycode)

    g_roadnet.do_cross_classify(g_roadinfo_manager)
    g_roadnet.collect_midroad_crosses()
    # Mark side (auxiliary) roads.
    set_all_side_inroadid()
def set_all_side_inroadid():
    """Decide which road of each main/side pair is the side road and flag it.

    For every pair from ``g_roadnet.query_main_side_road_pair()`` the side
    road is chosen by the first rule that separates the two roads:

    1. a name containing '辅路' (first road checked before the second);
    2. the larger value returned by ``additional_sup_road_comparable_rank``;
    3. the smaller ``lane_num``;
    4. the smaller ``ffs``.

    If none of the rules separates them, neither road is flagged.

    Returns:
        List of road ids whose ``sideway`` attribute was set to ``True``.
    """
    side_ids = set()
    for pair in g_roadnet.query_main_side_road_pair():
        r1, r2 = pair[0], pair[1]
        r1_info = g_roadinfo_manager.query_roadinfo(r1)
        r1_name = g_roadnet.query_road(r1).name
        r2_info = g_roadinfo_manager.query_roadinfo(r2)
        r1_rc, r2_rc = additional_sup_road_comparable_rank(
            r1_info.roadclass, r2_info.roadclass)
        r2_name = g_roadnet.query_road(r2).name

        if '辅路' in r1_name:
            side_ids.add(r1)
        elif '辅路' in r2_name:
            side_ids.add(r2)
        elif r1_rc > r2_rc:
            side_ids.add(r1)
        elif r1_rc < r2_rc:
            side_ids.add(r2)
        elif r1_rc == r2_rc:
            # Same rank: fall back to lane count, then to ffs.
            if r1_info.lane_num < r2_info.lane_num:
                side_ids.add(r1)
            elif r1_info.lane_num > r2_info.lane_num:
                side_ids.add(r2)
            elif r1_info.ffs < r2_info.ffs:
                side_ids.add(r1)
            elif r1_info.ffs > r2_info.ffs:
                side_ids.add(r2)

    # Flag every selected road on the network.
    side_roadid_list = list(side_ids)
    for road_id in side_roadid_list:
        g_roadnet.query_road(road_id).sideway = True
    return side_roadid_list
def _suburb_outranks_city(suburb_class, city_class):
    """Return True when a suburban road class outranks an urban road class.

    Classes 1-3 always outrank urban roads; class 4 only outranks urban
    classes 9 and 10; class 5 only outranks urban class 10.
    """
    if suburb_class not in (4, 5):
        return True
    if suburb_class == 4:
        return city_class in (9, 10)
    return city_class == 10


def additional_sup_road_comparable_rank(r1_roadclass, r2_roadclass):
    """Return a pair of comparable ranks for two road classes.

    Road classes 1-5 form the suburban zone and 6-10 the urban zone. When
    either class is outside both zones, or both classes are in the same
    zone, the result is ``comparable_rank`` of each class. When the classes
    are in different zones the result is ``(1, 2)`` if the first road
    outranks the second and ``(2, 1)`` otherwise.

    Args:
        r1_roadclass: Road class of the first road.
        r2_roadclass: Road class of the second road.

    Returns:
        Tuple ``(r1_rank, r2_rank)``; the road with the smaller value is the
        one treated as higher ranked.
    """
    suburb_rc = (1, 2, 3, 4, 5)   # road classes outside the city
    city_rc = (6, 7, 8, 9, 10)    # road classes inside the city

    r1_in_suburb = r1_roadclass in suburb_rc
    r1_in_city = r1_roadclass in city_rc
    r2_in_suburb = r2_roadclass in suburb_rc
    r2_in_city = r2_roadclass in city_rc

    # Unknown class on either side: no zone logic applies.
    if not (r1_in_suburb or r1_in_city) or not (r2_in_suburb or r2_in_city):
        return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)

    # Same zone: regular comparison.
    if (r1_in_suburb and r2_in_suburb) or (r1_in_city and r2_in_city):
        return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)

    if r1_in_suburb and r2_in_city:
        if _suburb_outranks_city(r1_roadclass, r2_roadclass):
            return 1, 2
        return 2, 1

    if r1_in_city and r2_in_suburb:
        if _suburb_outranks_city(r2_roadclass, r1_roadclass):
            return 2, 1
        return 1, 2

    return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
def _copy_option(config, section, option, target, key, convert=None):
    """Copy ``[section] option`` into ``target[key]`` when the option exists.

    A missing section or option leaves ``target`` untouched. ``convert``
    (for example ``int``) is applied to the raw string before it is stored.
    """
    if not config.has_option(section, option):
        return
    value = config.get(section, option)
    target[key] = convert(value) if convert is not None else value


def init_with_config():
    """Read ``cross_doctor.ini`` and initialise the shared global state.

    Steps, in order:

    1. Copy database, redis and rpc settings from the ini file into the
       corresponding global dicts.
    2. Unless started with a non-zero ``--dev`` value, override hosts with
       their ``*_inner`` variants.
    3. Initialise the five database pools.
    4. Store the local server host/port, the configured city list, the city
       info from ``city_info.yaml`` and the executive user ids in ``g_config``.
    5. Load the road network for all configured cities.

    Exits the process with status 1 when the ``db`` or ``roadnet`` section is
    missing.
    """
    config_file = 'cross_doctor.ini'
    config = configparser.ConfigParser()
    # read() returns the list of files it could parse; empty means not found.
    if not config.read(config_file, encoding='utf-8'):
        logging.error('config file %s not found or unreadable', config_file)

    # Database settings.
    if not config.has_section("db"):
        logging.error('missing db section in %s', config_file)
        sys.exit(1)
    # dev_args may hand back either an int or the raw command-line string,
    # so '0' and 0 are both treated as "not a development run".
    is_production = str(dev_args()) == '0'

    host_based_options = (
        ('host', 'host', None),
        ('user', 'user', None),
        ('password', 'password', None),
        ('dbname', 'db', None),
    )
    # Ports are stored as int for every pool; previously user_dbinfo kept
    # the raw string while cross_delay_db converted it.
    port_based_options = (
        ('port', 'port', int),
        ('dbname', 'db', None),
        ('user', 'user', None),
        ('password', 'password', None),
    )
    db_layout = (
        ('db', g_dbinfo, host_based_options),
        ('roadnet_db', g_roadnet_db, host_based_options),
        ('cloud_db', g_cloud_db, host_based_options),
        ('user_dbinfo', g_user_db, port_based_options),
        ('cross_delay_db', g_cross_delay_db, port_based_options),
    )
    for section, target, options in db_layout:
        for option, key, convert in options:
            _copy_option(config, section, option, target, key, convert)

    # redis
    _copy_option(config, 'redis', 'ip', g_redisinfo, 'ip')
    _copy_option(config, 'redis', 'port', g_redisinfo, 'port', int)
    _copy_option(config, 'redis', 'password', g_redisinfo, 'password')
    _copy_option(config, 'redis', 'db', g_redisinfo, 'db')

    # rpc
    if config.has_section('rpc'):
        g_config['rpc_host'] = '172.21.32.21'
        # NOTE(review): the default port is a str while a configured port is
        # stored as int; left as-is because callers are not visible here.
        g_config['rpc_port'] = '50051'
        _copy_option(config, 'rpc', 'host', g_config, 'rpc_host')
        _copy_option(config, 'rpc', 'port', g_config, 'rpc_port', int)

    # Outside development mode, prefer the internal addresses when configured.
    if is_production:
        _copy_option(config, 'db', 'host_inner', g_dbinfo, 'host')
        _copy_option(config, 'roadnet_db', 'host_inner', g_roadnet_db, 'host')
        _copy_option(config, 'cloud_db', 'host_inner', g_cloud_db, 'host')
        _copy_option(config, 'cloud_db', 'port_inner', g_cloud_db, 'port', int)
        _copy_option(config, 'redis', 'ip_inner', g_redisinfo, 'ip')
        _copy_option(config, 'rpc', 'host_inner', g_config, 'rpc_host')

    # NOTE(review): these dumps include the configured passwords; consider
    # masking them before printing.
    print(g_dbinfo)
    print(g_roadnet_db)
    print(g_cloud_db)
    print(g_user_db)
    print(g_cross_delay_db)

    g_db_pool.init_pool(g_dbinfo)
    g_roadnet_pool.init_pool(g_roadnet_db)
    g_cloud_pool.init_pool(g_cloud_db)
    g_user_pool.init_pool(g_user_db)
    g_cross_delay_pool.init_pool(g_cross_delay_db)

    # Local server settings; defaults apply only when the section exists.
    if config.has_section('server'):
        g_config['host'] = config.get('server', 'host', fallback='0.0.0.0')
        g_config['port'] = config.getint('server', 'port', fallback=6666)
    logging.info(g_config)

    # Road-network settings.
    if not config.has_section("roadnet"):
        logging.error('missing roadnet section in %s', config_file)
        sys.exit(1)
    if config.has_option('roadnet', 'citylist'):
        for item in config.get('roadnet', 'citylist').split(','):
            # Skip blanks left by an empty value or a trailing comma.
            if item.strip():
                g_citycode_set.add(int(item))

    # Load the road network of every configured city.
    load_all_roadnet()

    cityinfo_dict = read_city_info('city_info.yaml')
    print(cityinfo_dict)
    g_config['cityinfo_dict'] = cityinfo_dict

    # Added 2025-04-22: user ids with executive-only features.
    if config.has_option('executive', 'userid'):
        g_config['executives'] = config.get('executive', 'userid').split(',')
def dev_args():
    """Return the value of the ``--dev`` command-line flag as an int.

    ``--dev`` is 0 for a normal run and 1 for a development run. The value is
    parsed as an integer so that ``--dev 0`` compares equal to ``0``; without
    the conversion argparse hands back the string ``'0'``.

    Returns:
        The parsed flag, or ``0`` when it is absent or when argument parsing
        fails (argparse signals failure by raising ``SystemExit``, which is
        deliberately swallowed here).
    """
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument('--dev', type=int, default=0, help='开发环境参数0否1是')
        args = parser.parse_args()
        return args.dev
    except SystemExit:
        # Unknown or malformed arguments: fall back to the non-dev default.
        pass
    return 0
def read_city_info(file_path):
    """Return the ``city_list`` entry of a UTF-8 YAML file.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        Whatever the document stores under its top-level ``city_list`` key.
    """
    with open(file_path, 'r', encoding='utf-8') as stream:
        document = yaml.safe_load(stream)
        return document['city_list']
def check_param(params, key):
    """Return ``params[key]`` when the key is present, otherwise ``None``.

    Args:
        params: Container supporting ``in`` and item access.
        key: Key to look up.
    """
    return params[key] if key in params else None
def convert_time(time_int):
    """Format an ``HHMM`` number as an ``"HH:MM"`` string.

    Args:
        time_int: Value convertible with ``int()``, e.g. ``930`` or ``"1745"``.

    Returns:
        Zero-padded ``"HH:MM"`` text, e.g. ``"09:30"``.
    """
    hours, minutes = divmod(int(time_int), 100)
    return f"{hours:02d}:{minutes:02d}"
def generate_date_range(start_date_str, end_date_str):
    """List every date between two ``YYYYMMDD`` dates, both ends included.

    Args:
        start_date_str: First date, formatted ``YYYYMMDD``.
        end_date_str: Last date, formatted ``YYYYMMDD``.

    Returns:
        ``YYYYMMDD`` strings in ascending order; empty when the start date is
        after the end date.
    """
    fmt = "%Y%m%d"
    first_day = datetime.strptime(start_date_str, fmt)
    last_day = datetime.strptime(end_date_str, fmt)
    day_count = (last_day - first_day).days + 1
    return [
        (first_day + timedelta(days=offset)).strftime(fmt)
        for offset in range(day_count)
    ]
def gen_ten_weeks_ago_data_list():
    """Group the 70 days ending yesterday into Monday-based calendar weeks.

    Returns:
        A list of weeks ordered from oldest to newest; each week is a list of
        ``YYYYMMDD`` strings in ascending order. Days are bucketed by the
        Monday of their calendar week, so the first and last bucket can be
        partial and the result holds 10 or 11 buckets.
    """
    fmt = "%Y%m%d"
    yesterday = datetime.now() - timedelta(days=1)
    first_day = yesterday - timedelta(days=69)   # 70 days in total

    weeks = []
    current_monday = None
    for offset in range((yesterday - first_day).days + 1):
        day = first_day + timedelta(days=offset)
        monday = day - timedelta(days=day.weekday())
        # Days arrive in chronological order, so a new Monday opens a new week.
        if monday != current_monday:
            weeks.append([])
            current_monday = monday
        weeks[-1].append(day.strftime(fmt))
    return weeks
def count_lsr(turn_str):
    """Count the lanes that allow left turns, straight travel and right turns.

    Args:
        turn_str: ``|``-separated lane codes such as ``"2|1|1|1|5"``; each
            code is looked up in ``g_turn2str``.

    Returns:
        A string ``"<left>/<straight>/<right>"``, e.g. ``"1/3/1"``. Codes
        missing from ``g_turn2str`` are ignored, as are lanes whose turn
        value is ``'t'``, ``'bus'``, ``'reversible'`` or ``'-'``. A lane whose
        value contains several of the letters counts once for each.
    """
    skipped_values = ('t', 'bus', 'reversible', '-')
    tally = {'l': 0, 's': 0, 'r': 0}

    for lane_code in turn_str.split('|'):
        if lane_code not in g_turn2str:
            continue
        turn_value = g_turn2str[lane_code]
        # U-turn and special-purpose lanes are not counted.
        if turn_value in skipped_values:
            continue
        for letter in tally:
            if letter in turn_value:
                tally[letter] += 1

    return f"{tally['l']}/{tally['s']}/{tally['r']}"
def time_overlap(a: str, b: List[str]) -> bool:
    """Tell whether time range ``a`` overlaps any range in ``b``.

    Ranges are ``"HH:MM-HH:MM"`` strings. Ranges that merely touch (one ends
    exactly where the other starts) do not count as overlapping.

    Args:
        a: The range to test.
        b: Ranges to test against.

    Returns:
        ``True`` on the first overlapping range, ``False`` if there is none.
    """
    def _bounds(span: str):
        # Convert "HH:MM-HH:MM" into (start, end) minutes since midnight.
        minutes = []
        for clock in span.split('-'):
            hour, minute = map(int, clock.split(':'))
            minutes.append(hour * 60 + minute)
        begin, finish = minutes
        return begin, finish

    a_start, a_end = _bounds(a)
    return any(
        a_end > other_start and a_start < other_end
        for other_start, other_end in map(_bounds, b)
    )
def parse_week_str(s: str):
    """Expand a ``"YYYYMMDD-YYYYMMDD"`` range into its individual dates.

    Args:
        s: Start and end date joined by ``-``.

    Returns:
        ``YYYYMMDD`` strings from start to end inclusive, ascending; empty
        when the start date is after the end date.
    """
    fmt = "%Y%m%d"
    first_day, last_day = (datetime.strptime(part, fmt) for part in s.split('-'))

    days = []
    cursor = first_day
    while cursor <= last_day:
        days.append(cursor.strftime(fmt))
        cursor += timedelta(days=1)
    return days
def sort_dict_by_clockwise(input_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``input_dict`` with keys ordered clockwise from north.

    Args:
        input_dict: Mapping keyed by compass codes (``'N'``, ``'NE'``,
            ``'E'``, ``'SE'``, ``'S'``, ``'SW'``, ``'W'``, ``'NW'``).

    Returns:
        A new dict whose keys appear in the order N, NE, E, SE, S, SW, W, NW.
        Keys that are not one of these eight codes are left out.
    """
    clockwise_order = ('N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW')
    return {
        direction: input_dict[direction]
        for direction in clockwise_order
        if direction in input_dict
    }