cross_doctor/app/models.py

1952 lines
72 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Dict, Any, List
import app.console as console
from app.GeoFunc import *
from app.comm import *
from matplotlib.path import Path
# 车信箭头标记到转向集合的映射关系
g_turn2str = {'1': 's', '2': 'l', '3': 'r', '4': 'ls', '5': 'lr', '6': 't', '7': 'st', '8': 'lt', '9': 'lr',
'10': 'lst', '11': '-', '12': 'lsr', '13': 'rt', '14': 'lrt', '15': 'srt', '16': 'lsrt', '17': 'bus', '19': 'reversible'}
g_turnflag2type = {'s': 0, 'l': 1, 'r': 2, 't': 3}
# 转向类型到车信箭头标记的映射关系
g_turnset_of_turn_type = [{1,4,5,7,10,12,15,16}, {2,4,8,9,10,12,14,16},
{3,5,6,7,8,10,13,14,15,16}, {6,7,8,10,13,15,16}]
def turninfo2str(turninfo):
global g_turn2str
res = []
if turninfo == '':
res.append(g_turn2str['1'])
else:
ss = turninfo.split('|')
for turn in ss:
if turn in g_turn2str.keys():
res.append(g_turn2str[turn])
else:
turn = '1'
res.append(g_turn2str[turn])
return res
def turnflag_to_turn_type(token):
global g_turnflag2type
if token in g_turnflag2type:
return g_turnflag2type[token]
else:
return 0
def get_comparable_rc(rc):
"""
返回取值1~5的有可比性的道路等级
:param rc:
:return:
"""
return rc if rc <= 5 else (rc - 5)
class MapBound:
"""
地图矩形范围
"""
def __init__(self):
self.min_lon = 0
self.min_lat = 0
self.max_lon = 0
self.max_lat = 0
def in_bound(self, lon: float, lat: float):
"""
判断指定坐标是否落入bound内
"""
return self.min_lon < lon < self.max_lon and self.min_lat < lat < self.max_lat
@classmethod
def parse_from_str(cls, bound_str):
bound = MapBound()
try:
lonlat_str_list = bound_str.split(',')
bound.min_lon = float(lonlat_str_list[0])
bound.min_lat = float(lonlat_str_list[1])
bound.max_lon = float(lonlat_str_list[2])
bound.max_lat = float(lonlat_str_list[3])
return bound
except Exception:
return None
# 定义经纬度点
class Pt:
def __init__(self, lng, lat):
self.lng = lng
self.lat = lat
def copy(self):
return Pt(self.lng, self.lat)
def toStr(self):
return "%.6f,%.6f" % (self.lng, self.lat)
def dist(self, other):
d = math.fabs(self.lng - other.lng) + math.fabs(self.lat - other.lat)
# console.log(
# '' + str(self.lng) + ' ' + str(other.lng) + ' ' + str(self.lat) + ' ' + str(other.lat) + ' = ' + str(d))
return d
def sqrt_dist(self, other):
d = math.sqrt((self.lng - other.lng)*(self.lng - other.lng) + (self.lat - other.lat)*(self.lat - other.lat))
return d
CROSS_TYPE_NORMAL = 1
CROSS_TYPE_LIGHT = 2
# // 定义路口类
class Cross:
def __init__(self, lng, lat):
self.id = self.coord2id(lng, lat)
self.at_edge = False
# type 0:普通路口; 1信控路口
self.type = 1
self.location = Pt(lng, lat)
self.name = 'unknown'
self.in_roadids = []
self.out_roadids = []
# 20250922 新增辖区id 辖区名称 信息
self.area_id = 0
self.area_name = ''
def set_as_edge(self):
self.at_edge = True
def judge_if_edge(self):
if len(self.in_roadids) <= 1 and len(self.out_roadids) <= 1:
self.at_edge = True
@staticmethod
def coord2id(lng, lat):
LNG = round(lng * 100000)
LAT = round(lat * 100000)
id = 'CR_' + str(LNG) + '_' + str(LAT)
return id
def add_inroad(self, roadid):
self.in_roadids.append(roadid)
def add_outroad(self, roadid):
self.out_roadids.append(roadid)
def get_location(self):
return str(self.location.lng) + ',' + str(self.location.lat)
def has_light(self) -> bool:
"""是否为信控路口"""
return self.type == 1
def to_dict(self):
info = {}
info['crossid'] = self.id
info['location'] = self.get_location()
info['name'] = self.name
info['type'] = self.type
return info
def toString(self):
return self.id + " coord:" + str(self.location.lng) + ',' + str(self.location.lat) + ' inroad:' + str(
len(self.in_roadid)) + ' outroad:' + str(len(self.out_roadids))
def serialize_as_txt(self):
edge_flag = 1 if self.at_edge else 0
return self.id + ' ' + str(self.location.lng) + ',' + str(self.location.lat) + ' ' + self.name + ' ' + edge_flag
@classmethod
def deserialize_from_txt(cls, line):
ss = line.split(' ')
xys = ss[1].split(',')
lng = float(xys[0])
lat = float(xys[1])
cross = Cross(lng, lat)
cross.id = ss[0]
if len(ss) >= 3:
cross.name = ss[2]
if len(ss) >= 4:
flag = int(ss[3])
cross.at_edge = True if flag == 1 else False
if len(ss) >= 5:
cross.type = int(ss[4])
if cross.type < 1 or cross.type > 2:
console.log('wrong type %s for cross %s' % (ss[4], cross.id))
cross.type = 2
else:
# 如果不是边缘路口,则默认视作灯控路口
if not cross.at_edge:
cross.type = 2
return cross
class Road:
def __init__(self, crossid_from, crossid_to):
self.id = self.make_roadid(crossid_from, crossid_to)
self.crossid_from = crossid_from
self.crossid_to = crossid_to
self.out_flows = []
self.name = 'unknown'
self.path = []
self.srcDir = '' # 车流来向
def make_roadid(self, crossid_from, crossid_to):
return 'RD_' + crossid_from + '_' + crossid_to
def set_name(self, name):
self.name = name
def is_sideway(self):
"""该路段是否为辅路"""
return self.name[-2:] == '辅路'
def set_path(self, geo):
# if self.crossid_to == '6ee88a42611bd4':
# print('debug')
xys = geo.split(',')
pt_num = len(xys) // 2
for i in range(0, pt_num):
self.path.append(Pt(float(xys[2 * i]), float(xys[2 * i + 1])))
self.srcDir = GeoFunc.calc_srcDir_tail(self.path)
# self.srcDir = GeoFunc.calc_srcDir(MyPoint.makeOne(self.path[0]), MyPoint.makeOne(self.path[len(self.path) - 1]))
def get_headfrom_srcDir(self):
"""
获取路段起始段的车流来向
:return:
"""
return GeoFunc.calc_srcDir_head(self.path)
def get_angle_from_head(self):
return GeoFunc.calc_angle_head(self.path)
def get_angle_from_tail(self):
return GeoFunc.calc_angle_tail(self.path)
def toPt(self, point: MyPoint):
pt = Pt()
pt.lng = point.X
pt.lat = point.Y
return pt
def get_tail_up_pt(self):
pt_list = self.path
num = len(pt_list)
pB = MyPoint.makeOne(pt_list[num - 1])
cur_pA_index = num - 2
while True:
pA = MyPoint.makeOne(pt_list[cur_pA_index])
if cur_pA_index <= 0 or pA.get_dist(pB) >= 0.0005:
ppt = MyPoint.makeOne(pt_list[cur_pA_index + 1])
left_dist = 0.0005 - ppt.get_dist(pB)
mid_pt = ppt.calc_left_pt(pA, left_dist)
# print('left_dist %f. dist %f. %.6f,%.6f' % (left_dist, pB.get_dist(mid_pt), mid_pt.X, mid_pt.Y))
return Pt(mid_pt.X, mid_pt.Y)
# return pt_list[cur_pA_index]
cur_pA_index -= 1
return None
def get_head_down_pt(self):
pt_list = self.path
num = len(pt_list)
pA = MyPoint.makeOne(pt_list[0])
cur_pB_index = 1
while True:
pB = MyPoint.makeOne(pt_list[cur_pB_index])
if cur_pB_index >= num - 1 or pA.get_dist(pB) >= 0.0005:
ppt = MyPoint.makeOne(pt_list[cur_pB_index-1])
left_dist = 0.0005 - pA.get_dist(ppt)
mid_pt = ppt.calc_left_pt(pB, left_dist)
# print('left_dist %f. dist %f. %.6f,%.6f' % (left_dist, pA.get_dist(mid_pt), mid_pt.X, mid_pt.Y))
return Pt(mid_pt.X, mid_pt.Y)
# return pt_list[cur_pB_index]
cur_pB_index += 1
return None
def get_geo(self):
ss = []
for pt in self.path:
ss.append(str(pt.lng))
ss.append(str(pt.lat))
return ','.join(ss)
def fill_coords(self, cross_map):
cross_from = cross_map.get(self.crossid_from)
cross_to = cross_map.get(self.crossid_to)
self.path.append(cross_from.location.copy())
self.path.append(cross_to.location.copy())
self.srcDir = GeoFunc.calc_srcDir(MyPoint.makeOne(cross_from.location), MyPoint.makeOne(cross_to.location))
def add_outflow(self, flow):
self.out_flows.append(flow)
def toString(self):
return self.crossid_from + ' => ' + self.crossid_to + ' name:' + self.name
def serialize_as_txt(self):
return self.id + ' ' + self.crossid_from + ' ' + self.crossid_to + ' ' + self.name
@classmethod
def deserialize_from_txt(cls, line):
ss = line.split(' ')
id = ss[0]
crossid_from = ss[1]
crossid_to = ss[2]
road = Road(crossid_from, crossid_to)
road.id = id
road.set_name(ss[3])
if (len(ss) >= 5):
geo = ss[4]
road.set_path(geo)
return road
def get_direct_angle(self):
pt0: Pt = self.path[0]
pt1: Pt = self.path[-1]
pA = MyPoint(pt0.lng, pt0.lat)
pB = MyPoint(pt1.lng, pt1.lat)
return GeoFunc.calc_pole_angle(pA, pB)
# 定义流向类
class Flow:
def __init__(self, crossid, roadid_in, roadid_out):
self.crossid = crossid
self.roadid_in = roadid_in
self.roadid_out = roadid_out
self.turn_type = -1
def getid(self):
return hash('FL_' + self.roadid_in + '_' + str(self.turn_type) + self.roadid_out)
# return 'FL_' + self.roadid_in + '_' + str(self.turn_type)
def toString(self):
return self.crossid + ' ' + self.roadid_in + ' ' + self.roadid_out + ' ' + str(self.turn_type)
def serialize_as_txt(self):
return self.toString()
@classmethod
def deserialize_from_txt(cls, line):
ss = line.split(' ')
flow = Flow(ss[0], ss[1], ss[2])
flow.turn_type = int(ss[3])
return flow
#
class CrossTopo:
"""
路口拓扑,包含各个进口道方向[取值为标准八方向]所对应的转向[取值为0,1,2,3]数组
"""
def __init__(self):
# 存储结构srcDir => [0/1/2/3]数组数组内每个元素表示1个流向
self.srcDir_flows = {}
# 添加一个流向
def add_flow_delay(self, srcDir, turn_type):
data = None
if srcDir in self.srcDir_flows:
data = self.srcDir_flows[srcDir]
else:
data = []
if turn_type not in data:
data.append(turn_type)
self.srcDir_flows[srcDir] = data
def get_srcDir_list(self) -> [str]:
return list(self.srcDir_flows.keys())
def get_srcDir_pairs(self) -> (str, str):
pair_list = []
srcDir_set = set(self.srcDir_flows.keys())
if 'N' in srcDir_set and 'S' in srcDir_set:
pair_list.append(['N', 'S'])
if 'E' in srcDir_set and 'W' in srcDir_set:
pair_list.append(['E', 'W'])
if 'NW' in srcDir_set and 'SE' in srcDir_set:
pair_list.append(['NW', 'SE'])
if 'NE' in srcDir_set and 'SW' in srcDir_set:
pair_list.append(['NE', 'SW'])
if len(pair_list) == 2:
# 仅当存在两组对向时,才返回结果
return pair_list[0], pair_list[1]
else:
return False, False
# 定义交管路网类
class RoadNet:
def __init__(self, nodeid):
self.nodeid = nodeid
# 路口数据
self.cross_map = {}
self.road_map = {}
self.flow_map = {}
self.bound_info = {}
self.is_for_greenwave = False
# 路口的rank信息
self.cross_rank = {}
# 路口到所属citycode的映射关系
self.cross2city = {}
# 疑似过街灯的信号路口集合
self.midroad_cross_idset = set()
def collect_midroad_crosses(self):
"""
识别路网中疑似在道路中间的不是交叉口的信控cross, 收集到self.midroad_cross_idset中
:return:
"""
for cross in self.cross_map.values():
if cross.type == CROSS_TYPE_LIGHT and len(cross.in_roadids) == 2:
name_set = set()
for rid in cross.in_roadids:
road = self.query_road(rid)
name_set.add(road.name)
if len(name_set) == 1:
self.midroad_cross_idset.add(cross.id)
print('collect midroad_cross num: %d' % len(self.midroad_cross_idset))
def is_midroad_cross(self, crossid: str):
"""
判断是不是道路中间的可跨越的信控路口
:param crossid:
:return:
"""
return crossid in self.midroad_cross_idset
def set_for_greenwave(self):
self.is_for_greenwave = True
def add_cross(self, cross):
if self.bound_info and len(self.bound_info) > 0:
bound_info = self.bound_info
for item in bound_info:
area_id = item['area_id']
area_name = item['area_name']
cross.area_id = area_id
cross.area_name = area_name
bound = item['bounds']
if check_point_in_bounds(bound, cross.location):
self.cross_map[cross.id] = cross
else:
continue
elif not self.bound_info:
self.cross_map[cross.id] = cross
def add_road(self, road):
if road.crossid_from == road.crossid_to:
return False
if road.crossid_from not in self.cross_map.keys() and road.crossid_to not in self.cross_map.keys():
return False
self.road_map[road.id] = road
if (len(road.path) < 2):
road.fill_coords(self.cross_map)
# 更新路口的进出口道路
cross_from = self.cross_map.get(road.crossid_from)
if cross_from:
cross_from.add_outroad(road.id)
cross_to = self.cross_map.get(road.crossid_to)
if cross_to:
cross_to.add_inroad(road.id)
return True
def add_bidir_road(self, road):
reversed_road = Road(road.crossid_to, road.crossid_from)
self.add_road(road)
self.add_road(reversed_road)
def add_flow(self, flow):
self.flow_map[flow.getid()] = flow
inroad = self.road_map.get(flow.roadid_in)
if inroad:
inroad.add_outflow(flow.getid())
# 自动生成所有的flow
def fill_allflows(self):
self.flow_map.clear()
for road in self.road_map.values():
road.out_flows = []
add_num = 0
# 针对每个路口的每一组in-out路段都生成一个flow
for (id, cross) in self.cross_map.items():
in_roads = cross.in_roadids
out_roads = cross.out_roadids
# 遍历进口道
for in_roadid in in_roads:
# 遍历出口道
for out_roadid in out_roads:
flow = Flow(id, in_roadid, out_roadid)
flow.turn_type = self.calc_turn_type(in_roadid, out_roadid)
self.add_flow(flow)
add_num += 1
print("add flow %d, final get flow %d" % (add_num, len(self.flow_map)))
def query_cross(self, crossid) -> Cross:
return self.cross_map.get(crossid)
def query_road(self, roadid) -> Road:
return self.road_map.get(roadid)
def query_road_2(self, crossid, srcDir) -> Road:
"""查询路口指定来车方向的进口道"""
inroad_ids = self.query_cross(crossid).in_roadids
for rid in inroad_ids:
road = self.query_road(rid)
if road.srcDir == srcDir:
return road
return None
def query_next_roads(self, roadid, srcDir) -> [str]:
"""查询下游路段"""
res = []
road = self.query_road(roadid)
next_roadids = self.query_cross(road.crossid_to).out_roadids
for rid in next_roadids:
next_road = self.query_road(rid)
if next_road.srcDir == srcDir:
res.append(rid)
return res
def query_next_staight_roads(self, roadid, angle_limit = 60):
"""
查找路段的下游近似直行的road
@param roadid: 输入roadid
@param angle_limit: 转角限制默认60度
@return: [roadid]
"""
res = []
this_road = self.query_road(roadid)
tmp_cross = self.query_cross(this_road.crossid_to)
direct_angle = this_road.get_direct_angle()
for roadid in tmp_cross.out_roadids:
road = self.query_road(roadid)
# 找出转角小于60的road
diff_angle = GeoFunc.diff_pole_angle(direct_angle, road.get_direct_angle())
if diff_angle <= angle_limit:
res.append(road.id)
return res
def query_prev_roads(self, roadid, srcDir) -> [str]:
"""查询上游路段"""
res = []
road = self.query_road(roadid)
prev_roadids: list[str] = self.query_cross(road.crossid_from).in_roadids
for rid in prev_roadids:
prev_road = self.query_road(rid)
if not prev_road:
print('missing query_prev_roads roadid %s from road %s srcDir %s' % (rid, roadid, srcDir))
if prev_road.srcDir == srcDir:
res.append(rid)
return res
def query_main_side_road_pair(self):
"""
获取路网内包含的疑似主辅路的roadid的pair
:return:[(roadid1, roadid2)]
"""
pair_list = []
# 遍历路口
for cross in self.cross_map.values():
if cross.at_edge:
continue
inroad_num = len(cross.in_roadids)
for i in range(0, inroad_num):
for j in range(i+1, inroad_num):
rid1 = cross.in_roadids[i]
rid2 = cross.in_roadids[j]
road1 = self.query_road(rid1)
road2 = self.query_road(rid2)
if road1.crossid_from == road2.crossid_from and road1.srcDir == road2.srcDir:
pair_list.append((rid1, rid2))
# return pair_list
return pair_list
def query_flow(self, flowid) -> Flow:
return self.flow_map.get(flowid)
def get_crossid_list(self):
res = []
for id in self.cross_map.keys():
res.append(id)
return res
def get_valid_crossid_list(self) -> []:
res = []
for (crossid, cross) in self.cross_map.items():
if cross.at_edge:
continue
else:
res.append(crossid)
return res
# g_cross_list
def fill_valid_crossids(self, crossid_list) -> int:
fill_num = 0
for (crossid, cross) in self.cross_map.items():
if cross.at_edge:
continue
else:
crossid_list.append(crossid)
fill_num += 1
return fill_num
def fill_valid_crossinfo(self, cross_list) -> int:
fill_num = 0
for (crossid, cross) in self.cross_map.items():
if cross.at_edge:
continue
# elif len(cross.in_roadids) < 4:
# continue
else:
cross_list.append({'crossid': crossid, 'type': cross.type, 'name': cross.name, 'location': cross.location.toStr()})
fill_num += 1
return fill_num
def get_crossroads(self) -> []:
"""
获取十字路口的信息数组
:return:
"""
cross_list = []
for (crossid, cross) in self.cross_map.items():
if cross.at_edge:
continue
else:
# 进一步判断是否为十字路口
if len(cross.in_roadids) >= 4:
cross_list.append([crossid, cross.name, cross.location.toStr()])
return cross_list
def fill_crossroads(self, crossid_set) -> []:
crossid_set.clear()
for (crossid, cross) in self.cross_map.items():
if cross.at_edge:
continue
else:
# 进一步判断是否为十字路口
if len(cross.in_roadids) >= 4:
crossid_set.add(crossid)
return crossid_set
def get_cross_allflow(self, crossid) -> {}:
flows = {}
for (id, flow) in self.flow_map.items():
if flow.crossid == crossid:
flows[id] = flow
return flows
def get_cross_topo(self, crossid) -> CrossTopo:
cross = self.cross_map.get(crossid)
if not cross:
return False
topo = CrossTopo()
in_roads = cross.in_roadids
# 遍历进口道
for in_roadid in in_roads:
inroad = self.query_road(in_roadid)
srcDir = inroad.srcDir
for flowid in inroad.out_flows:
turn_type = self.query_flow(flowid).turn_type
topo.add_flow_delay(srcDir, turn_type)
return topo
def get_adjcross(self, crossid):
cross = self.cross_map.get(crossid)
adj_cross = {}
for inroad_id in cross.in_roadids:
road = self.query_road(inroad_id)
adj_cross[road.srcDir] = self.query_cross(road.crossid_from)
return adj_cross
def get_adjcross_from(self, crossid, srcDir):
cross = self.cross_map.get(crossid)
adj_cross = None
for inroad_id in cross.in_roadids:
road = self.query_road(inroad_id)
if road.srcDir == srcDir:
adj_cross = self.query_cross(road.crossid_from)
return adj_cross
def search_cross(self, keyword):
cross_list = []
for cross in self.cross_map.values():
if cross.name.find(keyword) >= 0:
cross_list.append(cross)
return cross_list
# 根据位置查询路口
def hit_cross(self, lng, lat):
pos = Pt(lng, lat)
# 遍历map
for (id, cross) in self.cross_map.items():
if (cross.location.dist(pos) < 0.001):
console.log("hit_cross: " + id)
return id
else:
console.log("hit_cross: falsse")
return None
def query_cross_all_srcDir(self, crossid):
srcDir_list = []
cross = self.cross_map.get(crossid)
for roadid in cross.in_roadids:
road = self.road_map.get(roadid)
srcDir_list.append(road.srcDir)
return srcDir_list
def toString(self):
return ('RoadNet:%s cross_num:%d road_num:%d flow:%d'
% (self.nodeid, len(self.cross_map), len(self.road_map), len(self.flow_map)))
def serialize_as_txt(self):
# 遍历路口
lines = []
cr_num = len(self.cross_map)
road_num = len(self.road_map)
flow_num = len(self.flow_map)
headline = "cross %d road %d flow %d" % (cr_num, road_num, flow_num)
lines.append(headline)
# 遍历路口
for (id, cross) in self.cross_map.items():
lines.append(cross.serialize_as_txt())
# 遍历路段
for (id, road) in self.road_map.items():
lines.append(road.serialize_as_txt())
# 遍历流向
for (id, flow) in self.flow_map.items():
lines.append(flow.serialize_as_txt())
# 连接并返回
return '\n'.join(lines)
def reset(self):
self.cross_map.clear()
self.road_map.clear()
self.flow_map.clear()
def calc_turn_type(self, in_roadid, out_roadid):
in_road = self.query_road(in_roadid)
out_road = self.query_road(out_roadid)
if None == in_road or None == out_road:
return None
# # 简单地,根据入和出路段的首末点,来计算转角
# pA = MyPoint.makeOne(in_road.path[0])
# pB = MyPoint.makeOne(in_road.path[1])
# pC = MyPoint.makeOne(out_road.path[0])
# pD = MyPoint.makeOne(out_road.path[1])
# return GeoFunc.calc_turn_type(pA, pB, pC, pD)
# 改为:基于进口道末端几何 和 出口道开头几何 的 转角计算
pA, pB = GeoFunc.pick_path_tail(in_road.path)
pC, pD = GeoFunc.pick_path_head(out_road.path)
return GeoFunc.calc_turn_type(pA, pB, pC, pD)
def complete_roadname(self):
# 遍历road
add_roadname_num = 0
for road in self.road_map.values():
if road.name is not None and road.name != "unknown":
continue
for other in self.road_map.values():
if other.name is not None and other.name != "unknown":
if (road.crossid_from == other.crossid_to
and road.crossid_to == other.crossid_from):
road.set_name('' + other.name)
add_roadname_num += 1
msg = "add roadname num:" + str(add_roadname_num)
console.log(msg)
# alert(msg)
def generate_crossname(self):
generate_num = 0
for cross in self.cross_map.values():
if cross.at_edge:
continue
if len(cross.in_roadids) < 1 or len(cross.out_roadids) < 1:
# console.log("bad cross: " + cross.id)
continue
if cross.name is not None and cross.name != "unkown":
# console.log("had cross name: " + cross.name)
continue
in_roadname = self.query_road(cross.in_roadids[0]).name
for roadid in cross.out_roadids:
out_roadname = self.query_road(roadid).name
if in_roadname != out_roadname:
cross_name = in_roadname + out_roadname + '交叉口'
cross.name = cross_name
generate_num += 1
break
console.log("generate cross name num:" + str(generate_num))
def load_from_file(self, filename, citycode: int = 0):
# 在绿波模式下,需要累积加载多个城市的路网,因此,加载时,不执行重置操作
if not self.is_for_greenwave:
self.reset()
f = open(filename, 'r', encoding='utf-8')
file_content = f.read()
f.close()
return self.load(file_content, citycode)
def load_from_fix_road_dir_file(self, filename, citycode: int = 0):
f = open(filename, 'r', encoding='utf-8')
file_content = f.read()
f.close()
data = file_content.replace('\r', '')
lines = data.split('\n')
header_fields = lines[0].split(' ')
road_num = int(header_fields[0])
line_offset = 1
for i in range(0, road_num):
line = lines[i + line_offset]
roadid = line.split(' ')[0]
fix_dir = line.split(' ')[1]
road = self.query_road(roadid)
if road:
road.srcDir = fix_dir
# 传入的文件内容字符串
def load(self, data, citycode: int = 0):
data = data.replace('\r', '')
lines = data.split('\n')
header_fields = lines[0].split(' ')
cross_num = int(header_fields[1])
road_num = int(header_fields[3])
flow_num = int(header_fields[5])
# 读取所有路口
line_offset = 1
for i in range(0, cross_num):
line = lines[i + line_offset]
cross = Cross.deserialize_from_txt(line)
# # 忽略非市区的路口
# if not cross.urban:
# continue
self.add_cross(cross)
if cross.id == 'CR_11343097_2361427':
print('debug load')
if citycode > 0:
self.cross2city[cross.id] = citycode
# 读取所有进口道
line_offset += cross_num
for i in range(0, road_num):
line = lines[i + line_offset]
self.add_road(Road.deserialize_from_txt(line))
# 读取所有流向
line_offset += road_num
for i in range(0, flow_num):
line = lines[i + line_offset]
# 在绿波模式下不加载flow数据
if not self.is_for_greenwave:
self.add_flow(Flow.deserialize_from_txt(line))
if not self.is_for_greenwave:
self.clear_inner_roads()
def query_artery_crosses(self, crossid_list, is_reversed):
"""
查询干线上的路口、路段列表
:param crossid_list:
:return:
"""
if is_reversed:
orig_list = crossid_list
crossid_list = list(orig_list)
crossid_list.reverse()
cur_cross = self.query_cross(crossid_list[0])
cross_list = [cur_cross]
road_list = []
head_srcDir = None
for i in range(1, len(crossid_list)):
next_crossid = crossid_list[i]
find_road = None
for roadid in cur_cross.out_roadids:
road = self.query_road(roadid)
if road.crossid_to == next_crossid:
find_road = road
break
if not find_road:
logging.warning("find road between [%s -> %s] fail" % (cur_cross.id, next_crossid))
return False, False
if not head_srcDir:
# 仅当找到第一个路段时,处理一次
head_srcDir = find_road.srcDir
# 从第一个路口的进口道中寻找合适的进口道
for inroadid in cur_cross.in_roadids:
road = self.query_road(inroadid)
if road.srcDir == head_srcDir:
road_list.append(road)
break
if len(road_list) <= 0:
# 没有找到上游合适的进口道
road_list.append(None)
#
road_list.append(find_road)
# update
cur_cross = self.query_cross(next_crossid)
cross_list.append(cur_cross)
# 再反转回来
if is_reversed:
cross_list.reverse()
road_list.reverse()
return cross_list, road_list
def get_road_direct_angle(self, roadid):
""""""
road = self.query_road(roadid)
if not road:
return -1
else:
return road.get_direct_angle()
def try_connect_cross(self, crossid_from, crossid_to, ignore_sideway=True) -> dict:
"""
尝试获得2个路口之间的连接信息
:param crossid_from:
:param crossid_to:
:param ignore_sideway 是否忽略辅路,默认: 忽略
:return: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'}
"""
# (0) 检查输入参数的有效性
cross_from = self.query_cross(crossid_from)
if not cross_from:
print('missing ' + crossid_from)
return None
next_cross = self.query_cross(crossid_to)
if not next_cross:
print('missing ' + crossid_to)
return None
# (1) 首先,尝试直接连通
find_road = None
find_road_list = [] # 存储可能存在的多个road
for roadid in cross_from.out_roadids:
road = self.query_road(roadid)
if ignore_sideway and road.is_sideway():
continue
if road.crossid_to == crossid_to:
find_road_list.append(road)
# 选择1个road
if len(find_road_list) == 1:
find_road = find_road_list[0]
elif len(find_road_list) > 1:
# 使用第一个不是辅路的road
for road in find_road_list:
if not road.is_sideway():
find_road = road
if not find_road:
# 如果全是辅路,则选择第一个
find_road = find_road_list[0]
if find_road:
res = {'roadid': find_road.id, 'prev_roadids': [], 'mid_crossids': []}
return res
# (2)当不能直连时,开始拓展查找模式
# 计算两个路口之间的角度
pA = MyPoint(cross_from.location.lng, cross_from.location.lat)
pB = MyPoint(next_cross.location.lng, next_cross.location.lat)
direct_angle = GeoFunc.calc_pole_angle(pA, pB)
# 尝试寻找
max_skip_cross_num = 5 # 最大允许跨过的非灯路口个数
find_road = None # 紧挨下游的road
prev_roadids = [] # 从上游路口到find_road之间的roadid序列
mid_crossids = [] # 中间跨越的路口ID集合
tmp_cross = cross_from
while True:
min_diff_angle = 360
cur_direct_road = None
for roadid in tmp_cross.out_roadids:
road = self.query_road(roadid)
# 按需舍弃辅路的road
if ignore_sideway and road.is_sideway():
continue
if road.crossid_to == crossid_to:
find_road = road
break
# 舍弃连接边缘路口的路段
tmp_next_cross = self.query_cross(road.crossid_to)
if not tmp_next_cross or tmp_next_cross.at_edge:
continue
else:
# 找出转角小于60的“最直”的road
diff_angle = GeoFunc.diff_pole_angle(direct_angle, road.get_direct_angle())
if diff_angle <= 60 and diff_angle < min_diff_angle:
cur_direct_road = road
min_diff_angle = diff_angle
# 找到了下游路口,直接返回结果
if find_road:
res = {'roadid': find_road.id, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids}
return res
# 如果没有找到符合要求的“直行”road则返回错误
if not cur_direct_road:
# 没有找到直行的road
print('break at cross: %s. connect[%s %s]' % (tmp_cross.id, crossid_from, crossid_to))
res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids,
'reason': 'no direct'}
return res
# 找到了“直行”road但尚未找到下游路口, 则继续寻找
# 记录中间的路段ID和路口ID
prev_roadids.append(cur_direct_road.id)
mid_crossids.append(cur_direct_road.crossid_to)
# 检查一下当前“跨越”情形,是否满足[继续寻找]的要求
tmp_cross = self.query_cross(cur_direct_road.crossid_to)
# 跨越了信控路口,直接确认失败
if tmp_cross.type == CROSS_TYPE_LIGHT and not self.is_midroad_cross(tmp_cross.id): # 信控路口
res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids,
'reason': 'skip light'}
print('%s - %s mid %s' % (crossid_from, crossid_to, tmp_cross.id))
return res
# 超过最大跨路口个数限制,直接确认失败
if len(prev_roadids) > max_skip_cross_num:
# print('too_far %s - %s' % (crossid_from, crossid_to))
res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids, 'reason': 'too far'}
return res
def query_artery_crosses_ex(self, crossid_list, is_reversed, ignore_sideway=True):
"""
查询干线上的路口、路段列表
:param crossid_list:
:param is_reversed: 是否为逆向
:param ignore_sideway : 是否忽略辅路,默认:忽略
:return: cross_list, road_list, prev_roadids_list, mid_crossids_list
"""
if is_reversed:
orig_list = crossid_list
crossid_list = list(orig_list)
crossid_list.reverse()
cur_cross = self.query_cross(crossid_list[0])
cross_list = [cur_cross]
road_list = []
prev_roadids_list = []
mid_crossids_list = []
for i in range(1, len(crossid_list)):
from_crossid = crossid_list[i - 1]
to_crossid = crossid_list[i]
find_res = self.try_connect_cross(from_crossid, to_crossid, ignore_sideway)
find_road = None
if find_res['roadid']:
# 第一个路段,补全一下前向的路段
if i == 1:
road_list.append(None)
prev_roadids_list.append([])
find_road = self.query_road(find_res['roadid'])
road_list.append(find_road)
cross_list.append(self.query_cross(to_crossid))
prev_roadids_list.append(find_res['prev_roadids'])
mid_crossids_list.append(find_res['mid_crossids'])
else:
# 中间中断了,则立刻返回错误
print('break with: ' + find_res['reason'])
return False, False, False, False
# 再反转回来
if is_reversed:
cross_list.reverse()
road_list.reverse()
prev_roadids_list.reverse()
mid_crossids_list.reverse()
return cross_list, road_list, prev_roadids_list, mid_crossids_list
def query_unidir_wave_crosses(self, crossid_list, roadinfo: 'RoadInfoManager', sideway_start = 0):
"""
查询单向绿波上的各路口间的路段连接信息
@param crossid_list:
@param roadinfo: 全局的路段属性管理器
@param sideway_start: 从1开始
@return:
"""
cross_num = len(crossid_list)
# 设置主辅路连接标记
sideway_flags = []
for i in range(0, cross_num):
if sideway_start > 1 and i >= (sideway_start-1):
sideway_flags.append(True) # 辅路路口
else:
sideway_flags.append(False) # 主路路口
cur_cross = self.query_cross(crossid_list[0])
cross_list = [cur_cross]
road_list = []
prev_roadids_list = []
mid_crossids_list = []
for i in range(1, cross_num):
from_crossid = crossid_list[i - 1]
to_crossid = crossid_list[i]
# find_res = self.try_connect_cross(from_crossid, to_crossid, ignore_sideway)
find_res = self.super_connect_cross(from_crossid, to_crossid, roadinfo,
sideway_flags[i-1], sideway_flags[i])
find_road = None
if find_res['roadid']:
# 第一个路段,补全一下前向的路段
if i == 1:
road_list.append(None)
prev_roadids_list.append([])
find_road = self.query_road(find_res['roadid'])
road_list.append(find_road)
cross_list.append(self.query_cross(to_crossid))
prev_roadids_list.append(find_res['prev_roadids'])
mid_crossids_list.append(find_res['mid_crossids'])
else:
# 中间中断了,则立刻返回错误
print('break with: ' + find_res['reason'])
return False, False, False, False
return cross_list, road_list, prev_roadids_list, mid_crossids_list
def find_path(self, in_road: Road, DRidset: set(), acc_len: int, road_list: [], len_limit: int, roadinfo: 'RoadInfoManager'):
"""
两个路口之间的寻路
@param in_road: 将要检查的road对象
@param DRidset: 末端结果roadid集合
@param acc_len: 前序路径长度
@param road_list: 前序路段road数组
@param len_limit: 里程上限阈值
@param roadinfo: 公共数据用于查询road的长度
@return: [road], reason
"""
# 命中末段road
if in_road.id in DRidset:
road_list.append(in_road)
return road_list, 'ok'
road_len = roadinfo.query_roadinfo(in_road.id).length
# 里程超限,停止寻路
if acc_len + road_len >= len_limit:
return False, 'too_long'
else:
# 禁止跨过信控路口
if self.query_cross(in_road.crossid_to).type == CROSS_TYPE_LIGHT:
return False, 'skip light'
# 展开节点
next_rid_list = self.query_next_staight_roads(in_road.id, )
# 无下游,停止寻路
if len(next_rid_list) == 0:
return False, 'no next'
tmp_list = list(road_list)
tmp_list.append(in_road)
for rid in next_rid_list:
tmp_road = self.query_road(rid)
res, reason = self.find_path(tmp_road, DRidset, acc_len + road_len, tmp_list, len_limit, roadinfo)
# 找到了
if res:
return res, 'ok'
else:
continue
# 所有的下游都找不到,则停止寻路
return False, 'not_connect'
def super_connect_cross(self, crossid_from, crossid_to, roadinfo: 'RoadInfoManager', from_sideway=False, to_sideway=True) -> dict:
"""
尝试获得2个路口之间的连接信息默认为主路连接辅路
:param crossid_from:
:param crossid_to:
:param from_sideway: 前路口是否为辅路连接
:param to_sideway: 后路口是否为辅路连接
:return: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'}
"""
# (0) 检查输入参数的有效性
cross_from = self.query_cross(crossid_from)
if not cross_from:
print('missing ' + crossid_from)
return None
cross_to = self.query_cross(crossid_to)
if not cross_to:
print('missing ' + crossid_to)
return None
# (1) 找出Oset 和 Dset 和 len_limit
ORoadset = set()
DRidset = set()
for roadid in cross_from.out_roadids:
road = self.query_road(roadid)
is_sideway = road.is_sideway()
if is_sideway == from_sideway:
ORoadset.add(road)
for roadid in cross_to.in_roadids:
road = self.query_road(roadid)
is_sideway = road.is_sideway()
if is_sideway == to_sideway:
DRidset.add(roadid)
# 应该改为计算球面距离
cross_dist = cross_from.location.earth_dist(cross_to.location)
len_limit = cross_dist * 1.5
# (2)
acc_len = 0
for in_road in ORoadset:
road_list, reason = self.find_path(in_road, DRidset, acc_len, [], len_limit, roadinfo)
if not road_list:
logging.warning('crossid_from: ' + crossid_from + ', crossid_to: ' + crossid_to + ', reason: ' + reason)
# print('crossid_from: ' + crossid_from + ', crossid_to: ' + crossid_to + ', reason: ' + reason)
continue
# TODO: 构造结果: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'}
res = {'roadid': road_list[-1].id}
prev_roadids = []
mid_crossids = []
for i in range(0, len(road_list)-1):
prev_roadids.append(road_list[i].id)
mid_crossids.append(road_list[i].crossid_to)
res['prev_roadids'] = prev_roadids
res['mid_crossids'] = mid_crossids
return res
# default end
return {'roadid': None, 'prev_roadids': [], 'mid_crossids': [], 'reason':'no path'}
def pick_symmetrical_roads(self) -> dict[str: int]:
"""
从路网中提取出满足相邻路口互为上下游关系的路口和路段集合
:return: roadid_to_pairid
"""
crossid_set = set()
# 已经被处理过的进口道ID集合
scaned_inroadid_set = set()
pair_id = 0
roadid_to_pairid = {}
road_pair_list = []
hit_debug = False
bad_road_num = 0
# 遍历所有的路段
for (inrodid, road) in self.road_map.items():
if road.is_sideway():
# print('ignore sideway %s %s' % (inrodid, road.name))
continue
if inrodid in scaned_inroadid_set:
continue
else:
scaned_inroadid_set.add(inrodid)
id_from = road.crossid_from
id_to = road.crossid_to
# 寻找连接 id_to -> id_from 的路段
cross_from = self.query_cross(id_from)
cross_to = self.query_cross(id_to)
# 在 cross_from 的进口道中寻找
find_reversed_road = False
for tmp_inroad_id in cross_from.in_roadids:
tmp_inroad = self.query_road(tmp_inroad_id)
if not tmp_inroad:
continue
# 忽略辅路
if tmp_inroad.is_sideway():
continue
if tmp_inroad.crossid_from == id_to:
# 找到了对称的路段
# 构建pair
scaned_inroadid_set.add(tmp_inroad_id)
find_reversed_road = True
pair_id += 1
roadid_to_pairid[inrodid] = pair_id
roadid_to_pairid[tmp_inroad_id] = pair_id
road_pair_list.append((inrodid, tmp_inroad_id))
#
crossid_set.add(id_from)
crossid_set.add(id_to)
if hit_debug:
line_geo = '%s,%s' % (self.query_cross(id_from).location.toStr(),
self.query_cross(id_to).location.toStr())
print(line_geo)
break
if not find_reversed_road:
bad_road_num += 1
if bad_road_num % 100 == 1:
print('bad_road: ' + road.toString())
print('bad_road_geo: %s' % (road.get_geo()))
continue
print('pick paired_road %d from road count %d' % (len(roadid_to_pairid), len(self.road_map)))
return roadid_to_pairid
def pick_symmetrical_roads_list(self):
"""
从路网中提取出满足相邻路口互为上下游关系的路口和路段集合
:return: roadid_to_pairid
"""
crossid_set = set()
# 已经被处理过的进口道ID集合
scaned_inroadid_set = set()
pair_id = 0
roadid_to_pairid = {}
road_pair_list = []
hit_debug = False
bad_road_num = 0
# 遍历所有的路段
for (inrodid, road) in self.road_map.items():
pair_id += 1
if inrodid in scaned_inroadid_set:
continue
else:
scaned_inroadid_set.add(inrodid)
id_from = road.crossid_from
id_to = road.crossid_to
# 寻找连接 id_to -> id_from 的路段
cross_from = self.query_cross(id_from)
cross_to = self.query_cross(id_to)
# 在 cross_from 的进口道中寻找
find_reversed_road = False
for tmp_inroad_id in cross_from.in_roadids:
tmp_inroad = self.query_road(tmp_inroad_id)
if not tmp_inroad:
continue
if tmp_inroad.crossid_from == id_to:
# 找到了对称的路段
# 构建pair
scaned_inroadid_set.add(tmp_inroad_id)
find_reversed_road = True
pair_id += 1
roadid_to_pairid[inrodid] = pair_id
roadid_to_pairid[tmp_inroad_id] = pair_id
road_pair_list.append([inrodid, tmp_inroad_id])
#
crossid_set.add(id_from)
crossid_set.add(id_to)
if hit_debug:
line_geo = '%s,%s' % (self.query_cross(id_from).location.toStr(),
self.query_cross(id_to).location.toStr())
print(line_geo)
break
if not find_reversed_road:
bad_road_num += 1
if bad_road_num % 100 == 1:
print('bad_road: ' + road.toString())
print('bad_road_geo: %s' % (road.get_geo()))
continue
return road_pair_list
def get_road_inflows(self) -> dict[str, list]:
"""
获取每个路段的上游各个入流向的路段信息
:return: {roadid => [直行进入的路段列表,左转进入的路段列表,右转进入的路段列表]}
"""
data = {}
for flow in self.flow_map.values():
roadid_out = flow.roadid_out
roadid_in = flow.roadid_in
turn_type = flow.turn_type
if turn_type > 2:
continue
if roadid_out not in data:
data[roadid_out] = [[], [], []]
data[roadid_out][turn_type].append(roadid_in)
return data
@classmethod
def load_cross_profile(cls, filename: str) -> set:
"""
加载路口画像文件返回有效路口ID集合
"""
crossid_set = set()
f = open(filename, 'r')
for line in f:
line = line.rstrip('\r\n')
ss = line.split(' ')
crossid = ss[0]
good_flag = int(ss[1])
if good_flag == 1:
crossid_set.add(crossid)
f.close()
return crossid_set
def pick_sub_roadnet(self, crossid_set) -> 'RoadNet':
"""提取并返回子路网"""
subnet = RoadNet(self.nodeid)
for (cid, cross) in self.cross_map.items():
if cid not in crossid_set:
continue
else:
subnet.add_cross(cross)
for (rid, road) in self.road_map.items():
if road.crossid_from not in crossid_set and road.crossid_to not in crossid_set:
continue
else:
subnet.add_road(road)
for (fid, flow) in self.flow_map.items():
if flow.crossid not in crossid_set:
continue
else:
subnet.add_flow(flow)
return subnet
def clear_inner_roads(self):
change_cross_num = 0
for cross in self.cross_map.values():
inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids))
if len(inter_set) > 0:
new_in_roadids = []
for in_rid in cross.in_roadids:
if in_rid not in inter_set:
new_in_roadids.append(in_rid)
cross.in_roadids = new_in_roadids
new_out_roadids = []
for out_rid in cross.out_roadids:
if out_rid not in inter_set:
new_out_roadids.append(out_rid)
cross.out_roadids = new_out_roadids
change_cross_num += 1
# end
if change_cross_num > 0:
logging.info('clear inner road for cross num: %d' % change_cross_num)
def get_other_inroad_info(self, crossid: str, main_inroadids: [], roadinfo: 'RoadInfoManager') -> []:
"""
获取指定路口的其他相交进口道ID
:param crossid: 路口ID
:param main_inroadids: 已经确定的绿波主路的进口道ID列表
:return: [ [srcDir, inroadid] ]
"""
if crossid not in self.cross_map:
print('missing cross %s' % crossid)
return None
cross = self.query_cross(crossid)
main_srcDir_set = set()
for rid in main_inroadids:
road = self.query_road(rid)
if not road:
print('missing road %s of cross %s' % (rid, crossid))
return False
main_srcDir_set.add(road.srcDir)
# slg补充上对向的车流来向
if len(main_srcDir_set) == 1:
only_srcDir = list(main_srcDir_set)[0]
main_srcDir_set.add(g_reversed_srcDir_mapping[only_srcDir])
inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids))
srcDir_to_roads = {}
for in_rid in cross.in_roadids:
if in_rid in inter_set:
continue
in_road = self.query_road(in_rid)
if in_road.srcDir in main_srcDir_set:
continue
# 按srcDir分组
if in_road.srcDir in srcDir_to_roads:
srcDir_to_roads[in_road.srcDir].append(in_road)
else:
srcDir_to_roads[in_road.srcDir] = [in_road]
# [ [srcDir, inroadid] ]
other_inroad_infos = []
# 遍历 srcDir_to_roads
for srcDir, roads in srcDir_to_roads.items():
if len(roads) == 1:
other_inroad_infos.append([srcDir, roads[0].id])
else:
# 在相同srcDir的多个inroad中选择最优代表性的一个
# 如果有主路、有辅路,则
side_roads = []
main_roads = []
for road in roads:
if road.is_sideway():
side_roads.append(road)
else:
main_roads.append(road)
# 优先选择主路(非辅路)路段
selected_roads = []
if len(main_roads) > 0:
selected_roads = main_roads
else:
selected_roads = side_roads
# TODO: 从 selected_roads 选出车道个数多的
pick_road = None
max_lane_num = 0
for road in selected_roads:
rinfo = roadinfo.query_roadinfo(road.id)
if rinfo.lane_num > max_lane_num:
max_lane_num = rinfo.lane_num
pick_road = road
other_inroad_infos.append([srcDir, pick_road.id])
# return
return other_inroad_infos
def calc_road_turn_angle(self, road_from: Road, road_to: Road):
pt0 = self.query_cross(road_from.crossid_from).location
pt1 = self.query_cross(road_from.crossid_to).location
pt2 = self.query_cross(road_to.crossid_from).location
pt3 = self.query_cross(road_to.crossid_to).location
pA = MyPoint(pt0.lng, pt0.lat)
pB = MyPoint(pt1.lng, pt1.lat)
pC = MyPoint(pt2.lng, pt2.lat)
pD = MyPoint(pt3.lng, pt3.lat)
turn_angle = abs(GeoFunc.radian2angle(GeoFunc.calc_turn_radian(pA, pB, pC, pD)))
return turn_angle
def calc_road_turn_angle_without_abs(self, road_from: Road, road_to: Road):
# pt0 = self.query_cross(road_from.crossid_from).location
# 第一个点,取一定距离的上游几何点
pt0 = road_from.get_tail_up_pt()
# pt1 = self.query_cross(road_from.crossid_to).location
pt1 = road_from.path[-1]
# pt2 = self.query_cross(road_to.crossid_from).location
pt2 = road_to.path[0]
# 最后一个点,取一定距离的下游几何点
pt3 = road_to.get_head_down_pt()
# pt3 = self.query_cross(road_to.crossid_to).location
pA = MyPoint.makeOne(pt0)
pB = MyPoint.makeOne(pt1)
pC = MyPoint.makeOne(pt2)
pD = MyPoint.makeOne(pt3)
turn_angle = GeoFunc.radian2angle(GeoFunc.calc_turn_radian(pA, pB, pC, pD))
# print('%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f => %.2f' % (pA.X,pA.Y, pB.X,pB.Y, pC.X,pC.Y, pD.X,pD.Y, turn_angle))
return turn_angle
def get_straight_road_pairs(self, crossid: str = None, consider_sideway = False):
"""
查询指定路口的属于直行关系的in-out路段ID pair
:param crossid:
:return:
"""
road_pairs = []
for cross in self.cross_map.values():
if crossid and cross.id != crossid:
continue
inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids))
# print(inter_set)
candidate_list = []
for in_rid in cross.in_roadids:
if in_rid in inter_set:
continue
in_road = self.query_road(in_rid)
if not in_road or (not consider_sideway and in_road.is_sideway()):
continue
for out_rid in cross.out_roadids:
if out_rid in inter_set:
continue
if in_rid == out_rid:
continue
out_road = self.query_road(out_rid)
if not out_road or (not consider_sideway and out_road.is_sideway()):
continue
# 不支持主-辅路构成pair
if in_road.is_sideway() != out_road.is_sideway():
continue
# 比较前后两个路段的转角
turn_angle = self.calc_road_turn_angle(in_road, out_road)
# print(in_road.name, out_road.name, turn_angle)
if turn_angle <= 45:
if in_road.name == out_road.name:
# 将同名道路的转角减少,以便于排序,排到最前面
turn_angle = turn_angle - 50
candidate_list.append([in_rid, out_rid, turn_angle])
# 按照广义的转角排序
pair_list = sorted(candidate_list, key=lambda x: x[2])
# for info in pair_list:
# print(info[0], info[1], round(info[2], 2))
# print(pair_list)
done_inroadid_set = set()
done_outroadid_set = set()
inout_name_map = dict()
# 依次选择全局最“直”的in-out构成pair
for info in pair_list:
in_rid = info[0]
out_rid = info[1]
in_name = in_road.name
out_name = out_road.name
if in_name in inout_name_map and out_name != inout_name_map[in_name]:
# 不符合已经确定的出入路名pair则跳过
continue
if out_name in inout_name_map and in_name != inout_name_map[out_name]:
# 不符合已经确定的出入路名pair则跳过
continue
if in_name not in inout_name_map and out_name not in inout_name_map:
# 构造出入路名pair
inout_name_map[in_name] = out_name
inout_name_map[out_name] = in_name
# 已经成pair的路段不再参与
if in_rid in done_inroadid_set or out_rid in done_outroadid_set:
continue
road_pairs.append([in_rid, out_rid])
done_inroadid_set.add(in_rid)
done_outroadid_set.add(out_rid)
return road_pairs
def get_straight_inroadid(self, crossid: str, out_roadid: str, consider_sideway = False):
"""
获取指定路口的出口道的直行进口道ID
"""
inout_road_pairs = self.get_straight_road_pairs(crossid, consider_sideway)
for pair in inout_road_pairs:
if out_roadid == pair[1]:
return pair[0]
# end not find
return None
def get_straight_outroadid(self, crossid: str, in_roadid: str, consider_sideway = False):
"""
获取指定路口的进口道的直行出口道ID
"""
inout_road_pairs = self.get_straight_road_pairs(crossid, consider_sideway)
for pair in inout_road_pairs:
if in_roadid == pair[0]:
return pair[1]
# end not find
return None
def get_turn_inroadid(self, crossid: str, out_roadid: str, want_turn_type = 1):
"""
获取指定路口的出口道的指定左右转的进口道ID
@param crossid: 路口ID
@param out_roadid: 出口道ID
@param want_turn_type: 要求的转向类型取值1,2表示左转 和 右转
@return: road or None
"""
out_road = self.query_road(out_roadid)
pC = MyPoint.makeOne(out_road.path[0])
pD = MyPoint.makeOne(out_road.path[-1])
cur_min_diff = 180
find_roadid = None
in_roadids = self.query_cross(crossid).in_roadids
for in_rid in in_roadids:
in_road = self.query_road(in_rid)
pA = MyPoint.makeOne(in_road.path[0])
pB = MyPoint.makeOne(in_road.path[-1])
turn_type, diff = GeoFunc.calc_turn_type_ex(pA, pB, pC, pD)
if turn_type != want_turn_type:
continue
if diff < cur_min_diff:
cur_min_diff = diff
find_roadid = in_road.id
return find_roadid
def get_turn_outroadid(self, crossid: str, in_roadid: str, want_turn_type = 1):
"""
获取指定路口的进口道的指定左右转的出口道ID
@param crossid:
@param in_roadid:
@param want_turn_type:
@return:
"""
in_road = self.query_road(in_roadid)
pA = MyPoint.makeOne(in_road.path[0])
pB = MyPoint.makeOne(in_road.path[-1])
cur_min_diff = 180
find_roadid = None
out_roadids = self.query_cross(crossid).out_roadids
for out_rid in out_roadids:
out_road = self.query_road(out_rid)
pC = MyPoint.makeOne(out_road.path[0])
pD = MyPoint.makeOne(out_road.path[-1])
turn_type, diff = GeoFunc.calc_turn_type_ex(pA, pB, pC, pD)
if turn_type != want_turn_type:
continue
if diff < cur_min_diff:
cur_min_diff = diff
find_roadid = out_road.id
return find_roadid
def do_cross_classify(self, road_to_info: 'RoadInfoManager'):
"""
内部执行路口的分级,返回各个等级的路口个数
"""
self.cross_rank = dict()
rank_stat = dict()
for cross in self.cross_map.values():
if cross.at_edge:
continue
if cross.type != CROSS_TYPE_LIGHT:
continue
max_inroad_rank = -1
for rid in cross.in_roadids:
roadinfo = road_to_info.query_roadinfo(rid)
rank = (int)(roadinfo.get_rank() * 2)
if rank > max_inroad_rank:
max_inroad_rank = rank
self.cross_rank[cross.id] = max_inroad_rank
if max_inroad_rank in rank_stat:
rank_stat[max_inroad_rank] += 1
else:
rank_stat[max_inroad_rank] = 1
# 统计一下各个rank的数字
return rank_stat
def query_cross_by_bound(self, bound: MapBound, road_to_info: 'RoadInfoManager') -> []:
"""
查询指定范围内的信控路口信息
"""
info_list = []
for cross in self.cross_map.values():
if cross.id == 'CR_11343097_2361427':
print('debug')
if cross.at_edge:
continue
if cross.type != CROSS_TYPE_LIGHT:
continue
loc: Pt = cross.location
if bound.in_bound(loc.lng, loc.lat):
corss_roads = cross.in_roadids + cross.out_roadids
max_rc = 10
for ro in corss_roads:
roadinfo = road_to_info.query_roadinfo(ro)
rc = comparable_rank(roadinfo.roadclass)
if rc < max_rc:
max_rc = rc
if max_rc > 8:
cross_class = 2
else:
cross_class = 1
info = {'crossid': cross.id, 'name': cross.name, 'location': loc.toStr(), 'type': cross.type,
'cross_class': cross_class}
info_list.append(info)
return info_list
def query_cross_by_rank(self, min_rank: int) -> []:
"""
查询不低于指定rank的信控路口信息
"""
info_list = []
for cross in self.cross_map.values():
if cross.at_edge:
continue
if cross.type != CROSS_TYPE_LIGHT:
continue
if cross.id in self.cross_rank:
rank = self.cross_rank[cross.id]
if rank >= min_rank:
loc: Pt = cross.location
info = {'crossid': cross.id, 'name': cross.name, 'location': loc.toStr()}
info_list.append(info)
return info_list
def update_cross_info(self, crossid, cross_info):
cross = self.cross_map[crossid]
cross.name = cross_info['name']
cross.location = Pt(float(cross_info['location'].split(',')[0]), float(cross_info['location'].split(',')[1]))
def update_road_net(self, roadid, road_info):
road = self.road_map[roadid]
road.srcDir = road_info['src_direct']
class RoadInfo:
def __init__(self):
self.roadid = ''
self.roadclass = 6
self.length = 0
self.lane_num = 1
self.turninfo = '1'
self.ffs = 60
self.speed_limit = 60
self.capacity = 1800
def get_rank(self):
rc_weight = 6 - get_comparable_rc(self.roadclass)
return (rc_weight + self.lane_num) / 2
def get_lanenum_of_turn(self, turn_type: int) -> int:
"""
获取指定转向类型的车道个数
:param turn_type:
:return:
"""
if turn_type < 0 or turn_type > 3:
return 0
hit_turnset = g_turnset_of_turn_type[turn_type]
lanenum = 0
turn_list = self.turninfo.split('|')
for item in turn_list:
turn = int(item)
if turn in hit_turnset:
lanenum += 1
return lanenum
def get_straight_left_lanenum(self) -> int:
"""
获取直左共用的车道个数
:param turn_type:
:return:
"""
hit_turnset_0 = g_turnset_of_turn_type[0]
hit_turnset_1 = g_turnset_of_turn_type[1]
hit_turnset = hit_turnset_0.intersection(hit_turnset_1)
lanenum = 0
turn_list = self.turninfo.split('|')
for item in turn_list:
turn = int(item)
if turn in hit_turnset:
lanenum += 1
return lanenum
def to_dict(self):
info = {}
info['roadclass'] = self.roadclass
info['length'] = self.length
info['lane_num'] = self.lane_num
info['ffs'] = self.ffs
info['capacity'] = self.capacity
if self.turninfo != '':
ss = self.turninfo.split('|')
else:
ss = [1]
info['lane_turn_list'] = ss
info['lane_turn_flags'] = turninfo2str(self.turninfo)
return info
@classmethod
def deserialize_from_txt(cls, line):
ss = line.split(' ')
roadinfo = RoadInfo()
roadinfo.roadid = ss[0]
roadinfo.roadclass = int(ss[1])
roadinfo.length = int(ss[2])
roadinfo.lane_num = int(ss[3])
roadinfo.turninfo = ss[4]
roadinfo.ffs = int(ss[5])
roadinfo.speed_limit = int(ss[6])
roadinfo.capacity = int(ss[7])
return roadinfo
# 定义交管路网的路段属性管理器类
class RoadInfoManager:
def __init__(self):
self.roadinfo_map = {}
def query_roadinfo(self, roadid) -> RoadInfo:
return self.roadinfo_map.get(roadid)
def load_from_file(self, filename):
f = open(filename, 'r', encoding='utf-8')
file_content = f.read()
f.close()
data = file_content.replace('\r', ' ')
lines = data.split('\n')
header_fields = lines[0].split(' ')
road_num = int(header_fields[1])
line_offset = 1
for i in range(0, road_num):
line = lines[i + line_offset]
roadinfo = RoadInfo.deserialize_from_txt(line)
self.roadinfo_map[roadinfo.roadid] = roadinfo
def toString(self):
# 统计一下,车道数分布
lane_stat = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
rc_stat = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
for roadinfo in self.roadinfo_map.values():
lane_num = roadinfo.lane_num
lane_stat[lane_num] += 1
rc_stat[roadinfo.roadclass] += 1
res = ''
for num in range(0, len(lane_stat)):
res += "[lane_%d: %d]" % (num, lane_stat[num])
res += "\n"
for rc in range(0, len(rc_stat)):
res += "[rc_%d: %d]" % (rc, rc_stat[rc])
return res
def update_road_info(self, roadid, road_info):
roadInfo = self.query_roadinfo(roadid)
roadInfo.roadclass = 6 if not road_info['from_road_class'] else road_info['from_road_class']
roadInfo.lane_num = road_info['lane_num']
roadInfo.length = road_info['from_road_length']
roadInfo.ffs = road_info['ffs']
roadInfo.capacity = road_info['capacity']
roadInfo.turninfo = road_info['lane_turn_info']
roadInfo.lane_turn_list = road_info['lane_turn_info'].split('|') if len(road_info['lane_turn_info']) > 1 else [1]
roadInfo.lane_turn_flags = turninfo2str(road_info['lane_turn_info'])
def check_point_in_bound(point1, point2, param_point):
x1, y1 = float(point1.split(',')[0]), float(point1.split(',')[1])
x2, y2 = float(point2.split(',')[0]), float(point2.split(',')[1])
x3, y3 = param_point.lng, param_point.lat
return (x1 <= x3 <= x2) and (y1 <= y3 <= y2)
# 20250904 更新城市边框定义方式 支持一个城市多个框选范围的情况
def check_point_in_bounds(bounds, param_point):
lng, lat = float(param_point.toStr().split(',')[0]), float(param_point.toStr().split(',')[1])
def _inside(poly):
n, inside = len(poly), False
j = n - 1
for i in range(n):
xi, yi = poly[i]
xj, yj = poly[j]
if ((yi > lat) != (yj > lat)) and \
(lng < (xj - xi) * (lat - yi) / (yj - yi + 1e-12) + xi):
inside = not inside
j = i
return inside
for s in bounds:
coords = list(map(float, s.split(',')))
if len(coords) < 6 or len(coords) % 2:
continue
poly = [(coords[i], coords[i+1]) for i in range(0, len(coords), 2)]
if _inside(poly):
return True
return False
# 20250904 更新城市边框定义方式 支持一个城市多个框选范围的情况 实现方式2
def point_in_any_bound(raw_bounds, param_point):
"""
pt: (lng, lat)
raw_bounds: ["x1,y1,x2,y2,...", "..."]
"""
lng, lat = param_point.toStr().split(',')[0], param_point.toStr().split(',')[1]
for s in raw_bounds:
coords = list(map(float, s.split(',')))
if len(coords) < 6 or len(coords) % 2:
continue
xy = [(coords[i], coords[i+1]) for i in range(0, len(coords), 2)]
if Path(xy).contains_point((lng, lat)):
return True
return False
if __name__ == '__main__':
turninfo = '4|1|5'
print(turninfo2str(turninfo))
# roadnet = RoadNet('Demo')
# f = open('D:/slgwork/slgcode/importer_bd/roadnet_demo.txt', 'r', encoding='utf-8')
# file_content = f.read()
# f.close()
# roadnet.load(file_content)
# print("ok")
# print(roadnet.toString())
#
# save_data = roadnet.serialize_as_txt()
# f = open("./tmp_roadnet.txt", "w", encoding='utf-8')
# f.write(save_data)
# f.close()