cross_doctor/app/common_worker.py

446 lines
16 KiB
Python
Raw Normal View History

2025-10-10 14:38:22 +08:00
# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 11:22
# @Description: 初始化加载逻辑及公共属性赋值位置
import argparse
import configparser
import logging
import os
import sys
from datetime import datetime
import yaml
from app.comm import comparable_rank
from app.global_source import *
def load_roadnet_by_city(citycode: int):
logging.info('start load roadnet for %d' % citycode)
net_filename = 'data/xl_roadnet_%d.txt' % citycode
roadinfo_filename = 'data/xl_roadinfo_%d.txt' % citycode
# roadlinks_filename = 'data/xl_roadlinks_%d.txt' % citycode
g_roadnet.load_from_file(net_filename, citycode)
g_roadinfo_manager.load_from_file(roadinfo_filename)
# g_roadlinks_manager.load_from_file(roadlinks_filename)
logging.info('finish load roadnet for %d' % citycode)
def reload_base_info(citycode: int):
global last_reload_time_info
crosses_list, roads_list = db_tmnet.reload_cross_info(citycode, last_reload_time_info[citycode])
for cross_info in crosses_list:
crossid = cross_info['crossid']
if crossid:
g_roadnet.update_cross_info(crossid, cross_info)
if cross_info['isdeleted'] == 1:
del g_roadnet.cross_map[crossid]
for road_info in roads_list:
roadid = road_info['roadid']
if roadid:
g_roadnet.update_road_net(roadid, road_info)
g_roadinfo_manager.update_road_info(roadid, road_info)
last_reload_time_info[citycode] = datetime.now()
def pick_citycode(filename: str):
tail_part = filename.split('_')[-1]
citycode = int(tail_part.split('.')[0])
return citycode
def list_all_citycode(path: str):
"""
找出所有的citycode
"""
citycode_set = set()
files = os.listdir(path)
for file in files:
if not file.endswith(".txt"):
continue
if not file.startswith("xl_roadnet_"):
continue
citycode = pick_citycode(file)
citycode_set.add(citycode)
return citycode_set
def load_all_roadnet():
global g_citycode_set
need_filter_by_city = len(g_citycode_set) > 0
if need_filter_by_city:
# 按照citycode的列表加载各个城市的roadnet
for citycode in g_citycode_set:
last_reload_time_info[citycode] = datetime.strptime('1970-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
load_roadnet_by_city(citycode)
reload_base_info(citycode)
else:
# 加载data目录下的所有城市的roadnet
citycode_set = list_all_citycode('./data')
for citycode in citycode_set:
load_roadnet_by_city(citycode)
global g_roadnet, g_roadinfo_manager
g_roadnet.do_cross_classify(g_roadinfo_manager)
g_roadnet.collect_midroad_crosses()
# 设置辅路属性
set_all_side_inroadid()
def set_all_side_inroadid():
sup_roadid_set = set()
main_side_pair_list = g_roadnet.query_main_side_road_pair()
for pair in main_side_pair_list:
r1 = pair[0]
r2 = pair[1]
r1_info = g_roadinfo_manager.query_roadinfo(r1)
r1_name = g_roadnet.query_road(r1).name
r2_info = g_roadinfo_manager.query_roadinfo(r2)
r1_rc, r2_rc = additional_sup_road_comparable_rank(r1_info.roadclass, r2_info.roadclass)
r2_name = g_roadnet.query_road(r2).name
if r1_name.__contains__('辅路'):
sup_roadid_set.add(r1)
elif r2_name.__contains__('辅路'):
sup_roadid_set.add(r2)
else:
if r1_rc > r2_rc:
sup_roadid_set.add(r1)
elif r1_rc < r2_rc:
sup_roadid_set.add(r2)
elif r1_rc == r2_rc:
if r1_info.lane_num < r2_info.lane_num:
sup_roadid_set.add(r1)
elif r1_info.lane_num > r2_info.lane_num:
sup_roadid_set.add(r2)
else:
if r1_info.ffs < r2_info.ffs:
sup_roadid_set.add(r1)
elif r1_info.ffs > r2_info.ffs:
sup_roadid_set.add(r2)
else:
pass
# got sup_road_list
side_roadid_list = list(sup_roadid_set)
for rid in side_roadid_list:
g_roadnet.query_road(rid).sideway = True
return side_roadid_list
def additional_sup_road_comparable_rank(r1_roadclass, r2_roadclass):
suburb_rc = [1, 2, 3, 4, 5] # 城市外道路等级分区
city_rc = [6, 7, 8, 9, 10] # 城市内道路等级分区
# 如果既不在城市内分区,又不在城市外分区,说明该道路等级为未知,不做该判断
if (r1_roadclass not in suburb_rc and r1_roadclass not in city_rc) or (r2_roadclass not in suburb_rc and r2_roadclass not in city_rc):
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
# 如果待比较的两个道路等级为同区内的 则进行常规辅路判定逻辑
if not ((r1_roadclass in suburb_rc and r2_roadclass in suburb_rc) or (r1_roadclass in city_rc and r2_roadclass in city_rc)):
# 如果r1是城市外道路r2为城市内道路
if r1_roadclass in suburb_rc and r2_roadclass in city_rc:
# 如果r1不属于县道及其他低等级道路 则认为r1的道路等级更高
if r1_roadclass not in [4, 5]:
return 1, 2
# 县道大于城市支路或郊区小路 其他低等级道路 大于郊区小路
elif (r1_roadclass == 4 and r2_roadclass in [9, 10]) or (r1_roadclass == 5 and r2_roadclass == 10):
return 1, 2
else:
return 2, 1
else:
if r1_roadclass in city_rc and r2_roadclass in suburb_rc:
if r2_roadclass not in [4, 5]:
return 2, 1
elif (r2_roadclass == 4 and r1_roadclass in [9, 10]) or (r2_roadclass == 5 and r1_roadclass == 10):
return 2, 1
else:
return 1, 2
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
def init_with_config():
config = configparser.ConfigParser()
config.read('cross_doctor.ini', encoding='utf-8')
2025-10-10 14:38:22 +08:00
# 数据库相关配置
if not config.has_section("db"):
logging.error('missing db section in router.ini')
sys.exit(1)
dev = dev_args()
# 根据配置,修改密码和数据库名称
if config.has_option('db', 'host'):
host = config.get('db', 'host')
g_dbinfo['host'] = host
if config.has_option('db', 'user'):
user = config.get('db', 'user')
g_dbinfo['user'] = user
if config.has_option('db', 'password'):
password = config.get('db', 'password')
g_dbinfo['password'] = password
if config.has_option('db', 'dbname'):
dbname = config.get('db', 'dbname')
g_dbinfo['db'] = dbname
if config.has_option('roadnet_db', 'host'):
host = config.get('roadnet_db', 'host')
g_roadnet_db['host'] = host
if config.has_option('roadnet_db', 'user'):
user = config.get('roadnet_db', 'user')
g_roadnet_db['user'] = user
if config.has_option('roadnet_db', 'password'):
password = config.get('roadnet_db', 'password')
g_roadnet_db['password'] = password
if config.has_option('roadnet_db', 'dbname'):
dbname = config.get('roadnet_db', 'dbname')
g_roadnet_db['db'] = dbname
if config.has_option('cloud_db', 'host'):
host = config.get('cloud_db', 'host')
g_cloud_db['host'] = host
if config.has_option('cloud_db', 'user'):
user = config.get('cloud_db', 'user')
g_cloud_db['user'] = user
if config.has_option('cloud_db', 'password'):
password = config.get('cloud_db', 'password')
g_cloud_db['password'] = password
if config.has_option('cloud_db', 'dbname'):
dbname = config.get('cloud_db', 'dbname')
g_cloud_db['db'] = dbname
if config.has_option('user_dbinfo', 'port'):
port = config.get('user_dbinfo', 'port')
g_user_db['port'] = port
if config.has_option('user_dbinfo', 'dbname'):
dbname = config.get('user_dbinfo', 'dbname')
g_user_db['db'] = dbname
if config.has_option('user_dbinfo', 'user'):
user = config.get('user_dbinfo', 'user')
g_user_db['user'] = user
if config.has_option('user_dbinfo', 'password'):
password = config.get('user_dbinfo', 'password')
g_user_db['password'] = password
if config.has_option('cross_delay_db', 'port'):
port = int(config.get('cross_delay_db', 'port'))
g_cross_delay_db['port'] = port
if config.has_option('cross_delay_db', 'dbname'):
dbname = config.get('cross_delay_db', 'dbname')
g_cross_delay_db['db'] = dbname
if config.has_option('cross_delay_db', 'user'):
user = config.get('cross_delay_db', 'user')
g_cross_delay_db['user'] = user
if config.has_option('cross_delay_db', 'password'):
password = config.get('cross_delay_db', 'password')
g_cross_delay_db['password'] = password
2025-10-10 14:38:22 +08:00
# redis
if config.has_section('redis'):
if config.has_option('redis', 'ip'):
g_redisinfo['ip'] = config.get('redis', 'ip')
if config.has_option('redis', 'port'):
g_redisinfo['port'] = int(config.get('redis', 'port'))
if config.has_option('redis', 'password'):
g_redisinfo['password'] = config.get('redis', 'password')
if config.has_option('redis', 'db'):
g_redisinfo['db'] = config.get('redis', 'db')
# rpc
if config.has_section('rpc'):
g_config['rpc_host'] = '172.21.32.21'
g_config['rpc_port'] = '50051'
if config.has_option('rpc', 'host'):
g_config['rpc_host'] = config.get('rpc', 'host')
if config.has_option('rpc', 'port'):
g_config['rpc_port'] = int(config.get('rpc', 'port'))
# 重新初始化 延误数据库连接
if dev == 0:
if config.has_option('db', 'host_inner'):
g_dbinfo['host'] = config.get('db', 'host_inner')
if config.has_option('roadnet_db', 'host_inner'):
g_roadnet_db['host'] = config.get('roadnet_db', 'host_inner')
if config.has_option('cloud_db', 'host_inner'):
g_cloud_db['host'] = config.get('cloud_db', 'host_inner')
if config.has_option('cloud_db', 'port_inner'):
g_cloud_db['port'] = int(config.get('cloud_db', 'port_inner'))
if config.has_option('redis', 'ip_inner'):
g_redisinfo['ip'] = config.get('redis', 'ip_inner')
if config.has_option('rpc', 'host_inner'):
g_config['rpc_host'] = config.get('rpc', 'host_inner')
2025-11-13 15:27:59 +08:00
if config.has_option('user_dbinfo', 'host_inner'):
2025-11-13 14:50:45 +08:00
g_user_db['host'] = config.get('user_db', 'host_inner')
2025-11-13 15:27:59 +08:00
if config.has_option('cross_delay_db', 'host_inner'):
g_cross_delay_db['host'] = config.get('cross_delay_db', 'host_inner')
2025-10-10 14:38:22 +08:00
print(g_dbinfo)
print(g_roadnet_db)
print(g_cloud_db)
print(g_user_db)
print(g_cross_delay_db)
2025-10-10 14:38:22 +08:00
g_db_pool.init_pool(g_dbinfo)
g_roadnet_pool.init_pool(g_roadnet_db)
g_cloud_pool.init_pool(g_cloud_db)
g_user_pool.init_pool(g_user_db)
g_cross_delay_pool.init_pool(g_cross_delay_db)
2025-10-10 14:38:22 +08:00
# 本机服务配置
if config.has_section('server'):
if config.has_option('server', 'host'):
g_config['host'] = config.get('server', 'host')
else:
g_config['host'] = '0.0.0.0'
if config.has_option('server', 'port'):
g_config['port'] = int(config.get('server', 'port'))
else:
g_config['port'] = 6666
logging.info(g_config)
# 路网相关配置
if not config.has_section("roadnet"):
logging.error('missing roadnet section in router.ini')
sys.exit(1)
if config.has_option('roadnet', 'citylist'):
citylist_str = config.get('roadnet', 'citylist')
# cityinfo_dict = ast.literal_eval(citylist_str)
# g_config['cityinfo_dict'] = cityinfo_dict
# print(cityinfo_dict)
ss = citylist_str.split(',')
for item in ss:
citycode = int(item)
g_citycode_set.add(citycode)
# 加载所有城市的路网
load_all_roadnet()
cityinfo_dict = read_city_info('city_info.yaml')
print(cityinfo_dict)
g_config['cityinfo_dict'] = cityinfo_dict
# 20250422 新增业务主管独有功能相关内容
if config.has_section('executive'):
if config.has_option('executive', 'userid'):
g_config['executives'] = config.get('executive', 'userid').split(',')
# 20250514 探索分词索引查询路口名称
# full_crossname_inverted_index()
# gen_cross_info_index(cityinfo_dict)
# print('finished load config file, server started')
def dev_args():
try:
parser = argparse.ArgumentParser()
parser.add_argument('--dev', default=0, help='开发环境参数0否1是')
args = parser.parse_args()
return args.dev
except SystemExit:
pass
return 0
def read_city_info(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
return config['city_list']
def check_param(params, key):
value = None
if key in params:
value = params[key]
return value
def convert_time(time_int):
time_int = int(time_int)
hours = time_int // 100
minutes = time_int % 100
time_str = f"{hours:02d}:{minutes:02d}"
return time_str
def generate_date_range(start_date_str, end_date_str):
"""生成两个日期之间的所有日期(包括开始和结束日期)"""
start_date = datetime.strptime(start_date_str, "%Y%m%d")
end_date = datetime.strptime(end_date_str, "%Y%m%d")
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date.strftime("%Y%m%d"))
current_date += timedelta(days=1)
return date_list
def gen_ten_weeks_ago_data_list():
fmt = "%Y%m%d"
end = datetime.now() - timedelta(days=1) # 昨天
start = end - timedelta(days=69) # 70 天跨度 = 10 个完整周
week_buckets = {}
for d in range((end - start).days + 1):
day = start + timedelta(days=d)
monday = day - timedelta(days=day.weekday()) # 本周一
week_buckets.setdefault(monday, []).append(day.strftime(fmt))
# 大 list从远到近最早一周在最前面
result = [week_buckets[k] for k in sorted(week_buckets, reverse=False)]
return result
2025-11-10 16:47:13 +08:00
def count_lsr(turn_str: str):
"""
2025-11-10 16:47:13 +08:00
按车道顺序统计转向组合种类及数量跳过'-'其余均保留
返回: (中文组合列表, 数量列表) 两个字符串
"""
if not turn_str.strip():
return '-', '0'
# 代码 -> 中文
g_turn2cn = {
'1': '直行', '2': '左转', '3': '右转', '4': '直左', '5': '直右',
'6': '调头', '7': '直调', '8': '左调', '9': '直左',
'10': '左直调', '11': '-', '12': '直左右', '13': '右转调头',
'14': '左右调', '15': '直右调', '16': '左直右调', '17': '公交车道',
'19': '可变车道'
}
seen = {} # 顺序去重计数
for key in turn_str.split('|'):
if key == '11': # 只跳过 '-'
continue
if key in g_turn2cn:
name = g_turn2cn[key]
seen.setdefault(name, 0)
seen[name] += 1
2025-11-10 16:47:13 +08:00
if not seen:
return '-', '0'
2025-11-10 16:47:13 +08:00
name_lst = list(seen.keys())
count_lst = list(seen.values())
return '/'.join(name_lst), '/'.join(map(str, count_lst))
def time_overlap(a: str, b: List[str]) -> bool:
def to_minutes(t: str) -> int:
h, m = map(int, t.split(':'))
return h * 60 + m
a_start, a_end = map(to_minutes, a.split('-'))
for interval in b:
b_start, b_end = map(to_minutes, interval.split('-'))
# 检查是否有重叠
if not (a_end <= b_start or a_start >= b_end):
return True
return False
def parse_week_str(s: str):
start, end = [datetime.strptime(d, "%Y%m%d") for d in s.split('-')]
date_list = [(start + timedelta(days=i)).strftime("%Y%m%d")
for i in range((end - start).days + 1)]
return date_list
def sort_dict_by_clockwise(input_dict: Dict[str, any]) -> dict:
"""
将字典按照顺时针方向从北开始排序键的顺序
参数:
input_dict: 键为方向字母的字典
返回:
按照顺时针顺序排列的有序字典
"""
clockwise_order = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
return {key: input_dict[key] for key in clockwise_order if key in input_dict}