1952 lines
72 KiB
Python
1952 lines
72 KiB
Python
from typing import Dict, Any, List
|
||
|
||
import app.console as console
|
||
from app.GeoFunc import *
|
||
from app.comm import *
|
||
from matplotlib.path import Path
|
||
|
||
# 车信箭头标记到转向集合的映射关系
|
||
g_turn2str = {'1': 's', '2': 'l', '3': 'r', '4': 'ls', '5': 'sr', '6': 't', '7': 'st', '8': 'lt', '9': 'lr',
|
||
'10': 'lst', '11': '-', '12': 'lsr', '13': 'rt', '14': 'lrt', '15': 'srt', '16': 'lsrt', '17': 'bus', '19': 'reversible'}
|
||
|
||
g_turnflag2type = {'s': 0, 'l': 1, 'r': 2, 't': 3}
|
||
|
||
|
||
# 转向类型到车信箭头标记的映射关系
|
||
g_turnset_of_turn_type = [{1,4,5,7,10,12,15,16}, {2,4,8,9,10,12,14,16},
|
||
{3,5,6,7,8,10,13,14,15,16}, {6,7,8,10,13,15,16}]
|
||
|
||
|
||
def turninfo2str(turninfo):
|
||
global g_turn2str
|
||
res = []
|
||
if turninfo == '':
|
||
res.append(g_turn2str['1'])
|
||
else:
|
||
ss = turninfo.split('|')
|
||
for turn in ss:
|
||
if turn in g_turn2str.keys():
|
||
res.append(g_turn2str[turn])
|
||
else:
|
||
turn = '1'
|
||
res.append(g_turn2str[turn])
|
||
return res
|
||
|
||
|
||
def turnflag_to_turn_type(token):
|
||
global g_turnflag2type
|
||
if token in g_turnflag2type:
|
||
return g_turnflag2type[token]
|
||
else:
|
||
return 0
|
||
|
||
|
||
def get_comparable_rc(rc):
|
||
"""
|
||
返回取值1~5的有可比性的道路等级
|
||
:param rc:
|
||
:return:
|
||
"""
|
||
return rc if rc <= 5 else (rc - 5)
|
||
|
||
|
||
class MapBound:
|
||
"""
|
||
地图矩形范围
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.min_lon = 0
|
||
self.min_lat = 0
|
||
self.max_lon = 0
|
||
self.max_lat = 0
|
||
|
||
def in_bound(self, lon: float, lat: float):
|
||
"""
|
||
判断指定坐标是否落入bound内
|
||
"""
|
||
return self.min_lon < lon < self.max_lon and self.min_lat < lat < self.max_lat
|
||
|
||
@classmethod
|
||
def parse_from_str(cls, bound_str):
|
||
bound = MapBound()
|
||
try:
|
||
lonlat_str_list = bound_str.split(',')
|
||
bound.min_lon = float(lonlat_str_list[0])
|
||
bound.min_lat = float(lonlat_str_list[1])
|
||
bound.max_lon = float(lonlat_str_list[2])
|
||
bound.max_lat = float(lonlat_str_list[3])
|
||
return bound
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
# 定义经纬度点
|
||
class Pt:
|
||
def __init__(self, lng, lat):
|
||
self.lng = lng
|
||
self.lat = lat
|
||
|
||
def copy(self):
|
||
return Pt(self.lng, self.lat)
|
||
|
||
def toStr(self):
|
||
return "%.6f,%.6f" % (self.lng, self.lat)
|
||
|
||
def dist(self, other):
|
||
d = math.fabs(self.lng - other.lng) + math.fabs(self.lat - other.lat)
|
||
# console.log(
|
||
# '' + str(self.lng) + ' ' + str(other.lng) + ' ' + str(self.lat) + ' ' + str(other.lat) + ' = ' + str(d))
|
||
return d
|
||
|
||
|
||
def sqrt_dist(self, other):
|
||
d = math.sqrt((self.lng - other.lng)*(self.lng - other.lng) + (self.lat - other.lat)*(self.lat - other.lat))
|
||
return d
|
||
|
||
|
||
CROSS_TYPE_NORMAL = 1
|
||
CROSS_TYPE_LIGHT = 2
|
||
|
||
# // 定义路口类
|
||
class Cross:
|
||
def __init__(self, lng, lat):
|
||
self.id = self.coord2id(lng, lat)
|
||
self.at_edge = False
|
||
# type 0:普通路口; 1:信控路口
|
||
self.type = 1
|
||
self.location = Pt(lng, lat)
|
||
self.name = 'unknown'
|
||
self.in_roadids = []
|
||
self.out_roadids = []
|
||
# 20250922 新增辖区id 辖区名称 信息
|
||
self.area_id = 0
|
||
self.area_name = ''
|
||
|
||
def set_as_edge(self):
|
||
self.at_edge = True
|
||
|
||
def judge_if_edge(self):
|
||
if len(self.in_roadids) <= 1 and len(self.out_roadids) <= 1:
|
||
self.at_edge = True
|
||
|
||
@staticmethod
|
||
def coord2id(lng, lat):
|
||
LNG = round(lng * 100000)
|
||
LAT = round(lat * 100000)
|
||
id = 'CR_' + str(LNG) + '_' + str(LAT)
|
||
return id
|
||
|
||
def add_inroad(self, roadid):
|
||
self.in_roadids.append(roadid)
|
||
|
||
def add_outroad(self, roadid):
|
||
self.out_roadids.append(roadid)
|
||
|
||
def get_location(self):
|
||
return str(self.location.lng) + ',' + str(self.location.lat)
|
||
|
||
def has_light(self) -> bool:
|
||
"""是否为信控路口"""
|
||
return self.type == 1
|
||
|
||
def to_dict(self):
|
||
info = {}
|
||
info['crossid'] = self.id
|
||
info['location'] = self.get_location()
|
||
info['name'] = self.name
|
||
info['type'] = self.type
|
||
return info
|
||
|
||
def toString(self):
|
||
return self.id + " coord:" + str(self.location.lng) + ',' + str(self.location.lat) + ' inroad:' + str(
|
||
len(self.in_roadid)) + ' outroad:' + str(len(self.out_roadids))
|
||
|
||
def serialize_as_txt(self):
|
||
edge_flag = 1 if self.at_edge else 0
|
||
return self.id + ' ' + str(self.location.lng) + ',' + str(self.location.lat) + ' ' + self.name + ' ' + edge_flag
|
||
|
||
@classmethod
|
||
def deserialize_from_txt(cls, line):
|
||
ss = line.split(' ')
|
||
xys = ss[1].split(',')
|
||
lng = float(xys[0])
|
||
lat = float(xys[1])
|
||
cross = Cross(lng, lat)
|
||
cross.id = ss[0]
|
||
if len(ss) >= 3:
|
||
cross.name = ss[2]
|
||
if len(ss) >= 4:
|
||
flag = int(ss[3])
|
||
cross.at_edge = True if flag == 1 else False
|
||
if len(ss) >= 5:
|
||
cross.type = int(ss[4])
|
||
if cross.type < 1 or cross.type > 2:
|
||
console.log('wrong type %s for cross %s' % (ss[4], cross.id))
|
||
cross.type = 2
|
||
else:
|
||
# 如果不是边缘路口,则默认视作灯控路口
|
||
if not cross.at_edge:
|
||
cross.type = 2
|
||
return cross
|
||
|
||
|
||
class Road:
|
||
def __init__(self, crossid_from, crossid_to):
|
||
self.id = self.make_roadid(crossid_from, crossid_to)
|
||
self.crossid_from = crossid_from
|
||
self.crossid_to = crossid_to
|
||
self.out_flows = []
|
||
self.name = 'unknown'
|
||
self.path = []
|
||
self.srcDir = '' # 车流来向
|
||
|
||
def make_roadid(self, crossid_from, crossid_to):
|
||
return 'RD_' + crossid_from + '_' + crossid_to
|
||
|
||
def set_name(self, name):
|
||
self.name = name
|
||
|
||
|
||
def is_sideway(self):
|
||
"""该路段是否为辅路"""
|
||
return self.name[-2:] == '辅路'
|
||
|
||
def set_path(self, geo):
|
||
# if self.crossid_to == '6ee88a42611bd4':
|
||
# print('debug')
|
||
xys = geo.split(',')
|
||
pt_num = len(xys) // 2
|
||
for i in range(0, pt_num):
|
||
self.path.append(Pt(float(xys[2 * i]), float(xys[2 * i + 1])))
|
||
self.srcDir = GeoFunc.calc_srcDir_tail(self.path)
|
||
# self.srcDir = GeoFunc.calc_srcDir(MyPoint.makeOne(self.path[0]), MyPoint.makeOne(self.path[len(self.path) - 1]))
|
||
|
||
def get_headfrom_srcDir(self):
|
||
"""
|
||
获取路段起始段的车流来向
|
||
:return:
|
||
"""
|
||
return GeoFunc.calc_srcDir_head(self.path)
|
||
|
||
def get_angle_from_head(self):
|
||
return GeoFunc.calc_angle_head(self.path)
|
||
|
||
def get_angle_from_tail(self):
|
||
return GeoFunc.calc_angle_tail(self.path)
|
||
|
||
def toPt(self, point: MyPoint):
|
||
pt = Pt()
|
||
pt.lng = point.X
|
||
pt.lat = point.Y
|
||
return pt
|
||
|
||
def get_tail_up_pt(self):
|
||
pt_list = self.path
|
||
num = len(pt_list)
|
||
pB = MyPoint.makeOne(pt_list[num - 1])
|
||
cur_pA_index = num - 2
|
||
while True:
|
||
pA = MyPoint.makeOne(pt_list[cur_pA_index])
|
||
if cur_pA_index <= 0 or pA.get_dist(pB) >= 0.0005:
|
||
ppt = MyPoint.makeOne(pt_list[cur_pA_index + 1])
|
||
left_dist = 0.0005 - ppt.get_dist(pB)
|
||
mid_pt = ppt.calc_left_pt(pA, left_dist)
|
||
# print('left_dist %f. dist %f. %.6f,%.6f' % (left_dist, pB.get_dist(mid_pt), mid_pt.X, mid_pt.Y))
|
||
return Pt(mid_pt.X, mid_pt.Y)
|
||
# return pt_list[cur_pA_index]
|
||
cur_pA_index -= 1
|
||
return None
|
||
|
||
def get_head_down_pt(self):
|
||
pt_list = self.path
|
||
num = len(pt_list)
|
||
pA = MyPoint.makeOne(pt_list[0])
|
||
cur_pB_index = 1
|
||
while True:
|
||
pB = MyPoint.makeOne(pt_list[cur_pB_index])
|
||
if cur_pB_index >= num - 1 or pA.get_dist(pB) >= 0.0005:
|
||
ppt = MyPoint.makeOne(pt_list[cur_pB_index-1])
|
||
left_dist = 0.0005 - pA.get_dist(ppt)
|
||
mid_pt = ppt.calc_left_pt(pB, left_dist)
|
||
# print('left_dist %f. dist %f. %.6f,%.6f' % (left_dist, pA.get_dist(mid_pt), mid_pt.X, mid_pt.Y))
|
||
return Pt(mid_pt.X, mid_pt.Y)
|
||
# return pt_list[cur_pB_index]
|
||
cur_pB_index += 1
|
||
return None
|
||
|
||
def get_geo(self):
|
||
ss = []
|
||
for pt in self.path:
|
||
ss.append(str(pt.lng))
|
||
ss.append(str(pt.lat))
|
||
return ','.join(ss)
|
||
|
||
def fill_coords(self, cross_map):
|
||
cross_from = cross_map.get(self.crossid_from)
|
||
cross_to = cross_map.get(self.crossid_to)
|
||
self.path.append(cross_from.location.copy())
|
||
self.path.append(cross_to.location.copy())
|
||
self.srcDir = GeoFunc.calc_srcDir(MyPoint.makeOne(cross_from.location), MyPoint.makeOne(cross_to.location))
|
||
|
||
def add_outflow(self, flow):
|
||
self.out_flows.append(flow)
|
||
|
||
def toString(self):
|
||
return self.crossid_from + ' => ' + self.crossid_to + ' name:' + self.name
|
||
|
||
def serialize_as_txt(self):
|
||
return self.id + ' ' + self.crossid_from + ' ' + self.crossid_to + ' ' + self.name
|
||
|
||
@classmethod
|
||
def deserialize_from_txt(cls, line):
|
||
ss = line.split(' ')
|
||
id = ss[0]
|
||
crossid_from = ss[1]
|
||
crossid_to = ss[2]
|
||
road = Road(crossid_from, crossid_to)
|
||
road.id = id
|
||
road.set_name(ss[3])
|
||
if (len(ss) >= 5):
|
||
geo = ss[4]
|
||
road.set_path(geo)
|
||
return road
|
||
|
||
def get_direct_angle(self):
|
||
pt0: Pt = self.path[0]
|
||
pt1: Pt = self.path[-1]
|
||
pA = MyPoint(pt0.lng, pt0.lat)
|
||
pB = MyPoint(pt1.lng, pt1.lat)
|
||
return GeoFunc.calc_pole_angle(pA, pB)
|
||
|
||
|
||
# 定义流向类
|
||
class Flow:
|
||
def __init__(self, crossid, roadid_in, roadid_out):
|
||
self.crossid = crossid
|
||
self.roadid_in = roadid_in
|
||
self.roadid_out = roadid_out
|
||
self.turn_type = -1
|
||
|
||
def getid(self):
|
||
return hash('FL_' + self.roadid_in + '_' + str(self.turn_type) + self.roadid_out)
|
||
# return 'FL_' + self.roadid_in + '_' + str(self.turn_type)
|
||
|
||
def toString(self):
|
||
return self.crossid + ' ' + self.roadid_in + ' ' + self.roadid_out + ' ' + str(self.turn_type)
|
||
|
||
def serialize_as_txt(self):
|
||
return self.toString()
|
||
|
||
@classmethod
|
||
def deserialize_from_txt(cls, line):
|
||
ss = line.split(' ')
|
||
flow = Flow(ss[0], ss[1], ss[2])
|
||
flow.turn_type = int(ss[3])
|
||
return flow
|
||
|
||
|
||
#
|
||
class CrossTopo:
|
||
"""
|
||
路口拓扑,包含各个进口道方向[取值为标准八方向]所对应的转向[取值为0,1,2,3]数组
|
||
"""
|
||
|
||
def __init__(self):
|
||
# 存储结构:srcDir => [0/1/2/3]数组,数组内每个元素表示1个流向
|
||
self.srcDir_flows = {}
|
||
|
||
# 添加一个流向
|
||
def add_flow_delay(self, srcDir, turn_type):
|
||
data = None
|
||
if srcDir in self.srcDir_flows:
|
||
data = self.srcDir_flows[srcDir]
|
||
else:
|
||
data = []
|
||
if turn_type not in data:
|
||
data.append(turn_type)
|
||
self.srcDir_flows[srcDir] = data
|
||
|
||
def get_srcDir_list(self) -> [str]:
|
||
return list(self.srcDir_flows.keys())
|
||
|
||
def get_srcDir_pairs(self) -> (str, str):
|
||
pair_list = []
|
||
srcDir_set = set(self.srcDir_flows.keys())
|
||
if 'N' in srcDir_set and 'S' in srcDir_set:
|
||
pair_list.append(['N', 'S'])
|
||
if 'E' in srcDir_set and 'W' in srcDir_set:
|
||
pair_list.append(['E', 'W'])
|
||
if 'NW' in srcDir_set and 'SE' in srcDir_set:
|
||
pair_list.append(['NW', 'SE'])
|
||
if 'NE' in srcDir_set and 'SW' in srcDir_set:
|
||
pair_list.append(['NE', 'SW'])
|
||
if len(pair_list) == 2:
|
||
# 仅当存在两组对向时,才返回结果
|
||
return pair_list[0], pair_list[1]
|
||
else:
|
||
return False, False
|
||
|
||
|
||
# 定义交管路网类
|
||
class RoadNet:
|
||
def __init__(self, nodeid):
|
||
self.nodeid = nodeid
|
||
# 路口数据
|
||
self.cross_map = {}
|
||
self.road_map = {}
|
||
self.flow_map = {}
|
||
self.bound_info = {}
|
||
self.is_for_greenwave = False
|
||
# 路口的rank信息
|
||
self.cross_rank = {}
|
||
# 路口到所属citycode的映射关系
|
||
self.cross2city = {}
|
||
# 疑似过街灯的信号路口集合
|
||
self.midroad_cross_idset = set()
|
||
|
||
def collect_midroad_crosses(self):
|
||
"""
|
||
识别路网中疑似在道路中间的不是交叉口的信控cross, 收集到self.midroad_cross_idset中
|
||
:return:
|
||
"""
|
||
for cross in self.cross_map.values():
|
||
if cross.type == CROSS_TYPE_LIGHT and len(cross.in_roadids) == 2:
|
||
name_set = set()
|
||
for rid in cross.in_roadids:
|
||
road = self.query_road(rid)
|
||
name_set.add(road.name)
|
||
if len(name_set) == 1:
|
||
self.midroad_cross_idset.add(cross.id)
|
||
print('collect midroad_cross num: %d' % len(self.midroad_cross_idset))
|
||
|
||
def is_midroad_cross(self, crossid: str):
|
||
"""
|
||
判断是不是道路中间的可跨越的信控路口
|
||
:param crossid:
|
||
:return:
|
||
"""
|
||
return crossid in self.midroad_cross_idset
|
||
|
||
def set_for_greenwave(self):
|
||
self.is_for_greenwave = True
|
||
|
||
def add_cross(self, cross):
|
||
if self.bound_info and len(self.bound_info) > 0:
|
||
bound_info = self.bound_info
|
||
for item in bound_info:
|
||
area_id = item['area_id']
|
||
area_name = item['area_name']
|
||
cross.area_id = area_id
|
||
cross.area_name = area_name
|
||
bound = item['bounds']
|
||
if check_point_in_bounds(bound, cross.location):
|
||
self.cross_map[cross.id] = cross
|
||
else:
|
||
continue
|
||
elif not self.bound_info:
|
||
self.cross_map[cross.id] = cross
|
||
|
||
def add_road(self, road):
|
||
if road.crossid_from == road.crossid_to:
|
||
return False
|
||
if road.crossid_from not in self.cross_map.keys() and road.crossid_to not in self.cross_map.keys():
|
||
return False
|
||
self.road_map[road.id] = road
|
||
if (len(road.path) < 2):
|
||
road.fill_coords(self.cross_map)
|
||
# 更新路口的进出口道路
|
||
cross_from = self.cross_map.get(road.crossid_from)
|
||
if cross_from:
|
||
cross_from.add_outroad(road.id)
|
||
cross_to = self.cross_map.get(road.crossid_to)
|
||
if cross_to:
|
||
cross_to.add_inroad(road.id)
|
||
return True
|
||
|
||
def add_bidir_road(self, road):
|
||
reversed_road = Road(road.crossid_to, road.crossid_from)
|
||
self.add_road(road)
|
||
self.add_road(reversed_road)
|
||
|
||
def add_flow(self, flow):
|
||
self.flow_map[flow.getid()] = flow
|
||
inroad = self.road_map.get(flow.roadid_in)
|
||
if inroad:
|
||
inroad.add_outflow(flow.getid())
|
||
|
||
# 自动生成所有的flow
|
||
def fill_allflows(self):
|
||
self.flow_map.clear()
|
||
for road in self.road_map.values():
|
||
road.out_flows = []
|
||
|
||
add_num = 0
|
||
# 针对每个路口的每一组in-out路段,都生成一个flow
|
||
for (id, cross) in self.cross_map.items():
|
||
in_roads = cross.in_roadids
|
||
out_roads = cross.out_roadids
|
||
# 遍历进口道
|
||
for in_roadid in in_roads:
|
||
# 遍历出口道
|
||
for out_roadid in out_roads:
|
||
flow = Flow(id, in_roadid, out_roadid)
|
||
flow.turn_type = self.calc_turn_type(in_roadid, out_roadid)
|
||
self.add_flow(flow)
|
||
add_num += 1
|
||
print("add flow %d, final get flow %d" % (add_num, len(self.flow_map)))
|
||
|
||
def query_cross(self, crossid) -> Cross:
|
||
return self.cross_map.get(crossid)
|
||
|
||
def query_road(self, roadid) -> Road:
|
||
return self.road_map.get(roadid)
|
||
|
||
def query_road_2(self, crossid, srcDir) -> Road:
|
||
"""查询路口指定来车方向的进口道"""
|
||
inroad_ids = self.query_cross(crossid).in_roadids
|
||
for rid in inroad_ids:
|
||
road = self.query_road(rid)
|
||
if road.srcDir == srcDir:
|
||
return road
|
||
return None
|
||
|
||
def query_next_roads(self, roadid, srcDir) -> [str]:
|
||
"""查询下游路段"""
|
||
res = []
|
||
road = self.query_road(roadid)
|
||
next_roadids = self.query_cross(road.crossid_to).out_roadids
|
||
for rid in next_roadids:
|
||
next_road = self.query_road(rid)
|
||
if next_road.srcDir == srcDir:
|
||
res.append(rid)
|
||
return res
|
||
|
||
def query_next_staight_roads(self, roadid, angle_limit = 60):
|
||
"""
|
||
查找路段的下游近似直行的road
|
||
@param roadid: 输入roadid
|
||
@param angle_limit: 转角限制,默认60度
|
||
@return: [roadid]
|
||
"""
|
||
res = []
|
||
this_road = self.query_road(roadid)
|
||
tmp_cross = self.query_cross(this_road.crossid_to)
|
||
direct_angle = this_road.get_direct_angle()
|
||
for roadid in tmp_cross.out_roadids:
|
||
road = self.query_road(roadid)
|
||
# 找出转角小于60的road
|
||
diff_angle = GeoFunc.diff_pole_angle(direct_angle, road.get_direct_angle())
|
||
if diff_angle <= angle_limit:
|
||
res.append(road.id)
|
||
return res
|
||
|
||
def query_prev_roads(self, roadid, srcDir) -> [str]:
|
||
"""查询上游路段"""
|
||
res = []
|
||
road = self.query_road(roadid)
|
||
prev_roadids: list[str] = self.query_cross(road.crossid_from).in_roadids
|
||
for rid in prev_roadids:
|
||
prev_road = self.query_road(rid)
|
||
if not prev_road:
|
||
print('missing query_prev_roads roadid %s from road %s srcDir %s' % (rid, roadid, srcDir))
|
||
if prev_road.srcDir == srcDir:
|
||
res.append(rid)
|
||
return res
|
||
|
||
def query_main_side_road_pair(self):
|
||
"""
|
||
获取路网内包含的疑似主辅路的roadid的pair
|
||
:return:[(roadid1, roadid2)]
|
||
"""
|
||
pair_list = []
|
||
# 遍历路口
|
||
for cross in self.cross_map.values():
|
||
if cross.at_edge:
|
||
continue
|
||
inroad_num = len(cross.in_roadids)
|
||
for i in range(0, inroad_num):
|
||
for j in range(i+1, inroad_num):
|
||
rid1 = cross.in_roadids[i]
|
||
rid2 = cross.in_roadids[j]
|
||
road1 = self.query_road(rid1)
|
||
road2 = self.query_road(rid2)
|
||
if road1.crossid_from == road2.crossid_from and road1.srcDir == road2.srcDir:
|
||
pair_list.append((rid1, rid2))
|
||
# return pair_list
|
||
return pair_list
|
||
|
||
|
||
def query_flow(self, flowid) -> Flow:
|
||
return self.flow_map.get(flowid)
|
||
|
||
def get_crossid_list(self):
|
||
res = []
|
||
for id in self.cross_map.keys():
|
||
res.append(id)
|
||
return res
|
||
|
||
def get_valid_crossid_list(self) -> []:
|
||
res = []
|
||
for (crossid, cross) in self.cross_map.items():
|
||
if cross.at_edge:
|
||
continue
|
||
else:
|
||
res.append(crossid)
|
||
return res
|
||
|
||
# g_cross_list
|
||
def fill_valid_crossids(self, crossid_list) -> int:
|
||
fill_num = 0
|
||
for (crossid, cross) in self.cross_map.items():
|
||
if cross.at_edge:
|
||
continue
|
||
else:
|
||
crossid_list.append(crossid)
|
||
fill_num += 1
|
||
return fill_num
|
||
|
||
def fill_valid_crossinfo(self, cross_list) -> int:
|
||
fill_num = 0
|
||
for (crossid, cross) in self.cross_map.items():
|
||
if cross.at_edge:
|
||
continue
|
||
# elif len(cross.in_roadids) < 4:
|
||
# continue
|
||
else:
|
||
cross_list.append({'crossid': crossid, 'type': cross.type, 'name': cross.name, 'location': cross.location.toStr()})
|
||
fill_num += 1
|
||
return fill_num
|
||
|
||
def get_crossroads(self) -> []:
|
||
"""
|
||
获取十字路口的信息数组
|
||
:return:
|
||
"""
|
||
cross_list = []
|
||
for (crossid, cross) in self.cross_map.items():
|
||
if cross.at_edge:
|
||
continue
|
||
else:
|
||
# 进一步判断是否为十字路口
|
||
if len(cross.in_roadids) >= 4:
|
||
cross_list.append([crossid, cross.name, cross.location.toStr()])
|
||
return cross_list
|
||
|
||
def fill_crossroads(self, crossid_set) -> []:
|
||
crossid_set.clear()
|
||
for (crossid, cross) in self.cross_map.items():
|
||
if cross.at_edge:
|
||
continue
|
||
else:
|
||
# 进一步判断是否为十字路口
|
||
if len(cross.in_roadids) >= 4:
|
||
crossid_set.add(crossid)
|
||
return crossid_set
|
||
|
||
def get_cross_allflow(self, crossid) -> {}:
|
||
flows = {}
|
||
for (id, flow) in self.flow_map.items():
|
||
if flow.crossid == crossid:
|
||
flows[id] = flow
|
||
return flows
|
||
|
||
def get_cross_topo(self, crossid) -> CrossTopo:
|
||
cross = self.cross_map.get(crossid)
|
||
if not cross:
|
||
return False
|
||
topo = CrossTopo()
|
||
in_roads = cross.in_roadids
|
||
# 遍历进口道
|
||
for in_roadid in in_roads:
|
||
inroad = self.query_road(in_roadid)
|
||
srcDir = inroad.srcDir
|
||
for flowid in inroad.out_flows:
|
||
turn_type = self.query_flow(flowid).turn_type
|
||
topo.add_flow_delay(srcDir, turn_type)
|
||
return topo
|
||
|
||
def get_adjcross(self, crossid):
|
||
cross = self.cross_map.get(crossid)
|
||
adj_cross = {}
|
||
for inroad_id in cross.in_roadids:
|
||
road = self.query_road(inroad_id)
|
||
adj_cross[road.srcDir] = self.query_cross(road.crossid_from)
|
||
return adj_cross
|
||
|
||
def get_adjcross_from(self, crossid, srcDir):
|
||
cross = self.cross_map.get(crossid)
|
||
adj_cross = None
|
||
for inroad_id in cross.in_roadids:
|
||
road = self.query_road(inroad_id)
|
||
if road.srcDir == srcDir:
|
||
adj_cross = self.query_cross(road.crossid_from)
|
||
return adj_cross
|
||
|
||
def search_cross(self, keyword):
|
||
cross_list = []
|
||
for cross in self.cross_map.values():
|
||
if cross.name.find(keyword) >= 0:
|
||
cross_list.append(cross)
|
||
return cross_list
|
||
|
||
# 根据位置查询路口
|
||
def hit_cross(self, lng, lat):
|
||
pos = Pt(lng, lat)
|
||
# 遍历map
|
||
for (id, cross) in self.cross_map.items():
|
||
if (cross.location.dist(pos) < 0.001):
|
||
console.log("hit_cross: " + id)
|
||
return id
|
||
else:
|
||
console.log("hit_cross: falsse")
|
||
|
||
return None
|
||
|
||
def query_cross_all_srcDir(self, crossid):
|
||
srcDir_list = []
|
||
cross = self.cross_map.get(crossid)
|
||
for roadid in cross.in_roadids:
|
||
road = self.road_map.get(roadid)
|
||
srcDir_list.append(road.srcDir)
|
||
|
||
return srcDir_list
|
||
|
||
def toString(self):
|
||
return ('RoadNet:%s cross_num:%d road_num:%d flow:%d'
|
||
% (self.nodeid, len(self.cross_map), len(self.road_map), len(self.flow_map)))
|
||
|
||
def serialize_as_txt(self):
|
||
# 遍历路口
|
||
lines = []
|
||
cr_num = len(self.cross_map)
|
||
road_num = len(self.road_map)
|
||
flow_num = len(self.flow_map)
|
||
headline = "cross %d road %d flow %d" % (cr_num, road_num, flow_num)
|
||
lines.append(headline)
|
||
|
||
# 遍历路口
|
||
for (id, cross) in self.cross_map.items():
|
||
lines.append(cross.serialize_as_txt())
|
||
|
||
# 遍历路段
|
||
for (id, road) in self.road_map.items():
|
||
lines.append(road.serialize_as_txt())
|
||
|
||
# 遍历流向
|
||
for (id, flow) in self.flow_map.items():
|
||
lines.append(flow.serialize_as_txt())
|
||
|
||
# 连接并返回
|
||
return '\n'.join(lines)
|
||
|
||
def reset(self):
|
||
self.cross_map.clear()
|
||
self.road_map.clear()
|
||
self.flow_map.clear()
|
||
|
||
def calc_turn_type(self, in_roadid, out_roadid):
|
||
in_road = self.query_road(in_roadid)
|
||
out_road = self.query_road(out_roadid)
|
||
if None == in_road or None == out_road:
|
||
return None
|
||
# # 简单地,根据入和出路段的首末点,来计算转角
|
||
# pA = MyPoint.makeOne(in_road.path[0])
|
||
# pB = MyPoint.makeOne(in_road.path[1])
|
||
# pC = MyPoint.makeOne(out_road.path[0])
|
||
# pD = MyPoint.makeOne(out_road.path[1])
|
||
# return GeoFunc.calc_turn_type(pA, pB, pC, pD)
|
||
# 改为:基于进口道末端几何 和 出口道开头几何 的 转角计算
|
||
pA, pB = GeoFunc.pick_path_tail(in_road.path)
|
||
pC, pD = GeoFunc.pick_path_head(out_road.path)
|
||
return GeoFunc.calc_turn_type(pA, pB, pC, pD)
|
||
|
||
def complete_roadname(self):
|
||
# 遍历road
|
||
add_roadname_num = 0
|
||
for road in self.road_map.values():
|
||
if road.name is not None and road.name != "unknown":
|
||
continue
|
||
for other in self.road_map.values():
|
||
if other.name is not None and other.name != "unknown":
|
||
if (road.crossid_from == other.crossid_to
|
||
and road.crossid_to == other.crossid_from):
|
||
road.set_name('' + other.name)
|
||
add_roadname_num += 1
|
||
msg = "add roadname num:" + str(add_roadname_num)
|
||
console.log(msg)
|
||
# alert(msg)
|
||
|
||
def generate_crossname(self):
|
||
generate_num = 0
|
||
for cross in self.cross_map.values():
|
||
if cross.at_edge:
|
||
continue
|
||
|
||
if len(cross.in_roadids) < 1 or len(cross.out_roadids) < 1:
|
||
# console.log("bad cross: " + cross.id)
|
||
continue
|
||
|
||
if cross.name is not None and cross.name != "unkown":
|
||
# console.log("had cross name: " + cross.name)
|
||
continue
|
||
|
||
in_roadname = self.query_road(cross.in_roadids[0]).name
|
||
for roadid in cross.out_roadids:
|
||
out_roadname = self.query_road(roadid).name
|
||
if in_roadname != out_roadname:
|
||
cross_name = in_roadname + out_roadname + '交叉口'
|
||
cross.name = cross_name
|
||
generate_num += 1
|
||
break
|
||
|
||
console.log("generate cross name num:" + str(generate_num))
|
||
|
||
def load_from_file(self, filename, citycode: int = 0):
|
||
# 在绿波模式下,需要累积加载多个城市的路网,因此,加载时,不执行重置操作
|
||
if not self.is_for_greenwave:
|
||
self.reset()
|
||
f = open(filename, 'r', encoding='utf-8')
|
||
file_content = f.read()
|
||
f.close()
|
||
return self.load(file_content, citycode)
|
||
|
||
def load_from_fix_road_dir_file(self, filename, citycode: int = 0):
|
||
f = open(filename, 'r', encoding='utf-8')
|
||
file_content = f.read()
|
||
f.close()
|
||
data = file_content.replace('\r', '')
|
||
lines = data.split('\n')
|
||
header_fields = lines[0].split(' ')
|
||
road_num = int(header_fields[0])
|
||
line_offset = 1
|
||
for i in range(0, road_num):
|
||
line = lines[i + line_offset]
|
||
roadid = line.split(' ')[0]
|
||
fix_dir = line.split(' ')[1]
|
||
road = self.query_road(roadid)
|
||
if road:
|
||
road.srcDir = fix_dir
|
||
|
||
|
||
|
||
# 传入的文件内容字符串
|
||
def load(self, data, citycode: int = 0):
|
||
data = data.replace('\r', '')
|
||
lines = data.split('\n')
|
||
header_fields = lines[0].split(' ')
|
||
cross_num = int(header_fields[1])
|
||
road_num = int(header_fields[3])
|
||
flow_num = int(header_fields[5])
|
||
# 读取所有路口
|
||
line_offset = 1
|
||
for i in range(0, cross_num):
|
||
line = lines[i + line_offset]
|
||
cross = Cross.deserialize_from_txt(line)
|
||
# # 忽略非市区的路口
|
||
# if not cross.urban:
|
||
# continue
|
||
self.add_cross(cross)
|
||
if cross.id == 'CR_11343097_2361427':
|
||
print('debug load')
|
||
if citycode > 0:
|
||
self.cross2city[cross.id] = citycode
|
||
# 读取所有进口道
|
||
line_offset += cross_num
|
||
for i in range(0, road_num):
|
||
line = lines[i + line_offset]
|
||
self.add_road(Road.deserialize_from_txt(line))
|
||
# 读取所有流向
|
||
line_offset += road_num
|
||
for i in range(0, flow_num):
|
||
line = lines[i + line_offset]
|
||
# 在绿波模式下,不加载flow数据
|
||
if not self.is_for_greenwave:
|
||
self.add_flow(Flow.deserialize_from_txt(line))
|
||
if not self.is_for_greenwave:
|
||
self.clear_inner_roads()
|
||
|
||
def query_artery_crosses(self, crossid_list, is_reversed):
|
||
"""
|
||
查询干线上的路口、路段列表
|
||
:param crossid_list:
|
||
:return:
|
||
"""
|
||
if is_reversed:
|
||
orig_list = crossid_list
|
||
crossid_list = list(orig_list)
|
||
crossid_list.reverse()
|
||
|
||
cur_cross = self.query_cross(crossid_list[0])
|
||
cross_list = [cur_cross]
|
||
road_list = []
|
||
head_srcDir = None
|
||
for i in range(1, len(crossid_list)):
|
||
next_crossid = crossid_list[i]
|
||
|
||
find_road = None
|
||
for roadid in cur_cross.out_roadids:
|
||
road = self.query_road(roadid)
|
||
if road.crossid_to == next_crossid:
|
||
find_road = road
|
||
break
|
||
if not find_road:
|
||
logging.warning("find road between [%s -> %s] fail" % (cur_cross.id, next_crossid))
|
||
return False, False
|
||
if not head_srcDir:
|
||
# 仅当找到第一个路段时,处理一次
|
||
head_srcDir = find_road.srcDir
|
||
# 从第一个路口的进口道中寻找合适的进口道
|
||
for inroadid in cur_cross.in_roadids:
|
||
road = self.query_road(inroadid)
|
||
if road.srcDir == head_srcDir:
|
||
road_list.append(road)
|
||
break
|
||
if len(road_list) <= 0:
|
||
# 没有找到上游合适的进口道
|
||
road_list.append(None)
|
||
#
|
||
road_list.append(find_road)
|
||
# update
|
||
cur_cross = self.query_cross(next_crossid)
|
||
cross_list.append(cur_cross)
|
||
# 再反转回来
|
||
if is_reversed:
|
||
cross_list.reverse()
|
||
road_list.reverse()
|
||
return cross_list, road_list
|
||
|
||
|
||
def get_road_direct_angle(self, roadid):
|
||
""""""
|
||
road = self.query_road(roadid)
|
||
if not road:
|
||
return -1
|
||
else:
|
||
return road.get_direct_angle()
|
||
|
||
def try_connect_cross(self, crossid_from, crossid_to, ignore_sideway=True) -> dict:
|
||
"""
|
||
尝试获得2个路口之间的连接信息
|
||
:param crossid_from:
|
||
:param crossid_to:
|
||
:param ignore_sideway 是否忽略辅路,默认: 忽略
|
||
:return: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'}
|
||
"""
|
||
# (0) 检查输入参数的有效性
|
||
cross_from = self.query_cross(crossid_from)
|
||
if not cross_from:
|
||
print('missing ' + crossid_from)
|
||
return None
|
||
next_cross = self.query_cross(crossid_to)
|
||
if not next_cross:
|
||
print('missing ' + crossid_to)
|
||
return None
|
||
# (1) 首先,尝试直接连通
|
||
find_road = None
|
||
find_road_list = [] # 存储可能存在的多个road
|
||
for roadid in cross_from.out_roadids:
|
||
road = self.query_road(roadid)
|
||
if ignore_sideway and road.is_sideway():
|
||
continue
|
||
if road.crossid_to == crossid_to:
|
||
find_road_list.append(road)
|
||
# 选择1个road
|
||
if len(find_road_list) == 1:
|
||
find_road = find_road_list[0]
|
||
elif len(find_road_list) > 1:
|
||
# 使用第一个不是辅路的road
|
||
for road in find_road_list:
|
||
if not road.is_sideway():
|
||
find_road = road
|
||
if not find_road:
|
||
# 如果全是辅路,则选择第一个
|
||
find_road = find_road_list[0]
|
||
if find_road:
|
||
res = {'roadid': find_road.id, 'prev_roadids': [], 'mid_crossids': []}
|
||
return res
|
||
# (2)当不能直连时,开始拓展查找模式
|
||
# 计算两个路口之间的角度
|
||
pA = MyPoint(cross_from.location.lng, cross_from.location.lat)
|
||
pB = MyPoint(next_cross.location.lng, next_cross.location.lat)
|
||
direct_angle = GeoFunc.calc_pole_angle(pA, pB)
|
||
# 尝试寻找
|
||
max_skip_cross_num = 5 # 最大允许跨过的非灯路口个数
|
||
find_road = None # 紧挨下游的road
|
||
prev_roadids = [] # 从上游路口到find_road之间的roadid序列
|
||
mid_crossids = [] # 中间跨越的路口ID集合
|
||
tmp_cross = cross_from
|
||
while True:
|
||
min_diff_angle = 360
|
||
cur_direct_road = None
|
||
for roadid in tmp_cross.out_roadids:
|
||
road = self.query_road(roadid)
|
||
# 按需,舍弃辅路的road
|
||
if ignore_sideway and road.is_sideway():
|
||
continue
|
||
if road.crossid_to == crossid_to:
|
||
find_road = road
|
||
break
|
||
# 舍弃连接边缘路口的路段
|
||
tmp_next_cross = self.query_cross(road.crossid_to)
|
||
if not tmp_next_cross or tmp_next_cross.at_edge:
|
||
continue
|
||
else:
|
||
# 找出转角小于60的“最直”的road
|
||
diff_angle = GeoFunc.diff_pole_angle(direct_angle, road.get_direct_angle())
|
||
if diff_angle <= 60 and diff_angle < min_diff_angle:
|
||
cur_direct_road = road
|
||
min_diff_angle = diff_angle
|
||
# 找到了下游路口,直接返回结果
|
||
if find_road:
|
||
res = {'roadid': find_road.id, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids}
|
||
return res
|
||
# 如果没有找到符合要求的“直行”road,则返回错误
|
||
if not cur_direct_road:
|
||
# 没有找到直行的road
|
||
print('break at cross: %s. connect[%s %s]' % (tmp_cross.id, crossid_from, crossid_to))
|
||
res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids,
|
||
'reason': 'no direct'}
|
||
return res
|
||
# 找到了“直行”road,但尚未找到下游路口, 则继续寻找
|
||
# 记录中间的路段ID和路口ID
|
||
prev_roadids.append(cur_direct_road.id)
|
||
mid_crossids.append(cur_direct_road.crossid_to)
|
||
# 检查一下当前“跨越”情形,是否满足[继续寻找]的要求
|
||
tmp_cross = self.query_cross(cur_direct_road.crossid_to)
|
||
# 跨越了信控路口,直接确认失败
|
||
if tmp_cross.type == CROSS_TYPE_LIGHT and not self.is_midroad_cross(tmp_cross.id): # 信控路口
|
||
res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids,
|
||
'reason': 'skip light'}
|
||
print('%s - %s mid %s' % (crossid_from, crossid_to, tmp_cross.id))
|
||
return res
|
||
# 超过最大跨路口个数限制,直接确认失败
|
||
if len(prev_roadids) > max_skip_cross_num:
|
||
# print('too_far %s - %s' % (crossid_from, crossid_to))
|
||
res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids, 'reason': 'too far'}
|
||
return res
|
||
|
||
def query_artery_crosses_ex(self, crossid_list, is_reversed, ignore_sideway=True):
|
||
"""
|
||
查询干线上的路口、路段列表
|
||
:param crossid_list:
|
||
:param is_reversed: 是否为逆向
|
||
:param ignore_sideway : 是否忽略辅路,默认:忽略
|
||
:return: cross_list, road_list, prev_roadids_list, mid_crossids_list
|
||
"""
|
||
if is_reversed:
|
||
orig_list = crossid_list
|
||
crossid_list = list(orig_list)
|
||
crossid_list.reverse()
|
||
|
||
cur_cross = self.query_cross(crossid_list[0])
|
||
cross_list = [cur_cross]
|
||
road_list = []
|
||
prev_roadids_list = []
|
||
mid_crossids_list = []
|
||
for i in range(1, len(crossid_list)):
|
||
from_crossid = crossid_list[i - 1]
|
||
to_crossid = crossid_list[i]
|
||
find_res = self.try_connect_cross(from_crossid, to_crossid, ignore_sideway)
|
||
find_road = None
|
||
if find_res['roadid']:
|
||
# 第一个路段,补全一下前向的路段
|
||
if i == 1:
|
||
road_list.append(None)
|
||
prev_roadids_list.append([])
|
||
find_road = self.query_road(find_res['roadid'])
|
||
road_list.append(find_road)
|
||
cross_list.append(self.query_cross(to_crossid))
|
||
prev_roadids_list.append(find_res['prev_roadids'])
|
||
mid_crossids_list.append(find_res['mid_crossids'])
|
||
else:
|
||
# 中间中断了,则立刻返回错误
|
||
print('break with: ' + find_res['reason'])
|
||
return False, False, False, False
|
||
# 再反转回来
|
||
if is_reversed:
|
||
cross_list.reverse()
|
||
road_list.reverse()
|
||
prev_roadids_list.reverse()
|
||
mid_crossids_list.reverse()
|
||
return cross_list, road_list, prev_roadids_list, mid_crossids_list
|
||
|
||
|
||
def query_unidir_wave_crosses(self, crossid_list, roadinfo: 'RoadInfoManager', sideway_start = 0):
|
||
"""
|
||
查询单向绿波上的各路口间的路段连接信息
|
||
@param crossid_list:
|
||
@param roadinfo: 全局的路段属性管理器
|
||
@param sideway_start: 从1开始
|
||
@return:
|
||
"""
|
||
cross_num = len(crossid_list)
|
||
# 设置主辅路连接标记
|
||
sideway_flags = []
|
||
for i in range(0, cross_num):
|
||
if sideway_start > 1 and i >= (sideway_start-1):
|
||
sideway_flags.append(True) # 辅路路口
|
||
else:
|
||
sideway_flags.append(False) # 主路路口
|
||
cur_cross = self.query_cross(crossid_list[0])
|
||
cross_list = [cur_cross]
|
||
road_list = []
|
||
prev_roadids_list = []
|
||
mid_crossids_list = []
|
||
for i in range(1, cross_num):
|
||
from_crossid = crossid_list[i - 1]
|
||
to_crossid = crossid_list[i]
|
||
# find_res = self.try_connect_cross(from_crossid, to_crossid, ignore_sideway)
|
||
find_res = self.super_connect_cross(from_crossid, to_crossid, roadinfo,
|
||
sideway_flags[i-1], sideway_flags[i])
|
||
find_road = None
|
||
if find_res['roadid']:
|
||
# 第一个路段,补全一下前向的路段
|
||
if i == 1:
|
||
road_list.append(None)
|
||
prev_roadids_list.append([])
|
||
find_road = self.query_road(find_res['roadid'])
|
||
road_list.append(find_road)
|
||
cross_list.append(self.query_cross(to_crossid))
|
||
prev_roadids_list.append(find_res['prev_roadids'])
|
||
mid_crossids_list.append(find_res['mid_crossids'])
|
||
else:
|
||
# 中间中断了,则立刻返回错误
|
||
print('break with: ' + find_res['reason'])
|
||
return False, False, False, False
|
||
return cross_list, road_list, prev_roadids_list, mid_crossids_list
|
||
|
||
def find_path(self, in_road: Road, DRidset: set(), acc_len: int, road_list: [], len_limit: int, roadinfo: 'RoadInfoManager'):
|
||
"""
|
||
两个路口之间的寻路
|
||
@param in_road: 将要检查的road对象
|
||
@param DRidset: 末端结果roadid集合
|
||
@param acc_len: 前序路径长度
|
||
@param road_list: 前序路段road数组
|
||
@param len_limit: 里程上限阈值
|
||
@param roadinfo: 公共数据,用于查询road的长度
|
||
@return: [road], reason
|
||
"""
|
||
# 命中末段road
|
||
if in_road.id in DRidset:
|
||
road_list.append(in_road)
|
||
return road_list, 'ok'
|
||
road_len = roadinfo.query_roadinfo(in_road.id).length
|
||
# 里程超限,停止寻路
|
||
if acc_len + road_len >= len_limit:
|
||
return False, 'too_long'
|
||
else:
|
||
# 禁止跨过信控路口
|
||
if self.query_cross(in_road.crossid_to).type == CROSS_TYPE_LIGHT:
|
||
return False, 'skip light'
|
||
# 展开节点
|
||
next_rid_list = self.query_next_staight_roads(in_road.id, )
|
||
# 无下游,停止寻路
|
||
if len(next_rid_list) == 0:
|
||
return False, 'no next'
|
||
tmp_list = list(road_list)
|
||
tmp_list.append(in_road)
|
||
for rid in next_rid_list:
|
||
tmp_road = self.query_road(rid)
|
||
res, reason = self.find_path(tmp_road, DRidset, acc_len + road_len, tmp_list, len_limit, roadinfo)
|
||
# 找到了
|
||
if res:
|
||
return res, 'ok'
|
||
else:
|
||
continue
|
||
# 所有的下游都找不到,则停止寻路
|
||
return False, 'not_connect'
|
||
|
||
def super_connect_cross(self, crossid_from, crossid_to, roadinfo: 'RoadInfoManager', from_sideway=False, to_sideway=True) -> dict:
|
||
"""
|
||
尝试获得2个路口之间的连接信息,默认为:主路连接辅路
|
||
:param crossid_from:
|
||
:param crossid_to:
|
||
:param from_sideway: 前路口是否为辅路连接
|
||
:param to_sideway: 后路口是否为辅路连接
|
||
:return: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'}
|
||
"""
|
||
# (0) 检查输入参数的有效性
|
||
cross_from = self.query_cross(crossid_from)
|
||
if not cross_from:
|
||
print('missing ' + crossid_from)
|
||
return None
|
||
cross_to = self.query_cross(crossid_to)
|
||
if not cross_to:
|
||
print('missing ' + crossid_to)
|
||
return None
|
||
# (1) 找出Oset 和 Dset 和 len_limit
|
||
ORoadset = set()
|
||
DRidset = set()
|
||
for roadid in cross_from.out_roadids:
|
||
road = self.query_road(roadid)
|
||
is_sideway = road.is_sideway()
|
||
if is_sideway == from_sideway:
|
||
ORoadset.add(road)
|
||
for roadid in cross_to.in_roadids:
|
||
road = self.query_road(roadid)
|
||
is_sideway = road.is_sideway()
|
||
if is_sideway == to_sideway:
|
||
DRidset.add(roadid)
|
||
# 应该改为计算球面距离
|
||
cross_dist = cross_from.location.earth_dist(cross_to.location)
|
||
len_limit = cross_dist * 1.5
|
||
# (2)
|
||
acc_len = 0
|
||
for in_road in ORoadset:
|
||
road_list, reason = self.find_path(in_road, DRidset, acc_len, [], len_limit, roadinfo)
|
||
if not road_list:
|
||
logging.warning('crossid_from: ' + crossid_from + ', crossid_to: ' + crossid_to + ', reason: ' + reason)
|
||
# print('crossid_from: ' + crossid_from + ', crossid_to: ' + crossid_to + ', reason: ' + reason)
|
||
continue
|
||
# TODO: 构造结果: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'}
|
||
res = {'roadid': road_list[-1].id}
|
||
prev_roadids = []
|
||
mid_crossids = []
|
||
for i in range(0, len(road_list)-1):
|
||
prev_roadids.append(road_list[i].id)
|
||
mid_crossids.append(road_list[i].crossid_to)
|
||
res['prev_roadids'] = prev_roadids
|
||
res['mid_crossids'] = mid_crossids
|
||
return res
|
||
# default end
|
||
return {'roadid': None, 'prev_roadids': [], 'mid_crossids': [], 'reason':'no path'}
|
||
|
||
def pick_symmetrical_roads(self) -> dict[str: int]:
|
||
"""
|
||
从路网中提取出满足相邻路口互为上下游关系的路口和路段集合
|
||
:return: roadid_to_pairid
|
||
"""
|
||
crossid_set = set()
|
||
# 已经被处理过的进口道ID集合
|
||
scaned_inroadid_set = set()
|
||
|
||
pair_id = 0
|
||
roadid_to_pairid = {}
|
||
road_pair_list = []
|
||
|
||
hit_debug = False
|
||
|
||
bad_road_num = 0
|
||
|
||
# 遍历所有的路段
|
||
for (inrodid, road) in self.road_map.items():
|
||
if road.is_sideway():
|
||
# print('ignore sideway %s %s' % (inrodid, road.name))
|
||
continue
|
||
if inrodid in scaned_inroadid_set:
|
||
continue
|
||
else:
|
||
scaned_inroadid_set.add(inrodid)
|
||
|
||
|
||
id_from = road.crossid_from
|
||
id_to = road.crossid_to
|
||
# 寻找连接 id_to -> id_from 的路段
|
||
cross_from = self.query_cross(id_from)
|
||
cross_to = self.query_cross(id_to)
|
||
|
||
# 在 cross_from 的进口道中寻找
|
||
find_reversed_road = False
|
||
for tmp_inroad_id in cross_from.in_roadids:
|
||
tmp_inroad = self.query_road(tmp_inroad_id)
|
||
if not tmp_inroad:
|
||
continue
|
||
# 忽略辅路
|
||
if tmp_inroad.is_sideway():
|
||
continue
|
||
if tmp_inroad.crossid_from == id_to:
|
||
# 找到了对称的路段
|
||
# 构建pair
|
||
scaned_inroadid_set.add(tmp_inroad_id)
|
||
|
||
find_reversed_road = True
|
||
pair_id += 1
|
||
roadid_to_pairid[inrodid] = pair_id
|
||
roadid_to_pairid[tmp_inroad_id] = pair_id
|
||
road_pair_list.append((inrodid, tmp_inroad_id))
|
||
#
|
||
crossid_set.add(id_from)
|
||
crossid_set.add(id_to)
|
||
if hit_debug:
|
||
line_geo = '%s,%s' % (self.query_cross(id_from).location.toStr(),
|
||
self.query_cross(id_to).location.toStr())
|
||
print(line_geo)
|
||
break
|
||
if not find_reversed_road:
|
||
bad_road_num += 1
|
||
if bad_road_num % 100 == 1:
|
||
print('bad_road: ' + road.toString())
|
||
print('bad_road_geo: %s' % (road.get_geo()))
|
||
continue
|
||
print('pick paired_road %d from road count %d' % (len(roadid_to_pairid), len(self.road_map)))
|
||
return roadid_to_pairid
|
||
|
||
def pick_symmetrical_roads_list(self):
|
||
"""
|
||
从路网中提取出满足相邻路口互为上下游关系的路口和路段集合
|
||
:return: roadid_to_pairid
|
||
"""
|
||
crossid_set = set()
|
||
# 已经被处理过的进口道ID集合
|
||
scaned_inroadid_set = set()
|
||
|
||
pair_id = 0
|
||
roadid_to_pairid = {}
|
||
road_pair_list = []
|
||
|
||
hit_debug = False
|
||
|
||
bad_road_num = 0
|
||
|
||
# 遍历所有的路段
|
||
for (inrodid, road) in self.road_map.items():
|
||
|
||
pair_id += 1
|
||
|
||
if inrodid in scaned_inroadid_set:
|
||
continue
|
||
else:
|
||
scaned_inroadid_set.add(inrodid)
|
||
|
||
id_from = road.crossid_from
|
||
id_to = road.crossid_to
|
||
# 寻找连接 id_to -> id_from 的路段
|
||
cross_from = self.query_cross(id_from)
|
||
cross_to = self.query_cross(id_to)
|
||
|
||
# 在 cross_from 的进口道中寻找
|
||
find_reversed_road = False
|
||
for tmp_inroad_id in cross_from.in_roadids:
|
||
tmp_inroad = self.query_road(tmp_inroad_id)
|
||
if not tmp_inroad:
|
||
continue
|
||
if tmp_inroad.crossid_from == id_to:
|
||
# 找到了对称的路段
|
||
# 构建pair
|
||
scaned_inroadid_set.add(tmp_inroad_id)
|
||
|
||
find_reversed_road = True
|
||
pair_id += 1
|
||
roadid_to_pairid[inrodid] = pair_id
|
||
roadid_to_pairid[tmp_inroad_id] = pair_id
|
||
road_pair_list.append([inrodid, tmp_inroad_id])
|
||
#
|
||
crossid_set.add(id_from)
|
||
crossid_set.add(id_to)
|
||
if hit_debug:
|
||
line_geo = '%s,%s' % (self.query_cross(id_from).location.toStr(),
|
||
self.query_cross(id_to).location.toStr())
|
||
print(line_geo)
|
||
break
|
||
if not find_reversed_road:
|
||
bad_road_num += 1
|
||
if bad_road_num % 100 == 1:
|
||
print('bad_road: ' + road.toString())
|
||
print('bad_road_geo: %s' % (road.get_geo()))
|
||
continue
|
||
return road_pair_list
|
||
|
||
def get_road_inflows(self) -> dict[str, list]:
|
||
"""
|
||
获取每个路段的上游各个入流向的路段信息
|
||
:return: {roadid => [直行进入的路段列表,左转进入的路段列表,右转进入的路段列表]}
|
||
"""
|
||
data = {}
|
||
for flow in self.flow_map.values():
|
||
roadid_out = flow.roadid_out
|
||
roadid_in = flow.roadid_in
|
||
turn_type = flow.turn_type
|
||
if turn_type > 2:
|
||
continue
|
||
if roadid_out not in data:
|
||
data[roadid_out] = [[], [], []]
|
||
data[roadid_out][turn_type].append(roadid_in)
|
||
return data
|
||
|
||
|
||
@classmethod
|
||
def load_cross_profile(cls, filename: str) -> set:
|
||
"""
|
||
加载路口画像文件,返回有效路口ID集合
|
||
"""
|
||
crossid_set = set()
|
||
f = open(filename, 'r')
|
||
for line in f:
|
||
line = line.rstrip('\r\n')
|
||
ss = line.split(' ')
|
||
crossid = ss[0]
|
||
good_flag = int(ss[1])
|
||
if good_flag == 1:
|
||
crossid_set.add(crossid)
|
||
f.close()
|
||
return crossid_set
|
||
|
||
def pick_sub_roadnet(self, crossid_set) -> 'RoadNet':
|
||
"""提取并返回子路网"""
|
||
subnet = RoadNet(self.nodeid)
|
||
for (cid, cross) in self.cross_map.items():
|
||
if cid not in crossid_set:
|
||
continue
|
||
else:
|
||
subnet.add_cross(cross)
|
||
for (rid, road) in self.road_map.items():
|
||
if road.crossid_from not in crossid_set and road.crossid_to not in crossid_set:
|
||
continue
|
||
else:
|
||
subnet.add_road(road)
|
||
for (fid, flow) in self.flow_map.items():
|
||
if flow.crossid not in crossid_set:
|
||
continue
|
||
else:
|
||
subnet.add_flow(flow)
|
||
return subnet
|
||
|
||
def clear_inner_roads(self):
|
||
change_cross_num = 0
|
||
for cross in self.cross_map.values():
|
||
inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids))
|
||
if len(inter_set) > 0:
|
||
new_in_roadids = []
|
||
for in_rid in cross.in_roadids:
|
||
if in_rid not in inter_set:
|
||
new_in_roadids.append(in_rid)
|
||
cross.in_roadids = new_in_roadids
|
||
new_out_roadids = []
|
||
for out_rid in cross.out_roadids:
|
||
if out_rid not in inter_set:
|
||
new_out_roadids.append(out_rid)
|
||
cross.out_roadids = new_out_roadids
|
||
change_cross_num += 1
|
||
# end
|
||
if change_cross_num > 0:
|
||
logging.info('clear inner road for cross num: %d' % change_cross_num)
|
||
|
||
def get_other_inroad_info(self, crossid: str, main_inroadids: [], roadinfo: 'RoadInfoManager') -> []:
|
||
"""
|
||
获取指定路口的其他相交进口道ID
|
||
:param crossid: 路口ID
|
||
:param main_inroadids: 已经确定的绿波主路的进口道ID列表
|
||
:return: [ [srcDir, inroadid] ]
|
||
"""
|
||
if crossid not in self.cross_map:
|
||
print('missing cross %s' % crossid)
|
||
return None
|
||
cross = self.query_cross(crossid)
|
||
main_srcDir_set = set()
|
||
for rid in main_inroadids:
|
||
road = self.query_road(rid)
|
||
if not road:
|
||
print('missing road %s of cross %s' % (rid, crossid))
|
||
return False
|
||
main_srcDir_set.add(road.srcDir)
|
||
# slg补充上对向的车流来向
|
||
if len(main_srcDir_set) == 1:
|
||
only_srcDir = list(main_srcDir_set)[0]
|
||
main_srcDir_set.add(g_reversed_srcDir_mapping[only_srcDir])
|
||
|
||
inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids))
|
||
srcDir_to_roads = {}
|
||
for in_rid in cross.in_roadids:
|
||
if in_rid in inter_set:
|
||
continue
|
||
in_road = self.query_road(in_rid)
|
||
if in_road.srcDir in main_srcDir_set:
|
||
continue
|
||
# 按srcDir分组
|
||
if in_road.srcDir in srcDir_to_roads:
|
||
srcDir_to_roads[in_road.srcDir].append(in_road)
|
||
else:
|
||
srcDir_to_roads[in_road.srcDir] = [in_road]
|
||
# [ [srcDir, inroadid] ]
|
||
other_inroad_infos = []
|
||
# 遍历 srcDir_to_roads
|
||
for srcDir, roads in srcDir_to_roads.items():
|
||
if len(roads) == 1:
|
||
other_inroad_infos.append([srcDir, roads[0].id])
|
||
else:
|
||
# 在相同srcDir的多个inroad中,选择最优代表性的一个
|
||
# 如果有主路、有辅路,则
|
||
side_roads = []
|
||
main_roads = []
|
||
for road in roads:
|
||
if road.is_sideway():
|
||
side_roads.append(road)
|
||
else:
|
||
main_roads.append(road)
|
||
# 优先选择主路(非辅路)路段
|
||
selected_roads = []
|
||
if len(main_roads) > 0:
|
||
selected_roads = main_roads
|
||
else:
|
||
selected_roads = side_roads
|
||
# TODO: 从 selected_roads 选出车道个数多的
|
||
pick_road = None
|
||
max_lane_num = 0
|
||
for road in selected_roads:
|
||
rinfo = roadinfo.query_roadinfo(road.id)
|
||
if rinfo.lane_num > max_lane_num:
|
||
max_lane_num = rinfo.lane_num
|
||
pick_road = road
|
||
other_inroad_infos.append([srcDir, pick_road.id])
|
||
# return
|
||
return other_inroad_infos
|
||
|
||
def calc_road_turn_angle(self, road_from: Road, road_to: Road):
|
||
pt0 = self.query_cross(road_from.crossid_from).location
|
||
pt1 = self.query_cross(road_from.crossid_to).location
|
||
pt2 = self.query_cross(road_to.crossid_from).location
|
||
pt3 = self.query_cross(road_to.crossid_to).location
|
||
pA = MyPoint(pt0.lng, pt0.lat)
|
||
pB = MyPoint(pt1.lng, pt1.lat)
|
||
pC = MyPoint(pt2.lng, pt2.lat)
|
||
pD = MyPoint(pt3.lng, pt3.lat)
|
||
turn_angle = abs(GeoFunc.radian2angle(GeoFunc.calc_turn_radian(pA, pB, pC, pD)))
|
||
return turn_angle
|
||
|
||
def calc_road_turn_angle_without_abs(self, road_from: Road, road_to: Road):
|
||
# pt0 = self.query_cross(road_from.crossid_from).location
|
||
# 第一个点,取一定距离的上游几何点
|
||
pt0 = road_from.get_tail_up_pt()
|
||
# pt1 = self.query_cross(road_from.crossid_to).location
|
||
pt1 = road_from.path[-1]
|
||
# pt2 = self.query_cross(road_to.crossid_from).location
|
||
pt2 = road_to.path[0]
|
||
# 最后一个点,取一定距离的下游几何点
|
||
pt3 = road_to.get_head_down_pt()
|
||
# pt3 = self.query_cross(road_to.crossid_to).location
|
||
|
||
pA = MyPoint.makeOne(pt0)
|
||
pB = MyPoint.makeOne(pt1)
|
||
pC = MyPoint.makeOne(pt2)
|
||
pD = MyPoint.makeOne(pt3)
|
||
turn_angle = GeoFunc.radian2angle(GeoFunc.calc_turn_radian(pA, pB, pC, pD))
|
||
# print('%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f => %.2f' % (pA.X,pA.Y, pB.X,pB.Y, pC.X,pC.Y, pD.X,pD.Y, turn_angle))
|
||
return turn_angle
|
||
|
||
def get_straight_road_pairs(self, crossid: str = None, consider_sideway = False):
|
||
"""
|
||
查询指定路口的属于直行关系的in-out路段ID pair
|
||
:param crossid:
|
||
:return:
|
||
"""
|
||
road_pairs = []
|
||
for cross in self.cross_map.values():
|
||
if crossid and cross.id != crossid:
|
||
continue
|
||
inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids))
|
||
# print(inter_set)
|
||
candidate_list = []
|
||
for in_rid in cross.in_roadids:
|
||
if in_rid in inter_set:
|
||
continue
|
||
in_road = self.query_road(in_rid)
|
||
if not in_road or (not consider_sideway and in_road.is_sideway()):
|
||
continue
|
||
for out_rid in cross.out_roadids:
|
||
if out_rid in inter_set:
|
||
continue
|
||
if in_rid == out_rid:
|
||
continue
|
||
out_road = self.query_road(out_rid)
|
||
if not out_road or (not consider_sideway and out_road.is_sideway()):
|
||
continue
|
||
# 不支持主-辅路构成pair
|
||
if in_road.is_sideway() != out_road.is_sideway():
|
||
continue
|
||
# 比较前后两个路段的转角
|
||
turn_angle = self.calc_road_turn_angle(in_road, out_road)
|
||
# print(in_road.name, out_road.name, turn_angle)
|
||
if turn_angle <= 45:
|
||
if in_road.name == out_road.name:
|
||
# 将同名道路的转角减少,以便于排序,排到最前面
|
||
turn_angle = turn_angle - 50
|
||
candidate_list.append([in_rid, out_rid, turn_angle])
|
||
# 按照广义的转角排序
|
||
pair_list = sorted(candidate_list, key=lambda x: x[2])
|
||
# for info in pair_list:
|
||
# print(info[0], info[1], round(info[2], 2))
|
||
# print(pair_list)
|
||
done_inroadid_set = set()
|
||
done_outroadid_set = set()
|
||
inout_name_map = dict()
|
||
# 依次选择全局最“直”的in-out,构成pair
|
||
for info in pair_list:
|
||
in_rid = info[0]
|
||
out_rid = info[1]
|
||
in_name = in_road.name
|
||
out_name = out_road.name
|
||
if in_name in inout_name_map and out_name != inout_name_map[in_name]:
|
||
# 不符合已经确定的出入路名pair,则跳过
|
||
continue
|
||
if out_name in inout_name_map and in_name != inout_name_map[out_name]:
|
||
# 不符合已经确定的出入路名pair,则跳过
|
||
continue
|
||
if in_name not in inout_name_map and out_name not in inout_name_map:
|
||
# 构造出入路名pair
|
||
inout_name_map[in_name] = out_name
|
||
inout_name_map[out_name] = in_name
|
||
# 已经成pair的路段不再参与
|
||
if in_rid in done_inroadid_set or out_rid in done_outroadid_set:
|
||
continue
|
||
road_pairs.append([in_rid, out_rid])
|
||
done_inroadid_set.add(in_rid)
|
||
done_outroadid_set.add(out_rid)
|
||
return road_pairs
|
||
|
||
def get_straight_inroadid(self, crossid: str, out_roadid: str, consider_sideway = False):
|
||
"""
|
||
获取指定路口的出口道的直行进口道ID
|
||
"""
|
||
inout_road_pairs = self.get_straight_road_pairs(crossid, consider_sideway)
|
||
for pair in inout_road_pairs:
|
||
if out_roadid == pair[1]:
|
||
return pair[0]
|
||
# end not find
|
||
return None
|
||
|
||
def get_straight_outroadid(self, crossid: str, in_roadid: str, consider_sideway = False):
|
||
"""
|
||
获取指定路口的进口道的直行出口道ID
|
||
"""
|
||
inout_road_pairs = self.get_straight_road_pairs(crossid, consider_sideway)
|
||
for pair in inout_road_pairs:
|
||
if in_roadid == pair[0]:
|
||
return pair[1]
|
||
# end not find
|
||
return None
|
||
|
||
def get_turn_inroadid(self, crossid: str, out_roadid: str, want_turn_type = 1):
|
||
"""
|
||
获取指定路口的出口道的指定左右转的进口道ID
|
||
@param crossid: 路口ID
|
||
@param out_roadid: 出口道ID
|
||
@param want_turn_type: 要求的转向类型取值:1,2表示:左转 和 右转
|
||
@return: road or None
|
||
"""
|
||
out_road = self.query_road(out_roadid)
|
||
pC = MyPoint.makeOne(out_road.path[0])
|
||
pD = MyPoint.makeOne(out_road.path[-1])
|
||
cur_min_diff = 180
|
||
find_roadid = None
|
||
in_roadids = self.query_cross(crossid).in_roadids
|
||
for in_rid in in_roadids:
|
||
in_road = self.query_road(in_rid)
|
||
pA = MyPoint.makeOne(in_road.path[0])
|
||
pB = MyPoint.makeOne(in_road.path[-1])
|
||
turn_type, diff = GeoFunc.calc_turn_type_ex(pA, pB, pC, pD)
|
||
if turn_type != want_turn_type:
|
||
continue
|
||
if diff < cur_min_diff:
|
||
cur_min_diff = diff
|
||
find_roadid = in_road.id
|
||
return find_roadid
|
||
|
||
def get_turn_outroadid(self, crossid: str, in_roadid: str, want_turn_type = 1):
|
||
"""
|
||
获取指定路口的进口道的指定左右转的出口道ID
|
||
@param crossid:
|
||
@param in_roadid:
|
||
@param want_turn_type:
|
||
@return:
|
||
"""
|
||
in_road = self.query_road(in_roadid)
|
||
pA = MyPoint.makeOne(in_road.path[0])
|
||
pB = MyPoint.makeOne(in_road.path[-1])
|
||
cur_min_diff = 180
|
||
find_roadid = None
|
||
out_roadids = self.query_cross(crossid).out_roadids
|
||
for out_rid in out_roadids:
|
||
out_road = self.query_road(out_rid)
|
||
pC = MyPoint.makeOne(out_road.path[0])
|
||
pD = MyPoint.makeOne(out_road.path[-1])
|
||
turn_type, diff = GeoFunc.calc_turn_type_ex(pA, pB, pC, pD)
|
||
if turn_type != want_turn_type:
|
||
continue
|
||
if diff < cur_min_diff:
|
||
cur_min_diff = diff
|
||
find_roadid = out_road.id
|
||
return find_roadid
|
||
|
||
def do_cross_classify(self, road_to_info: 'RoadInfoManager'):
|
||
"""
|
||
内部执行路口的分级,返回各个等级的路口个数
|
||
"""
|
||
self.cross_rank = dict()
|
||
rank_stat = dict()
|
||
for cross in self.cross_map.values():
|
||
if cross.at_edge:
|
||
continue
|
||
if cross.type != CROSS_TYPE_LIGHT:
|
||
continue
|
||
max_inroad_rank = -1
|
||
for rid in cross.in_roadids:
|
||
roadinfo = road_to_info.query_roadinfo(rid)
|
||
rank = (int)(roadinfo.get_rank() * 2)
|
||
if rank > max_inroad_rank:
|
||
max_inroad_rank = rank
|
||
self.cross_rank[cross.id] = max_inroad_rank
|
||
if max_inroad_rank in rank_stat:
|
||
rank_stat[max_inroad_rank] += 1
|
||
else:
|
||
rank_stat[max_inroad_rank] = 1
|
||
# 统计一下各个rank的数字
|
||
return rank_stat
|
||
|
||
def query_cross_by_bound(self, bound: MapBound, road_to_info: 'RoadInfoManager') -> []:
|
||
"""
|
||
查询指定范围内的信控路口信息
|
||
"""
|
||
info_list = []
|
||
for cross in self.cross_map.values():
|
||
if cross.id == 'CR_11343097_2361427':
|
||
print('debug')
|
||
if cross.at_edge:
|
||
continue
|
||
if cross.type != CROSS_TYPE_LIGHT:
|
||
continue
|
||
loc: Pt = cross.location
|
||
if bound.in_bound(loc.lng, loc.lat):
|
||
corss_roads = cross.in_roadids + cross.out_roadids
|
||
max_rc = 10
|
||
for ro in corss_roads:
|
||
roadinfo = road_to_info.query_roadinfo(ro)
|
||
rc = comparable_rank(roadinfo.roadclass)
|
||
if rc < max_rc:
|
||
max_rc = rc
|
||
if max_rc > 8:
|
||
cross_class = 2
|
||
else:
|
||
cross_class = 1
|
||
info = {'crossid': cross.id, 'name': cross.name, 'location': loc.toStr(), 'type': cross.type,
|
||
'cross_class': cross_class}
|
||
info_list.append(info)
|
||
return info_list
|
||
|
||
def query_cross_by_rank(self, min_rank: int) -> []:
|
||
"""
|
||
查询不低于指定rank的信控路口信息
|
||
"""
|
||
info_list = []
|
||
for cross in self.cross_map.values():
|
||
if cross.at_edge:
|
||
continue
|
||
if cross.type != CROSS_TYPE_LIGHT:
|
||
continue
|
||
if cross.id in self.cross_rank:
|
||
rank = self.cross_rank[cross.id]
|
||
if rank >= min_rank:
|
||
loc: Pt = cross.location
|
||
info = {'crossid': cross.id, 'name': cross.name, 'location': loc.toStr()}
|
||
info_list.append(info)
|
||
return info_list
|
||
|
||
def update_cross_info(self, crossid, cross_info):
|
||
cross = self.cross_map[crossid]
|
||
cross.name = cross_info['name']
|
||
cross.location = Pt(float(cross_info['location'].split(',')[0]), float(cross_info['location'].split(',')[1]))
|
||
|
||
def update_road_net(self, roadid, road_info):
|
||
road = self.road_map[roadid]
|
||
road.srcDir = road_info['src_direct']
|
||
|
||
|
||
|
||
class RoadInfo:
|
||
def __init__(self):
|
||
self.roadid = ''
|
||
self.roadclass = 6
|
||
self.length = 0
|
||
self.lane_num = 1
|
||
self.turninfo = '1'
|
||
self.ffs = 60
|
||
self.speed_limit = 60
|
||
self.capacity = 1800
|
||
|
||
def get_rank(self):
|
||
rc_weight = 6 - get_comparable_rc(self.roadclass)
|
||
return (rc_weight + self.lane_num) / 2
|
||
|
||
|
||
def get_lanenum_of_turn(self, turn_type: int) -> int:
|
||
"""
|
||
获取指定转向类型的车道个数
|
||
:param turn_type:
|
||
:return:
|
||
"""
|
||
if turn_type < 0 or turn_type > 3:
|
||
return 0
|
||
hit_turnset = g_turnset_of_turn_type[turn_type]
|
||
lanenum = 0
|
||
turn_list = self.turninfo.split('|')
|
||
for item in turn_list:
|
||
turn = int(item)
|
||
if turn in hit_turnset:
|
||
lanenum += 1
|
||
return lanenum
|
||
|
||
|
||
def get_straight_left_lanenum(self) -> int:
|
||
"""
|
||
获取直左共用的车道个数
|
||
:param turn_type:
|
||
:return:
|
||
"""
|
||
hit_turnset_0 = g_turnset_of_turn_type[0]
|
||
hit_turnset_1 = g_turnset_of_turn_type[1]
|
||
hit_turnset = hit_turnset_0.intersection(hit_turnset_1)
|
||
lanenum = 0
|
||
turn_list = self.turninfo.split('|')
|
||
for item in turn_list:
|
||
turn = int(item)
|
||
if turn in hit_turnset:
|
||
lanenum += 1
|
||
return lanenum
|
||
|
||
|
||
def to_dict(self):
|
||
info = {}
|
||
info['roadclass'] = self.roadclass
|
||
info['length'] = self.length
|
||
info['lane_num'] = self.lane_num
|
||
info['ffs'] = self.ffs
|
||
info['capacity'] = self.capacity
|
||
if self.turninfo != '':
|
||
ss = self.turninfo.split('|')
|
||
else:
|
||
ss = [1]
|
||
info['lane_turn_list'] = ss
|
||
info['lane_turn_flags'] = turninfo2str(self.turninfo)
|
||
return info
|
||
|
||
@classmethod
|
||
def deserialize_from_txt(cls, line):
|
||
ss = line.split(' ')
|
||
roadinfo = RoadInfo()
|
||
roadinfo.roadid = ss[0]
|
||
roadinfo.roadclass = int(ss[1])
|
||
roadinfo.length = int(ss[2])
|
||
roadinfo.lane_num = int(ss[3])
|
||
roadinfo.turninfo = ss[4]
|
||
roadinfo.ffs = int(ss[5])
|
||
roadinfo.speed_limit = int(ss[6])
|
||
roadinfo.capacity = int(ss[7])
|
||
return roadinfo
|
||
|
||
|
||
# 定义交管路网的路段属性管理器类
|
||
class RoadInfoManager:
|
||
def __init__(self):
|
||
self.roadinfo_map = {}
|
||
|
||
def query_roadinfo(self, roadid) -> RoadInfo:
|
||
return self.roadinfo_map.get(roadid)
|
||
|
||
def load_from_file(self, filename):
|
||
f = open(filename, 'r', encoding='utf-8')
|
||
file_content = f.read()
|
||
f.close()
|
||
data = file_content.replace('\r', ' ')
|
||
lines = data.split('\n')
|
||
header_fields = lines[0].split(' ')
|
||
road_num = int(header_fields[1])
|
||
|
||
line_offset = 1
|
||
for i in range(0, road_num):
|
||
line = lines[i + line_offset]
|
||
roadinfo = RoadInfo.deserialize_from_txt(line)
|
||
self.roadinfo_map[roadinfo.roadid] = roadinfo
|
||
|
||
def toString(self):
|
||
# 统计一下,车道数分布
|
||
lane_stat = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
|
||
rc_stat = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
|
||
for roadinfo in self.roadinfo_map.values():
|
||
lane_num = roadinfo.lane_num
|
||
lane_stat[lane_num] += 1
|
||
rc_stat[roadinfo.roadclass] += 1
|
||
res = ''
|
||
for num in range(0, len(lane_stat)):
|
||
res += "[lane_%d: %d]" % (num, lane_stat[num])
|
||
res += "\n"
|
||
for rc in range(0, len(rc_stat)):
|
||
res += "[rc_%d: %d]" % (rc, rc_stat[rc])
|
||
return res
|
||
|
||
def update_road_info(self, roadid, road_info):
|
||
roadInfo = self.query_roadinfo(roadid)
|
||
roadInfo.roadclass = 6 if not road_info['from_road_class'] else road_info['from_road_class']
|
||
roadInfo.lane_num = road_info['lane_num']
|
||
roadInfo.length = road_info['from_road_length']
|
||
roadInfo.ffs = road_info['ffs']
|
||
roadInfo.capacity = road_info['capacity']
|
||
roadInfo.turninfo = road_info['lane_turn_info']
|
||
roadInfo.lane_turn_list = road_info['lane_turn_info'].split('|') if len(road_info['lane_turn_info']) > 1 else [1]
|
||
roadInfo.lane_turn_flags = turninfo2str(road_info['lane_turn_info'])
|
||
|
||
|
||
def check_point_in_bound(point1, point2, param_point):
|
||
x1, y1 = float(point1.split(',')[0]), float(point1.split(',')[1])
|
||
x2, y2 = float(point2.split(',')[0]), float(point2.split(',')[1])
|
||
x3, y3 = param_point.lng, param_point.lat
|
||
return (x1 <= x3 <= x2) and (y1 <= y3 <= y2)
|
||
|
||
|
||
# 20250904 更新城市边框定义方式 支持一个城市多个框选范围的情况
|
||
def check_point_in_bounds(bounds, param_point):
|
||
lng, lat = float(param_point.toStr().split(',')[0]), float(param_point.toStr().split(',')[1])
|
||
def _inside(poly):
|
||
n, inside = len(poly), False
|
||
j = n - 1
|
||
for i in range(n):
|
||
xi, yi = poly[i]
|
||
xj, yj = poly[j]
|
||
if ((yi > lat) != (yj > lat)) and \
|
||
(lng < (xj - xi) * (lat - yi) / (yj - yi + 1e-12) + xi):
|
||
inside = not inside
|
||
j = i
|
||
return inside
|
||
|
||
for s in bounds:
|
||
coords = list(map(float, s.split(',')))
|
||
if len(coords) < 6 or len(coords) % 2:
|
||
continue
|
||
poly = [(coords[i], coords[i+1]) for i in range(0, len(coords), 2)]
|
||
if _inside(poly):
|
||
return True
|
||
return False
|
||
|
||
|
||
# 20250904 更新城市边框定义方式 支持一个城市多个框选范围的情况 (实现方式2)
|
||
def point_in_any_bound(raw_bounds, param_point):
|
||
"""
|
||
pt: (lng, lat)
|
||
raw_bounds: ["x1,y1,x2,y2,...", "..."]
|
||
"""
|
||
lng, lat = param_point.toStr().split(',')[0], param_point.toStr().split(',')[1]
|
||
for s in raw_bounds:
|
||
coords = list(map(float, s.split(',')))
|
||
if len(coords) < 6 or len(coords) % 2:
|
||
continue
|
||
xy = [(coords[i], coords[i+1]) for i in range(0, len(coords), 2)]
|
||
if Path(xy).contains_point((lng, lat)):
|
||
return True
|
||
return False
|
||
|
||
|
||
if __name__ == '__main__':
|
||
turninfo = '4|1|5'
|
||
print(turninfo2str(turninfo))
|
||
# roadnet = RoadNet('Demo')
|
||
# f = open('D:/slgwork/slgcode/importer_bd/roadnet_demo.txt', 'r', encoding='utf-8')
|
||
# file_content = f.read()
|
||
# f.close()
|
||
# roadnet.load(file_content)
|
||
# print("ok")
|
||
# print(roadnet.toString())
|
||
#
|
||
# save_data = roadnet.serialize_as_txt()
|
||
# f = open("./tmp_roadnet.txt", "w", encoding='utf-8')
|
||
# f.write(save_data)
|
||
# f.close()
|