446 lines
16 KiB
Python
446 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
||
# @Author: Owl
|
||
# @Date: 2025/10/10 11:22
|
||
# @Description: 初始化加载逻辑及公共属性赋值位置
|
||
import argparse
|
||
import configparser
|
||
import logging
|
||
import os
|
||
import sys
|
||
from datetime import datetime
|
||
|
||
import yaml
|
||
|
||
from app.comm import comparable_rank
|
||
from app.global_source import *
|
||
|
||
|
||
def load_roadnet_by_city(citycode: int):
|
||
logging.info('start load roadnet for %d' % citycode)
|
||
net_filename = 'data/xl_roadnet_%d.txt' % citycode
|
||
roadinfo_filename = 'data/xl_roadinfo_%d.txt' % citycode
|
||
# roadlinks_filename = 'data/xl_roadlinks_%d.txt' % citycode
|
||
g_roadnet.load_from_file(net_filename, citycode)
|
||
g_roadinfo_manager.load_from_file(roadinfo_filename)
|
||
# g_roadlinks_manager.load_from_file(roadlinks_filename)
|
||
logging.info('finish load roadnet for %d' % citycode)
|
||
|
||
|
||
def reload_base_info(citycode: int):
|
||
global last_reload_time_info
|
||
crosses_list, roads_list = db_tmnet.reload_cross_info(citycode, last_reload_time_info[citycode])
|
||
for cross_info in crosses_list:
|
||
crossid = cross_info['crossid']
|
||
if crossid:
|
||
g_roadnet.update_cross_info(crossid, cross_info)
|
||
if cross_info['isdeleted'] == 1:
|
||
del g_roadnet.cross_map[crossid]
|
||
for road_info in roads_list:
|
||
roadid = road_info['roadid']
|
||
if roadid:
|
||
g_roadnet.update_road_net(roadid, road_info)
|
||
g_roadinfo_manager.update_road_info(roadid, road_info)
|
||
last_reload_time_info[citycode] = datetime.now()
|
||
|
||
|
||
def pick_citycode(filename: str):
|
||
|
||
tail_part = filename.split('_')[-1]
|
||
citycode = int(tail_part.split('.')[0])
|
||
return citycode
|
||
|
||
|
||
def list_all_citycode(path: str):
|
||
"""
|
||
找出所有的citycode
|
||
"""
|
||
citycode_set = set()
|
||
files = os.listdir(path)
|
||
for file in files:
|
||
if not file.endswith(".txt"):
|
||
continue
|
||
if not file.startswith("xl_roadnet_"):
|
||
continue
|
||
citycode = pick_citycode(file)
|
||
citycode_set.add(citycode)
|
||
return citycode_set
|
||
|
||
|
||
def load_all_roadnet():
|
||
global g_citycode_set
|
||
need_filter_by_city = len(g_citycode_set) > 0
|
||
if need_filter_by_city:
|
||
# 按照citycode的列表加载各个城市的roadnet
|
||
for citycode in g_citycode_set:
|
||
last_reload_time_info[citycode] = datetime.strptime('1970-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
|
||
load_roadnet_by_city(citycode)
|
||
reload_base_info(citycode)
|
||
else:
|
||
# 加载data目录下的所有城市的roadnet
|
||
citycode_set = list_all_citycode('./data')
|
||
for citycode in citycode_set:
|
||
load_roadnet_by_city(citycode)
|
||
global g_roadnet, g_roadinfo_manager
|
||
g_roadnet.do_cross_classify(g_roadinfo_manager)
|
||
g_roadnet.collect_midroad_crosses()
|
||
# 设置辅路属性
|
||
set_all_side_inroadid()
|
||
|
||
|
||
def set_all_side_inroadid():
|
||
sup_roadid_set = set()
|
||
main_side_pair_list = g_roadnet.query_main_side_road_pair()
|
||
for pair in main_side_pair_list:
|
||
r1 = pair[0]
|
||
r2 = pair[1]
|
||
r1_info = g_roadinfo_manager.query_roadinfo(r1)
|
||
r1_name = g_roadnet.query_road(r1).name
|
||
r2_info = g_roadinfo_manager.query_roadinfo(r2)
|
||
r1_rc, r2_rc = additional_sup_road_comparable_rank(r1_info.roadclass, r2_info.roadclass)
|
||
r2_name = g_roadnet.query_road(r2).name
|
||
if r1_name.__contains__('辅路'):
|
||
sup_roadid_set.add(r1)
|
||
elif r2_name.__contains__('辅路'):
|
||
sup_roadid_set.add(r2)
|
||
else:
|
||
if r1_rc > r2_rc:
|
||
sup_roadid_set.add(r1)
|
||
elif r1_rc < r2_rc:
|
||
sup_roadid_set.add(r2)
|
||
elif r1_rc == r2_rc:
|
||
if r1_info.lane_num < r2_info.lane_num:
|
||
sup_roadid_set.add(r1)
|
||
elif r1_info.lane_num > r2_info.lane_num:
|
||
sup_roadid_set.add(r2)
|
||
else:
|
||
if r1_info.ffs < r2_info.ffs:
|
||
sup_roadid_set.add(r1)
|
||
elif r1_info.ffs > r2_info.ffs:
|
||
sup_roadid_set.add(r2)
|
||
else:
|
||
pass
|
||
# got sup_road_list
|
||
side_roadid_list = list(sup_roadid_set)
|
||
for rid in side_roadid_list:
|
||
g_roadnet.query_road(rid).sideway = True
|
||
return side_roadid_list
|
||
|
||
|
||
def additional_sup_road_comparable_rank(r1_roadclass, r2_roadclass):
|
||
suburb_rc = [1, 2, 3, 4, 5] # 城市外道路等级分区
|
||
city_rc = [6, 7, 8, 9, 10] # 城市内道路等级分区
|
||
# 如果既不在城市内分区,又不在城市外分区,说明该道路等级为未知,不做该判断
|
||
if (r1_roadclass not in suburb_rc and r1_roadclass not in city_rc) or (r2_roadclass not in suburb_rc and r2_roadclass not in city_rc):
|
||
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
|
||
# 如果待比较的两个道路等级为同区内的 则进行常规辅路判定逻辑
|
||
if not ((r1_roadclass in suburb_rc and r2_roadclass in suburb_rc) or (r1_roadclass in city_rc and r2_roadclass in city_rc)):
|
||
# 如果r1是城市外道路,r2为城市内道路
|
||
if r1_roadclass in suburb_rc and r2_roadclass in city_rc:
|
||
# 如果r1不属于县道及其他低等级道路 则认为r1的道路等级更高
|
||
if r1_roadclass not in [4, 5]:
|
||
return 1, 2
|
||
# 县道大于城市支路或郊区小路 其他低等级道路 大于郊区小路
|
||
elif (r1_roadclass == 4 and r2_roadclass in [9, 10]) or (r1_roadclass == 5 and r2_roadclass == 10):
|
||
return 1, 2
|
||
else:
|
||
return 2, 1
|
||
else:
|
||
if r1_roadclass in city_rc and r2_roadclass in suburb_rc:
|
||
if r2_roadclass not in [4, 5]:
|
||
return 2, 1
|
||
elif (r2_roadclass == 4 and r1_roadclass in [9, 10]) or (r2_roadclass == 5 and r1_roadclass == 10):
|
||
return 2, 1
|
||
else:
|
||
return 1, 2
|
||
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
|
||
|
||
|
||
def init_with_config():
|
||
config = configparser.ConfigParser()
|
||
config.read('cross_doctor.ini', encoding='utf-8')
|
||
# 数据库相关配置
|
||
if not config.has_section("db"):
|
||
logging.error('missing db section in router.ini')
|
||
sys.exit(1)
|
||
dev = dev_args()
|
||
# 根据配置,修改密码和数据库名称
|
||
if config.has_option('db', 'host'):
|
||
host = config.get('db', 'host')
|
||
g_dbinfo['host'] = host
|
||
if config.has_option('db', 'user'):
|
||
user = config.get('db', 'user')
|
||
g_dbinfo['user'] = user
|
||
if config.has_option('db', 'password'):
|
||
password = config.get('db', 'password')
|
||
g_dbinfo['password'] = password
|
||
if config.has_option('db', 'dbname'):
|
||
dbname = config.get('db', 'dbname')
|
||
g_dbinfo['db'] = dbname
|
||
|
||
if config.has_option('roadnet_db', 'host'):
|
||
host = config.get('roadnet_db', 'host')
|
||
g_roadnet_db['host'] = host
|
||
if config.has_option('roadnet_db', 'user'):
|
||
user = config.get('roadnet_db', 'user')
|
||
g_roadnet_db['user'] = user
|
||
if config.has_option('roadnet_db', 'password'):
|
||
password = config.get('roadnet_db', 'password')
|
||
g_roadnet_db['password'] = password
|
||
if config.has_option('roadnet_db', 'dbname'):
|
||
dbname = config.get('roadnet_db', 'dbname')
|
||
g_roadnet_db['db'] = dbname
|
||
|
||
if config.has_option('cloud_db', 'host'):
|
||
host = config.get('cloud_db', 'host')
|
||
g_cloud_db['host'] = host
|
||
if config.has_option('cloud_db', 'user'):
|
||
user = config.get('cloud_db', 'user')
|
||
g_cloud_db['user'] = user
|
||
if config.has_option('cloud_db', 'password'):
|
||
password = config.get('cloud_db', 'password')
|
||
g_cloud_db['password'] = password
|
||
if config.has_option('cloud_db', 'dbname'):
|
||
dbname = config.get('cloud_db', 'dbname')
|
||
g_cloud_db['db'] = dbname
|
||
if config.has_option('user_dbinfo', 'port'):
|
||
port = config.get('user_dbinfo', 'port')
|
||
g_user_db['port'] = port
|
||
if config.has_option('user_dbinfo', 'dbname'):
|
||
dbname = config.get('user_dbinfo', 'dbname')
|
||
g_user_db['db'] = dbname
|
||
if config.has_option('user_dbinfo', 'user'):
|
||
user = config.get('user_dbinfo', 'user')
|
||
g_user_db['user'] = user
|
||
if config.has_option('user_dbinfo', 'password'):
|
||
password = config.get('user_dbinfo', 'password')
|
||
g_user_db['password'] = password
|
||
if config.has_option('cross_delay_db', 'port'):
|
||
port = int(config.get('cross_delay_db', 'port'))
|
||
g_cross_delay_db['port'] = port
|
||
if config.has_option('cross_delay_db', 'dbname'):
|
||
dbname = config.get('cross_delay_db', 'dbname')
|
||
g_cross_delay_db['db'] = dbname
|
||
if config.has_option('cross_delay_db', 'user'):
|
||
user = config.get('cross_delay_db', 'user')
|
||
g_cross_delay_db['user'] = user
|
||
if config.has_option('cross_delay_db', 'password'):
|
||
password = config.get('cross_delay_db', 'password')
|
||
g_cross_delay_db['password'] = password
|
||
# redis
|
||
if config.has_section('redis'):
|
||
if config.has_option('redis', 'ip'):
|
||
g_redisinfo['ip'] = config.get('redis', 'ip')
|
||
if config.has_option('redis', 'port'):
|
||
g_redisinfo['port'] = int(config.get('redis', 'port'))
|
||
if config.has_option('redis', 'password'):
|
||
g_redisinfo['password'] = config.get('redis', 'password')
|
||
if config.has_option('redis', 'db'):
|
||
g_redisinfo['db'] = config.get('redis', 'db')
|
||
# rpc
|
||
if config.has_section('rpc'):
|
||
g_config['rpc_host'] = '172.21.32.21'
|
||
g_config['rpc_port'] = '50051'
|
||
if config.has_option('rpc', 'host'):
|
||
g_config['rpc_host'] = config.get('rpc', 'host')
|
||
if config.has_option('rpc', 'port'):
|
||
g_config['rpc_port'] = int(config.get('rpc', 'port'))
|
||
# 重新初始化 延误数据库连接
|
||
if dev == 0:
|
||
if config.has_option('db', 'host_inner'):
|
||
g_dbinfo['host'] = config.get('db', 'host_inner')
|
||
if config.has_option('roadnet_db', 'host_inner'):
|
||
g_roadnet_db['host'] = config.get('roadnet_db', 'host_inner')
|
||
if config.has_option('cloud_db', 'host_inner'):
|
||
g_cloud_db['host'] = config.get('cloud_db', 'host_inner')
|
||
if config.has_option('cloud_db', 'port_inner'):
|
||
g_cloud_db['port'] = int(config.get('cloud_db', 'port_inner'))
|
||
if config.has_option('redis', 'ip_inner'):
|
||
g_redisinfo['ip'] = config.get('redis', 'ip_inner')
|
||
if config.has_option('rpc', 'host_inner'):
|
||
g_config['rpc_host'] = config.get('rpc', 'host_inner')
|
||
if config.has_option('user_dbinfo', 'host_inner'):
|
||
g_user_db['host'] = config.get('user_dbinfo', 'host_inner')
|
||
if config.has_option('cross_delay_db', 'host_inner'):
|
||
g_cross_delay_db['host'] = config.get('cross_delay_db', 'host_inner')
|
||
print(g_dbinfo)
|
||
print(g_roadnet_db)
|
||
print(g_cloud_db)
|
||
print(g_user_db)
|
||
print(g_cross_delay_db)
|
||
g_db_pool.init_pool(g_dbinfo)
|
||
g_roadnet_pool.init_pool(g_roadnet_db)
|
||
g_cloud_pool.init_pool(g_cloud_db)
|
||
g_user_pool.init_pool(g_user_db)
|
||
g_cross_delay_pool.init_pool(g_cross_delay_db)
|
||
|
||
# 本机服务配置
|
||
if config.has_section('server'):
|
||
if config.has_option('server', 'host'):
|
||
g_config['host'] = config.get('server', 'host')
|
||
else:
|
||
g_config['host'] = '0.0.0.0'
|
||
if config.has_option('server', 'port'):
|
||
g_config['port'] = int(config.get('server', 'port'))
|
||
else:
|
||
g_config['port'] = 6666
|
||
|
||
logging.info(g_config)
|
||
# 路网相关配置
|
||
if not config.has_section("roadnet"):
|
||
logging.error('missing roadnet section in router.ini')
|
||
sys.exit(1)
|
||
if config.has_option('roadnet', 'citylist'):
|
||
citylist_str = config.get('roadnet', 'citylist')
|
||
# cityinfo_dict = ast.literal_eval(citylist_str)
|
||
# g_config['cityinfo_dict'] = cityinfo_dict
|
||
# print(cityinfo_dict)
|
||
ss = citylist_str.split(',')
|
||
for item in ss:
|
||
citycode = int(item)
|
||
g_citycode_set.add(citycode)
|
||
# 加载所有城市的路网
|
||
load_all_roadnet()
|
||
cityinfo_dict = read_city_info('city_info.yaml')
|
||
print(cityinfo_dict)
|
||
g_config['cityinfo_dict'] = cityinfo_dict
|
||
|
||
# 20250422 新增业务主管独有功能相关内容
|
||
if config.has_section('executive'):
|
||
if config.has_option('executive', 'userid'):
|
||
g_config['executives'] = config.get('executive', 'userid').split(',')
|
||
# 20250514 探索分词索引查询路口名称
|
||
# full_crossname_inverted_index()
|
||
# gen_cross_info_index(cityinfo_dict)
|
||
# print('finished load config file, server started')
|
||
|
||
|
||
def dev_args():
|
||
try:
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument('--dev', default=0, help='开发环境参数:0否,1是')
|
||
args = parser.parse_args()
|
||
return args.dev
|
||
except SystemExit:
|
||
pass
|
||
return 0
|
||
|
||
|
||
def read_city_info(file_path):
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
config = yaml.safe_load(f)
|
||
return config['city_list']
|
||
|
||
|
||
def check_param(params, key):
|
||
value = None
|
||
if key in params:
|
||
value = params[key]
|
||
return value
|
||
|
||
|
||
def convert_time(time_int):
|
||
time_int = int(time_int)
|
||
hours = time_int // 100
|
||
minutes = time_int % 100
|
||
time_str = f"{hours:02d}:{minutes:02d}"
|
||
|
||
return time_str
|
||
|
||
|
||
def generate_date_range(start_date_str, end_date_str):
|
||
"""生成两个日期之间的所有日期(包括开始和结束日期)"""
|
||
start_date = datetime.strptime(start_date_str, "%Y%m%d")
|
||
end_date = datetime.strptime(end_date_str, "%Y%m%d")
|
||
date_list = []
|
||
current_date = start_date
|
||
while current_date <= end_date:
|
||
date_list.append(current_date.strftime("%Y%m%d"))
|
||
current_date += timedelta(days=1)
|
||
return date_list
|
||
|
||
|
||
def gen_ten_weeks_ago_data_list():
|
||
fmt = "%Y%m%d"
|
||
end = datetime.now() - timedelta(days=1) # 昨天
|
||
start = end - timedelta(days=69) # 70 天跨度 = 10 个完整周
|
||
|
||
week_buckets = {}
|
||
for d in range((end - start).days + 1):
|
||
day = start + timedelta(days=d)
|
||
monday = day - timedelta(days=day.weekday()) # 本周一
|
||
week_buckets.setdefault(monday, []).append(day.strftime(fmt))
|
||
|
||
# 大 list:从远到近(最早一周在最前面)
|
||
result = [week_buckets[k] for k in sorted(week_buckets, reverse=False)]
|
||
return result
|
||
|
||
|
||
def count_lsr(turn_str: str):
|
||
"""
|
||
按车道顺序统计转向组合种类及数量,跳过'-',其余均保留
|
||
返回: (中文组合列表, 数量列表) 两个字符串
|
||
"""
|
||
if not turn_str.strip():
|
||
return '-', '0'
|
||
|
||
# 代码 -> 中文
|
||
g_turn2cn = {
|
||
'1': '直行', '2': '左转', '3': '右转', '4': '直左', '5': '直右',
|
||
'6': '调头', '7': '直调', '8': '左调', '9': '直左',
|
||
'10': '左直调', '11': '-', '12': '直左右', '13': '右转调头',
|
||
'14': '左右调', '15': '直右调', '16': '左直右调', '17': '公交车道',
|
||
'19': '可变车道'
|
||
}
|
||
|
||
seen = {} # 顺序去重计数
|
||
for key in turn_str.split('|'):
|
||
if key == '11': # 只跳过 '-'
|
||
continue
|
||
if key in g_turn2cn:
|
||
name = g_turn2cn[key]
|
||
seen.setdefault(name, 0)
|
||
seen[name] += 1
|
||
|
||
if not seen:
|
||
return '-', '0'
|
||
|
||
name_lst = list(seen.keys())
|
||
count_lst = list(seen.values())
|
||
return '/'.join(name_lst), '/'.join(map(str, count_lst))
|
||
|
||
|
||
def time_overlap(a: str, b: List[str]) -> bool:
|
||
def to_minutes(t: str) -> int:
|
||
h, m = map(int, t.split(':'))
|
||
return h * 60 + m
|
||
|
||
a_start, a_end = map(to_minutes, a.split('-'))
|
||
|
||
for interval in b:
|
||
b_start, b_end = map(to_minutes, interval.split('-'))
|
||
# 检查是否有重叠
|
||
if not (a_end <= b_start or a_start >= b_end):
|
||
return True
|
||
return False
|
||
|
||
|
||
def parse_week_str(s: str):
|
||
start, end = [datetime.strptime(d, "%Y%m%d") for d in s.split('-')]
|
||
date_list = [(start + timedelta(days=i)).strftime("%Y%m%d")
|
||
for i in range((end - start).days + 1)]
|
||
return date_list
|
||
|
||
|
||
def sort_dict_by_clockwise(input_dict: Dict[str, any]) -> dict:
|
||
"""
|
||
将字典按照顺时针方向(从北开始)排序键的顺序。
|
||
|
||
参数:
|
||
input_dict: 键为方向字母的字典。
|
||
|
||
返回:
|
||
按照顺时针顺序排列的有序字典。
|
||
"""
|
||
clockwise_order = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
|
||
return {key: input_dict[key] for key in clockwise_order if key in input_dict}
|