cross_doctor/app/common_worker.py

306 lines
12 KiB
Python
Raw Normal View History

2025-10-10 14:38:22 +08:00
# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 11:22
# @Description: 初始化加载逻辑及公共属性赋值位置
import argparse
import configparser
import logging
import os
import sys
from datetime import datetime
import yaml
from app.comm import comparable_rank
from app.global_source import *
def load_roadnet_by_city(citycode: int):
logging.info('start load roadnet for %d' % citycode)
net_filename = 'data/xl_roadnet_%d.txt' % citycode
roadinfo_filename = 'data/xl_roadinfo_%d.txt' % citycode
# roadlinks_filename = 'data/xl_roadlinks_%d.txt' % citycode
g_roadnet.load_from_file(net_filename, citycode)
g_roadinfo_manager.load_from_file(roadinfo_filename)
# g_roadlinks_manager.load_from_file(roadlinks_filename)
logging.info('finish load roadnet for %d' % citycode)
def reload_base_info(citycode: int):
global last_reload_time_info
crosses_list, roads_list = db_tmnet.reload_cross_info(citycode, last_reload_time_info[citycode])
for cross_info in crosses_list:
crossid = cross_info['crossid']
if crossid:
g_roadnet.update_cross_info(crossid, cross_info)
if cross_info['isdeleted'] == 1:
del g_roadnet.cross_map[crossid]
for road_info in roads_list:
roadid = road_info['roadid']
if roadid:
g_roadnet.update_road_net(roadid, road_info)
g_roadinfo_manager.update_road_info(roadid, road_info)
last_reload_time_info[citycode] = datetime.now()
def pick_citycode(filename: str):
tail_part = filename.split('_')[-1]
citycode = int(tail_part.split('.')[0])
return citycode
def list_all_citycode(path: str):
"""
找出所有的citycode
"""
citycode_set = set()
files = os.listdir(path)
for file in files:
if not file.endswith(".txt"):
continue
if not file.startswith("xl_roadnet_"):
continue
citycode = pick_citycode(file)
citycode_set.add(citycode)
return citycode_set
def load_all_roadnet():
global g_citycode_set
need_filter_by_city = len(g_citycode_set) > 0
if need_filter_by_city:
# 按照citycode的列表加载各个城市的roadnet
for citycode in g_citycode_set:
last_reload_time_info[citycode] = datetime.strptime('1970-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
load_roadnet_by_city(citycode)
reload_base_info(citycode)
else:
# 加载data目录下的所有城市的roadnet
citycode_set = list_all_citycode('./data')
for citycode in citycode_set:
load_roadnet_by_city(citycode)
global g_roadnet, g_roadinfo_manager
g_roadnet.do_cross_classify(g_roadinfo_manager)
g_roadnet.collect_midroad_crosses()
# 设置辅路属性
set_all_side_inroadid()
def set_all_side_inroadid():
sup_roadid_set = set()
main_side_pair_list = g_roadnet.query_main_side_road_pair()
for pair in main_side_pair_list:
r1 = pair[0]
r2 = pair[1]
r1_info = g_roadinfo_manager.query_roadinfo(r1)
r1_name = g_roadnet.query_road(r1).name
r2_info = g_roadinfo_manager.query_roadinfo(r2)
r1_rc, r2_rc = additional_sup_road_comparable_rank(r1_info.roadclass, r2_info.roadclass)
r2_name = g_roadnet.query_road(r2).name
if r1_name.__contains__('辅路'):
sup_roadid_set.add(r1)
elif r2_name.__contains__('辅路'):
sup_roadid_set.add(r2)
else:
if r1_rc > r2_rc:
sup_roadid_set.add(r1)
elif r1_rc < r2_rc:
sup_roadid_set.add(r2)
elif r1_rc == r2_rc:
if r1_info.lane_num < r2_info.lane_num:
sup_roadid_set.add(r1)
elif r1_info.lane_num > r2_info.lane_num:
sup_roadid_set.add(r2)
else:
if r1_info.ffs < r2_info.ffs:
sup_roadid_set.add(r1)
elif r1_info.ffs > r2_info.ffs:
sup_roadid_set.add(r2)
else:
pass
# got sup_road_list
side_roadid_list = list(sup_roadid_set)
for rid in side_roadid_list:
g_roadnet.query_road(rid).sideway = True
return side_roadid_list
def additional_sup_road_comparable_rank(r1_roadclass, r2_roadclass):
suburb_rc = [1, 2, 3, 4, 5] # 城市外道路等级分区
city_rc = [6, 7, 8, 9, 10] # 城市内道路等级分区
# 如果既不在城市内分区,又不在城市外分区,说明该道路等级为未知,不做该判断
if (r1_roadclass not in suburb_rc and r1_roadclass not in city_rc) or (r2_roadclass not in suburb_rc and r2_roadclass not in city_rc):
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
# 如果待比较的两个道路等级为同区内的 则进行常规辅路判定逻辑
if not ((r1_roadclass in suburb_rc and r2_roadclass in suburb_rc) or (r1_roadclass in city_rc and r2_roadclass in city_rc)):
# 如果r1是城市外道路r2为城市内道路
if r1_roadclass in suburb_rc and r2_roadclass in city_rc:
# 如果r1不属于县道及其他低等级道路 则认为r1的道路等级更高
if r1_roadclass not in [4, 5]:
return 1, 2
# 县道大于城市支路或郊区小路 其他低等级道路 大于郊区小路
elif (r1_roadclass == 4 and r2_roadclass in [9, 10]) or (r1_roadclass == 5 and r2_roadclass == 10):
return 1, 2
else:
return 2, 1
else:
if r1_roadclass in city_rc and r2_roadclass in suburb_rc:
if r2_roadclass not in [4, 5]:
return 2, 1
elif (r2_roadclass == 4 and r1_roadclass in [9, 10]) or (r2_roadclass == 5 and r1_roadclass == 10):
return 2, 1
else:
return 1, 2
return comparable_rank(r1_roadclass), comparable_rank(r2_roadclass)
def init_with_config():
config = configparser.ConfigParser()
config.read('manager.ini', encoding='utf-8')
# 数据库相关配置
if not config.has_section("db"):
logging.error('missing db section in router.ini')
sys.exit(1)
dev = dev_args()
# 根据配置,修改密码和数据库名称
if config.has_option('db', 'host'):
host = config.get('db', 'host')
g_dbinfo['host'] = host
if config.has_option('db', 'user'):
user = config.get('db', 'user')
g_dbinfo['user'] = user
if config.has_option('db', 'password'):
password = config.get('db', 'password')
g_dbinfo['password'] = password
if config.has_option('db', 'dbname'):
dbname = config.get('db', 'dbname')
g_dbinfo['db'] = dbname
if config.has_option('roadnet_db', 'host'):
host = config.get('roadnet_db', 'host')
g_roadnet_db['host'] = host
if config.has_option('roadnet_db', 'user'):
user = config.get('roadnet_db', 'user')
g_roadnet_db['user'] = user
if config.has_option('roadnet_db', 'password'):
password = config.get('roadnet_db', 'password')
g_roadnet_db['password'] = password
if config.has_option('roadnet_db', 'dbname'):
dbname = config.get('roadnet_db', 'dbname')
g_roadnet_db['db'] = dbname
if config.has_option('cloud_db', 'host'):
host = config.get('cloud_db', 'host')
g_cloud_db['host'] = host
if config.has_option('cloud_db', 'user'):
user = config.get('cloud_db', 'user')
g_cloud_db['user'] = user
if config.has_option('cloud_db', 'password'):
password = config.get('cloud_db', 'password')
g_cloud_db['password'] = password
if config.has_option('cloud_db', 'dbname'):
dbname = config.get('cloud_db', 'dbname')
g_cloud_db['db'] = dbname
# redis
if config.has_section('redis'):
if config.has_option('redis', 'ip'):
g_redisinfo['ip'] = config.get('redis', 'ip')
if config.has_option('redis', 'port'):
g_redisinfo['port'] = int(config.get('redis', 'port'))
if config.has_option('redis', 'password'):
g_redisinfo['password'] = config.get('redis', 'password')
if config.has_option('redis', 'db'):
g_redisinfo['db'] = config.get('redis', 'db')
# rpc
if config.has_section('rpc'):
g_config['rpc_host'] = '172.21.32.21'
g_config['rpc_port'] = '50051'
if config.has_option('rpc', 'host'):
g_config['rpc_host'] = config.get('rpc', 'host')
if config.has_option('rpc', 'port'):
g_config['rpc_port'] = int(config.get('rpc', 'port'))
# 重新初始化 延误数据库连接
if dev == 0:
if config.has_option('db', 'host_inner'):
g_dbinfo['host'] = config.get('db', 'host_inner')
if config.has_option('roadnet_db', 'host_inner'):
g_roadnet_db['host'] = config.get('roadnet_db', 'host_inner')
if config.has_option('cloud_db', 'host_inner'):
g_cloud_db['host'] = config.get('cloud_db', 'host_inner')
if config.has_option('cloud_db', 'port_inner'):
g_cloud_db['port'] = int(config.get('cloud_db', 'port_inner'))
if config.has_option('redis', 'ip_inner'):
g_redisinfo['ip'] = config.get('redis', 'ip_inner')
if config.has_option('rpc', 'host_inner'):
g_config['rpc_host'] = config.get('rpc', 'host_inner')
print(g_dbinfo)
print(g_roadnet_db)
print(g_cloud_db)
g_db_pool.init_pool(g_dbinfo)
g_roadnet_pool.init_pool(g_roadnet_db)
g_cloud_pool.init_pool(g_cloud_db)
# 本机服务配置
if config.has_section('server'):
if config.has_option('server', 'host'):
g_config['host'] = config.get('server', 'host')
else:
g_config['host'] = '0.0.0.0'
if config.has_option('server', 'port'):
g_config['port'] = int(config.get('server', 'port'))
else:
g_config['port'] = 6666
logging.info(g_config)
# 路网相关配置
if not config.has_section("roadnet"):
logging.error('missing roadnet section in router.ini')
sys.exit(1)
if config.has_option('roadnet', 'citylist'):
citylist_str = config.get('roadnet', 'citylist')
# cityinfo_dict = ast.literal_eval(citylist_str)
# g_config['cityinfo_dict'] = cityinfo_dict
# print(cityinfo_dict)
ss = citylist_str.split(',')
for item in ss:
citycode = int(item)
g_citycode_set.add(citycode)
# 加载所有城市的路网
load_all_roadnet()
cityinfo_dict = read_city_info('city_info.yaml')
print(cityinfo_dict)
g_config['cityinfo_dict'] = cityinfo_dict
# 20250422 新增业务主管独有功能相关内容
if config.has_section('executive'):
if config.has_option('executive', 'userid'):
g_config['executives'] = config.get('executive', 'userid').split(',')
# 20250514 探索分词索引查询路口名称
# full_crossname_inverted_index()
# gen_cross_info_index(cityinfo_dict)
# print('finished load config file, server started')
def dev_args():
try:
parser = argparse.ArgumentParser()
parser.add_argument('--dev', default=0, help='开发环境参数0否1是')
args = parser.parse_args()
return args.dev
except SystemExit:
pass
return 0
def read_city_info(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
return config['city_list']
def check_param(params, key):
value = None
if key in params:
value = params[key]
return value