cross_doctor/app/common_worker.py

516 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 11:22
# @Description: 初始化加载逻辑及公共属性赋值位置
import argparse
import configparser
import logging
import os
import sys
from datetime import datetime
import yaml
from app.comm import comparable_rank
from app.global_source import *
def load_roadnet_by_city(citycode: int):
logging.info('start load roadnet for %d' % citycode)
net_filename = './../common_roadnet/data/xl_roadnet_%d.txt' % citycode
roadinfo_filename = './../common_roadnet/data/xl_roadinfo_%d.txt' % citycode
roadlinks_filename = './../common_roadnet/data/xl_roadlinks_%d.txt' % citycode
g_roadnet.load_from_file(net_filename, citycode)
g_roadinfo_manager.load_from_file(roadinfo_filename)
g_roadlinks_manager.load_from_file(roadlinks_filename)
logging.info('finish load roadnet for %d' % citycode)
def reload_base_info(citycode: int):
global last_reload_time_info
crosses_list, roads_list = db_tmnet.reload_cross_info(citycode, last_reload_time_info[citycode])
for cross_info in crosses_list:
crossid = cross_info['crossid']
if crossid:
g_roadnet.update_cross_info(crossid, cross_info)
if cross_info['isdeleted'] == 1:
del g_roadnet.cross_map[crossid]
for road_info in roads_list:
roadid = road_info['roadid']
if roadid:
g_roadnet.update_road_net(roadid, road_info)
g_roadinfo_manager.update_road_info(roadid, road_info)
last_reload_time_info[citycode] = datetime.now()
def pick_citycode(filename: str):
tail_part = filename.split('_')[-1]
citycode = int(tail_part.split('.')[0])
return citycode
def list_all_citycode(path: str):
"""
找出所有的citycode
"""
citycode_set = set()
files = os.listdir(path)
for file in files:
if not file.endswith(".txt"):
continue
if not file.startswith("xl_roadnet_"):
continue
citycode = pick_citycode(file)
citycode_set.add(citycode)
return citycode_set
def load_all_roadnet():
global g_citycode_set
need_filter_by_city = len(g_citycode_set) > 0
if need_filter_by_city:
# 按照citycode的列表加载各个城市的roadnet
for citycode in g_citycode_set:
last_reload_time_info[citycode] = datetime.strptime('1970-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
load_roadnet_by_city(citycode)
reload_base_info(citycode)
else:
# 加载data目录下的所有城市的roadnet
citycode_set = list_all_citycode('../common_roadnet/data')
for citycode in citycode_set:
load_roadnet_by_city(citycode)
global g_roadnet, g_roadinfo_manager
g_roadnet.do_cross_classify(g_roadinfo_manager)
g_roadnet.collect_midroad_crosses()
# 设置辅路属性
set_all_side_inroadid()
def set_all_side_inroadid():
sup_roadid_set = set()
main_side_pair_list = g_roadnet.query_main_side_road_pair()
for pair in main_side_pair_list:
r1 = pair[0]
r2 = pair[1]
r1_info = g_roadinfo_manager.query_roadinfo(r1)
r1_name = g_roadnet.query_road(r1).name
r2_info = g_roadinfo_manager.query_roadinfo(r2)
r1_rc, r2_rc = additional_sup_road_comparable_rank(r1_info.roadclass, r2_info.roadclass)
r2_name = g_roadnet.query_road(r2).name
if r1_name.__contains__('辅路'):
sup_roadid_set.add(r1)
elif r2_name.__contains__('辅路'):
sup_roadid_set.add(r2)
else:
if r1_rc > r2_rc:
sup_roadid_set.add(r1)
elif r1_rc < r2_rc:
sup_roadid_set.add(r2)
elif r1_rc == r2_rc:
if r1_info.lane_num < r2_info.lane_num:
sup_roadid_set.add(r1)
elif r1_info.lane_num > r2_info.lane_num:
sup_roadid_set.add(r2)
else:
if r1_info.ffs < r2_info.ffs:
sup_roadid_set.add(r1)
elif r1_info.ffs > r2_info.ffs:
sup_roadid_set.add(r2)
else:
pass
# got sup_road_list
side_roadid_list = list(sup_roadid_set)
for rid in side_roadid_list:
g_roadnet.query_road(rid).sideway = True
return side_roadid_list
def additional_sup_road_comparable_rank(r1_roadclass, r2_roadclass):
suburb_rc = [1, 2, 3, 4, 5] # 城市外道路等级分区
city_rc = [6, 7, 8, 9, 10] # 城市内道路等级分区
# 如果既不在城市内分区,又不在城市外分区,说明该道路等级为未知,不做该判断
if (r1_roadclass not in suburb_rc and r1_roadclass not in city_rc) or (r2_roadclass not in suburb_rc and r2_roadclass not in city_rc):
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
# 如果待比较的两个道路等级为同区内的 则进行常规辅路判定逻辑
if not ((r1_roadclass in suburb_rc and r2_roadclass in suburb_rc) or (r1_roadclass in city_rc and r2_roadclass in city_rc)):
# 如果r1是城市外道路r2为城市内道路
if r1_roadclass in suburb_rc and r2_roadclass in city_rc:
# 如果r1不属于县道及其他低等级道路 则认为r1的道路等级更高
if r1_roadclass not in [4, 5]:
return 1, 2
# 县道大于城市支路或郊区小路 其他低等级道路 大于郊区小路
elif (r1_roadclass == 4 and r2_roadclass in [9, 10]) or (r1_roadclass == 5 and r2_roadclass == 10):
return 1, 2
else:
return 2, 1
else:
if r1_roadclass in city_rc and r2_roadclass in suburb_rc:
if r2_roadclass not in [4, 5]:
return 2, 1
elif (r2_roadclass == 4 and r1_roadclass in [9, 10]) or (r2_roadclass == 5 and r1_roadclass == 10):
return 2, 1
else:
return 1, 2
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
def init_with_config():
config = configparser.ConfigParser()
config.read('cross_doctor.ini', encoding='utf-8')
# 数据库相关配置
if not config.has_section("db"):
logging.error('missing db section in router.ini')
sys.exit(1)
dev = dev_args()
# 根据配置,修改密码和数据库名称
if config.has_option('db', 'host'):
host = config.get('db', 'host')
g_dbinfo['host'] = host
if config.has_option('db', 'user'):
user = config.get('db', 'user')
g_dbinfo['user'] = user
if config.has_option('db', 'password'):
password = config.get('db', 'password')
g_dbinfo['password'] = password
if config.has_option('db', 'dbname'):
dbname = config.get('db', 'dbname')
g_dbinfo['db'] = dbname
if config.has_option('roadnet_db', 'host'):
host = config.get('roadnet_db', 'host')
g_roadnet_db['host'] = host
if config.has_option('roadnet_db', 'user'):
user = config.get('roadnet_db', 'user')
g_roadnet_db['user'] = user
if config.has_option('roadnet_db', 'password'):
password = config.get('roadnet_db', 'password')
g_roadnet_db['password'] = password
if config.has_option('roadnet_db', 'dbname'):
dbname = config.get('roadnet_db', 'dbname')
g_roadnet_db['db'] = dbname
if config.has_option('cloud_db', 'host'):
host = config.get('cloud_db', 'host')
g_cloud_db['host'] = host
if config.has_option('cloud_db', 'user'):
user = config.get('cloud_db', 'user')
g_cloud_db['user'] = user
if config.has_option('cloud_db', 'password'):
password = config.get('cloud_db', 'password')
g_cloud_db['password'] = password
if config.has_option('cloud_db', 'dbname'):
dbname = config.get('cloud_db', 'dbname')
g_cloud_db['db'] = dbname
if config.has_option('user_dbinfo', 'port'):
port = config.get('user_dbinfo', 'port')
g_user_db['port'] = port
if config.has_option('user_dbinfo', 'dbname'):
dbname = config.get('user_dbinfo', 'dbname')
g_user_db['db'] = dbname
if config.has_option('user_dbinfo', 'user'):
user = config.get('user_dbinfo', 'user')
g_user_db['user'] = user
if config.has_option('user_dbinfo', 'password'):
password = config.get('user_dbinfo', 'password')
g_user_db['password'] = password
if config.has_option('cross_delay_db', 'port'):
port = int(config.get('cross_delay_db', 'port'))
g_cross_delay_db['port'] = port
if config.has_option('cross_delay_db', 'dbname'):
dbname = config.get('cross_delay_db', 'dbname')
g_cross_delay_db['db'] = dbname
if config.has_option('cross_delay_db', 'user'):
user = config.get('cross_delay_db', 'user')
g_cross_delay_db['user'] = user
if config.has_option('cross_delay_db', 'password'):
password = config.get('cross_delay_db', 'password')
g_cross_delay_db['password'] = password
# redis
if config.has_section('redis'):
if config.has_option('redis', 'ip'):
g_redisinfo['ip'] = config.get('redis', 'ip')
if config.has_option('redis', 'port'):
g_redisinfo['port'] = int(config.get('redis', 'port'))
if config.has_option('redis', 'password'):
g_redisinfo['password'] = config.get('redis', 'password')
if config.has_option('redis', 'db'):
g_redisinfo['db'] = config.get('redis', 'db')
# rpc
if config.has_section('rpc'):
g_config['rpc_host'] = '172.21.32.21'
g_config['rpc_port'] = '50051'
if config.has_option('rpc', 'host'):
g_config['rpc_host'] = config.get('rpc', 'host')
if config.has_option('rpc', 'port'):
g_config['rpc_port'] = int(config.get('rpc', 'port'))
# 重新初始化 延误数据库连接
if dev == 0:
if config.has_option('db', 'host_inner'):
g_dbinfo['host'] = config.get('db', 'host_inner')
if config.has_option('roadnet_db', 'host_inner'):
g_roadnet_db['host'] = config.get('roadnet_db', 'host_inner')
if config.has_option('cloud_db', 'host_inner'):
g_cloud_db['host'] = config.get('cloud_db', 'host_inner')
if config.has_option('cloud_db', 'port_inner'):
g_cloud_db['port'] = int(config.get('cloud_db', 'port_inner'))
if config.has_option('redis', 'ip_inner'):
g_redisinfo['ip'] = config.get('redis', 'ip_inner')
if config.has_option('rpc', 'host_inner'):
g_config['rpc_host'] = config.get('rpc', 'host_inner')
if config.has_option('user_dbinfo', 'host_inner'):
g_user_db['host'] = config.get('user_dbinfo', 'host_inner')
if config.has_option('cross_delay_db', 'host_inner'):
g_cross_delay_db['host'] = config.get('cross_delay_db', 'host_inner')
print(g_dbinfo)
print(g_roadnet_db)
print(g_cloud_db)
print(g_user_db)
print(g_cross_delay_db)
g_db_pool.init_pool(g_dbinfo)
g_roadnet_pool.init_pool(g_roadnet_db)
g_cloud_pool.init_pool(g_cloud_db)
g_user_pool.init_pool(g_user_db)
g_cross_delay_pool.init_pool(g_cross_delay_db)
# 本机服务配置
if config.has_section('server'):
if config.has_option('server', 'host'):
g_config['host'] = config.get('server', 'host')
else:
g_config['host'] = '0.0.0.0'
if config.has_option('server', 'port'):
g_config['port'] = int(config.get('server', 'port'))
else:
g_config['port'] = 6666
logging.info(g_config)
# 路网相关配置
if not config.has_section("roadnet"):
logging.error('missing roadnet section in router.ini')
sys.exit(1)
if config.has_option('roadnet', 'citylist'):
citylist_str = config.get('roadnet', 'citylist')
# cityinfo_dict = ast.literal_eval(citylist_str)
# g_config['cityinfo_dict'] = cityinfo_dict
# print(cityinfo_dict)
ss = citylist_str.split(',')
for item in ss:
citycode = int(item)
g_citycode_set.add(citycode)
# 加载所有城市的路网
load_all_roadnet()
cityinfo_dict = read_city_info('city_info.yaml')
print(cityinfo_dict)
g_config['cityinfo_dict'] = cityinfo_dict
# 20250422 新增业务主管独有功能相关内容
if config.has_section('executive'):
if config.has_option('executive', 'userid'):
g_config['executives'] = config.get('executive', 'userid').split(',')
# 20250514 探索分词索引查询路口名称
# full_crossname_inverted_index()
# gen_cross_info_index(cityinfo_dict)
# print('finished load config file, server started')
def dev_args():
try:
parser = argparse.ArgumentParser()
parser.add_argument('--dev', default=0, help='开发环境参数0否1是')
args = parser.parse_args()
return args.dev
except SystemExit:
pass
return 0
def read_city_info(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
return config['city_list']
def check_param(params, key):
value = None
if key in params:
value = params[key]
return value
def convert_time(time_int):
time_int = int(time_int)
hours = time_int // 100
minutes = time_int % 100
time_str = f"{hours:02d}:{minutes:02d}"
return time_str
def generate_date_range(start_date_str, end_date_str):
"""生成两个日期之间的所有日期(包括开始和结束日期)"""
start_date = datetime.strptime(start_date_str, "%Y%m%d")
end_date = datetime.strptime(end_date_str, "%Y%m%d")
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date.strftime("%Y%m%d"))
current_date += timedelta(days=1)
return date_list
def gen_ten_weeks_ago_data_list():
fmt = "%Y%m%d"
end = datetime.now() - timedelta(days=1) # 昨天
start = end - timedelta(days=69) # 70 天跨度 = 10 个完整周
week_buckets = {}
for d in range((end - start).days + 1):
day = start + timedelta(days=d)
monday = day - timedelta(days=day.weekday()) # 本周一
week_buckets.setdefault(monday, []).append(day.strftime(fmt))
# 大 list从远到近最早一周在最前面
result = [week_buckets[k] for k in sorted(week_buckets, reverse=False)]
return result
def count_lsr(turn_str: str):
"""
按车道顺序统计转向组合种类及数量,跳过'-',其余均保留
返回: (中文组合列表, 数量列表) 两个字符串
"""
if not turn_str.strip():
return '-', '0'
# 代码 -> 中文
g_turn2cn = {
'1': '直行', '2': '左转', '3': '右转', '4': '直左', '5': '直右',
'6': '调头', '7': '直调', '8': '左调', '9': '直左',
'10': '左直调', '11': '-', '12': '直左右', '13': '右转调头',
'14': '左右调', '15': '直右调', '16': '左直右调', '17': '公交车道',
'19': '可变车道'
}
seen = {} # 顺序去重计数
for key in turn_str.split('|'):
if key == '11': # 只跳过 '-'
continue
if key in g_turn2cn:
name = g_turn2cn[key]
seen.setdefault(name, 0)
seen[name] += 1
if not seen:
return '-', '0'
name_lst = list(seen.keys())
count_lst = list(seen.values())
return '/'.join(name_lst), '/'.join(map(str, count_lst))
def time_overlap(a: str, b: List[str]) -> bool:
def to_minutes(t: str) -> int:
h, m = map(int, t.split(':'))
return h * 60 + m
a_start, a_end = map(to_minutes, a.split('-'))
for interval in b:
b_start, b_end = map(to_minutes, interval.split('-'))
# 检查是否有重叠
if not (a_end <= b_start or a_start >= b_end):
return True
return False
def parse_week_str(s: str):
start, end = [datetime.strptime(d, "%Y%m%d") for d in s.split('-')]
date_list = [(start + timedelta(days=i)).strftime("%Y%m%d")
for i in range((end - start).days + 1)]
return date_list
def sort_dict_by_clockwise(input_dict: Dict[str, any]) -> dict:
"""
将字典按照顺时针方向(从北开始)排序键的顺序。
参数:
input_dict: 键为方向字母的字典。
返回:
按照顺时针顺序排列的有序字典。
"""
clockwise_order = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
return {key: input_dict[key] for key in clockwise_order if key in input_dict}
def gen_ten_weeks_ago_data_list_with_weekdays(date_type='week'):
weekdays = '1,2,3,4,5,6,7'
if date_type == 'workday':
weekdays = '1,2,3,4,5'
if date_type == 'weekend':
weekdays = '6,7'
fmt = "%Y%m%d"
end = datetime.now() - timedelta(days=1) # 昨天
start = end - timedelta(days=69) # 70 天跨度 = 10 个完整周
week_buckets = {}
for d in range((end - start).days + 1):
day = start + timedelta(days=d)
monday = day - timedelta(days=day.weekday()) # 本周一
if str(day.weekday() + 1) not in weekdays:
continue
week_buckets.setdefault(monday, []).append(day.strftime(fmt))
# 大 list从远到近最早一周在最前面
result = [week_buckets[k] for k in sorted(week_buckets, reverse=False)]
return result
def clean_dict_nan(obj, default_value=None):
"""
递归清理 dict/list 中的 NaN/Infinity
Args:
obj: 任意 Python 对象
default_value: NaN 的替换值None 会转为 JSON null
"""
if isinstance(obj, dict):
return {k: clean_dict_nan(v, default_value) for k, v in obj.items()}
if isinstance(obj, list):
return [clean_dict_nan(item, default_value) for item in obj]
if isinstance(obj, float) and (math.isnan(obj) or math.isinf(obj)):
return default_value
return obj
def has_duplicate_dates(range1_start, range1_end, range2_start, range2_end):
"""
判定两个日期范围是否存在重复的日期
参数:
range1_start: 第一个日期范围的开始日期(格式 YYYYMMDD
range1_end: 第一个日期范围的结束日期(格式 YYYYMMDD
range2_start: 第二个日期范围的开始日期(格式 YYYYMMDD
range2_end: 第二个日期范围的结束日期(格式 YYYYMMDD
返回:
tuple: (重复的日期列表, 是否重复)
"""
if range1_end == '-':
range1_end = datetime.now().strftime("%Y%m%d")
try:
# 生成两个日期范围内的所有日期
dates1 = generate_date_range(range1_start, range1_end)
dates2 = generate_date_range(range2_start, range2_end)
# 找出两个集合的交集(重复的日期)
duplicate_dates = list(set(dates1) & set(dates2))
# 返回结果
return len(duplicate_dates) > 0
except ValueError as e:
print(f"日期格式错误: {e}")
return False