from typing import Dict, Any, List import app.console as console from app.GeoFunc import * from app.comm import * from matplotlib.path import Path # 车信箭头标记到转向集合的映射关系 g_turn2str = {'1': 's', '2': 'l', '3': 'r', '4': 'ls', '5': 'sr', '6': 't', '7': 'st', '8': 'lt', '9': 'lr', '10': 'lst', '11': '-', '12': 'lsr', '13': 'rt', '14': 'lrt', '15': 'srt', '16': 'lsrt', '17': 'bus', '19': 'reversible'} g_turnflag2type = {'s': 0, 'l': 1, 'r': 2, 't': 3} # 转向类型到车信箭头标记的映射关系 g_turnset_of_turn_type = [{1,4,5,7,10,12,15,16}, {2,4,8,9,10,12,14,16}, {3,5,6,7,8,10,13,14,15,16}, {6,7,8,10,13,15,16}] def turninfo2str(turninfo): global g_turn2str res = [] if turninfo == '': res.append(g_turn2str['1']) else: ss = turninfo.split('|') for turn in ss: if turn in g_turn2str.keys(): res.append(g_turn2str[turn]) else: turn = '1' res.append(g_turn2str[turn]) return res def turnflag_to_turn_type(token): global g_turnflag2type if token in g_turnflag2type: return g_turnflag2type[token] else: return 0 def get_comparable_rc(rc): """ 返回取值1~5的有可比性的道路等级 :param rc: :return: """ return rc if rc <= 5 else (rc - 5) class MapBound: """ 地图矩形范围 """ def __init__(self): self.min_lon = 0 self.min_lat = 0 self.max_lon = 0 self.max_lat = 0 def in_bound(self, lon: float, lat: float): """ 判断指定坐标是否落入bound内 """ return self.min_lon < lon < self.max_lon and self.min_lat < lat < self.max_lat @classmethod def parse_from_str(cls, bound_str): bound = MapBound() try: lonlat_str_list = bound_str.split(',') bound.min_lon = float(lonlat_str_list[0]) bound.min_lat = float(lonlat_str_list[1]) bound.max_lon = float(lonlat_str_list[2]) bound.max_lat = float(lonlat_str_list[3]) return bound except Exception: return None # 定义经纬度点 class Pt: def __init__(self, lng, lat): self.lng = lng self.lat = lat def copy(self): return Pt(self.lng, self.lat) def toStr(self): return "%.6f,%.6f" % (self.lng, self.lat) def dist(self, other): d = math.fabs(self.lng - other.lng) + math.fabs(self.lat - other.lat) # console.log( # '' + str(self.lng) + ' ' + str(other.lng) + ' ' + str(self.lat) + ' ' + str(other.lat) + ' = ' + str(d)) return d def sqrt_dist(self, other): d = math.sqrt((self.lng - other.lng)*(self.lng - other.lng) + (self.lat - other.lat)*(self.lat - other.lat)) return d CROSS_TYPE_NORMAL = 1 CROSS_TYPE_LIGHT = 2 # // 定义路口类 class Cross: def __init__(self, lng, lat): self.id = self.coord2id(lng, lat) self.at_edge = False # type 0:普通路口; 1:信控路口 self.type = 1 self.location = Pt(lng, lat) self.name = 'unknown' self.in_roadids = [] self.out_roadids = [] # 20250922 新增辖区id 辖区名称 信息 self.area_id = 0 self.area_name = '' def set_as_edge(self): self.at_edge = True def judge_if_edge(self): if len(self.in_roadids) <= 1 and len(self.out_roadids) <= 1: self.at_edge = True @staticmethod def coord2id(lng, lat): LNG = round(lng * 100000) LAT = round(lat * 100000) id = 'CR_' + str(LNG) + '_' + str(LAT) return id def add_inroad(self, roadid): self.in_roadids.append(roadid) def add_outroad(self, roadid): self.out_roadids.append(roadid) def get_location(self): return str(self.location.lng) + ',' + str(self.location.lat) def has_light(self) -> bool: """是否为信控路口""" return self.type == 1 def to_dict(self): info = {} info['crossid'] = self.id info['location'] = self.get_location() info['name'] = self.name info['type'] = self.type return info def toString(self): return self.id + " coord:" + str(self.location.lng) + ',' + str(self.location.lat) + ' inroad:' + str( len(self.in_roadid)) + ' outroad:' + str(len(self.out_roadids)) def serialize_as_txt(self): edge_flag = 1 if self.at_edge else 0 return self.id + ' ' + str(self.location.lng) + ',' + str(self.location.lat) + ' ' + self.name + ' ' + edge_flag @classmethod def deserialize_from_txt(cls, line): ss = line.split(' ') xys = ss[1].split(',') lng = float(xys[0]) lat = float(xys[1]) cross = Cross(lng, lat) cross.id = ss[0] if len(ss) >= 3: cross.name = ss[2] if len(ss) >= 4: flag = int(ss[3]) cross.at_edge = True if flag == 1 else False if len(ss) >= 5: cross.type = int(ss[4]) if cross.type < 1 or cross.type > 2: console.log('wrong type %s for cross %s' % (ss[4], cross.id)) cross.type = 2 else: # 如果不是边缘路口,则默认视作灯控路口 if not cross.at_edge: cross.type = 2 return cross class Road: def __init__(self, crossid_from, crossid_to): self.id = self.make_roadid(crossid_from, crossid_to) self.crossid_from = crossid_from self.crossid_to = crossid_to self.out_flows = [] self.name = 'unknown' self.path = [] self.srcDir = '' # 车流来向 def make_roadid(self, crossid_from, crossid_to): return 'RD_' + crossid_from + '_' + crossid_to def set_name(self, name): self.name = name def is_sideway(self): """该路段是否为辅路""" return self.name[-2:] == '辅路' def set_path(self, geo): # if self.crossid_to == '6ee88a42611bd4': # print('debug') xys = geo.split(',') pt_num = len(xys) // 2 for i in range(0, pt_num): self.path.append(Pt(float(xys[2 * i]), float(xys[2 * i + 1]))) self.srcDir = GeoFunc.calc_srcDir_tail(self.path) # self.srcDir = GeoFunc.calc_srcDir(MyPoint.makeOne(self.path[0]), MyPoint.makeOne(self.path[len(self.path) - 1])) def get_headfrom_srcDir(self): """ 获取路段起始段的车流来向 :return: """ return GeoFunc.calc_srcDir_head(self.path) def get_angle_from_head(self): return GeoFunc.calc_angle_head(self.path) def get_angle_from_tail(self): return GeoFunc.calc_angle_tail(self.path) def toPt(self, point: MyPoint): pt = Pt() pt.lng = point.X pt.lat = point.Y return pt def get_tail_up_pt(self): pt_list = self.path num = len(pt_list) pB = MyPoint.makeOne(pt_list[num - 1]) cur_pA_index = num - 2 while True: pA = MyPoint.makeOne(pt_list[cur_pA_index]) if cur_pA_index <= 0 or pA.get_dist(pB) >= 0.0005: ppt = MyPoint.makeOne(pt_list[cur_pA_index + 1]) left_dist = 0.0005 - ppt.get_dist(pB) mid_pt = ppt.calc_left_pt(pA, left_dist) # print('left_dist %f. dist %f. %.6f,%.6f' % (left_dist, pB.get_dist(mid_pt), mid_pt.X, mid_pt.Y)) return Pt(mid_pt.X, mid_pt.Y) # return pt_list[cur_pA_index] cur_pA_index -= 1 return None def get_head_down_pt(self): pt_list = self.path num = len(pt_list) pA = MyPoint.makeOne(pt_list[0]) cur_pB_index = 1 while True: pB = MyPoint.makeOne(pt_list[cur_pB_index]) if cur_pB_index >= num - 1 or pA.get_dist(pB) >= 0.0005: ppt = MyPoint.makeOne(pt_list[cur_pB_index-1]) left_dist = 0.0005 - pA.get_dist(ppt) mid_pt = ppt.calc_left_pt(pB, left_dist) # print('left_dist %f. dist %f. %.6f,%.6f' % (left_dist, pA.get_dist(mid_pt), mid_pt.X, mid_pt.Y)) return Pt(mid_pt.X, mid_pt.Y) # return pt_list[cur_pB_index] cur_pB_index += 1 return None def get_geo(self): ss = [] for pt in self.path: ss.append(str(pt.lng)) ss.append(str(pt.lat)) return ','.join(ss) def fill_coords(self, cross_map): cross_from = cross_map.get(self.crossid_from) cross_to = cross_map.get(self.crossid_to) self.path.append(cross_from.location.copy()) self.path.append(cross_to.location.copy()) self.srcDir = GeoFunc.calc_srcDir(MyPoint.makeOne(cross_from.location), MyPoint.makeOne(cross_to.location)) def add_outflow(self, flow): self.out_flows.append(flow) def toString(self): return self.crossid_from + ' => ' + self.crossid_to + ' name:' + self.name def serialize_as_txt(self): return self.id + ' ' + self.crossid_from + ' ' + self.crossid_to + ' ' + self.name @classmethod def deserialize_from_txt(cls, line): ss = line.split(' ') id = ss[0] crossid_from = ss[1] crossid_to = ss[2] road = Road(crossid_from, crossid_to) road.id = id road.set_name(ss[3]) if (len(ss) >= 5): geo = ss[4] road.set_path(geo) return road def get_direct_angle(self): pt0: Pt = self.path[0] pt1: Pt = self.path[-1] pA = MyPoint(pt0.lng, pt0.lat) pB = MyPoint(pt1.lng, pt1.lat) return GeoFunc.calc_pole_angle(pA, pB) # 定义流向类 class Flow: def __init__(self, crossid, roadid_in, roadid_out): self.crossid = crossid self.roadid_in = roadid_in self.roadid_out = roadid_out self.turn_type = -1 def getid(self): return hash('FL_' + self.roadid_in + '_' + str(self.turn_type) + self.roadid_out) # return 'FL_' + self.roadid_in + '_' + str(self.turn_type) def toString(self): return self.crossid + ' ' + self.roadid_in + ' ' + self.roadid_out + ' ' + str(self.turn_type) def serialize_as_txt(self): return self.toString() @classmethod def deserialize_from_txt(cls, line): ss = line.split(' ') flow = Flow(ss[0], ss[1], ss[2]) flow.turn_type = int(ss[3]) return flow # class CrossTopo: """ 路口拓扑,包含各个进口道方向[取值为标准八方向]所对应的转向[取值为0,1,2,3]数组 """ def __init__(self): # 存储结构:srcDir => [0/1/2/3]数组,数组内每个元素表示1个流向 self.srcDir_flows = {} # 添加一个流向 def add_flow_delay(self, srcDir, turn_type): data = None if srcDir in self.srcDir_flows: data = self.srcDir_flows[srcDir] else: data = [] if turn_type not in data: data.append(turn_type) self.srcDir_flows[srcDir] = data def get_srcDir_list(self) -> [str]: return list(self.srcDir_flows.keys()) def get_srcDir_pairs(self) -> (str, str): pair_list = [] srcDir_set = set(self.srcDir_flows.keys()) if 'N' in srcDir_set and 'S' in srcDir_set: pair_list.append(['N', 'S']) if 'E' in srcDir_set and 'W' in srcDir_set: pair_list.append(['E', 'W']) if 'NW' in srcDir_set and 'SE' in srcDir_set: pair_list.append(['NW', 'SE']) if 'NE' in srcDir_set and 'SW' in srcDir_set: pair_list.append(['NE', 'SW']) if len(pair_list) == 2: # 仅当存在两组对向时,才返回结果 return pair_list[0], pair_list[1] else: return False, False # 定义交管路网类 class RoadNet: def __init__(self, nodeid): self.nodeid = nodeid # 路口数据 self.cross_map = {} self.road_map = {} self.flow_map = {} self.bound_info = {} self.is_for_greenwave = False # 路口的rank信息 self.cross_rank = {} # 路口到所属citycode的映射关系 self.cross2city = {} # 疑似过街灯的信号路口集合 self.midroad_cross_idset = set() def collect_midroad_crosses(self): """ 识别路网中疑似在道路中间的不是交叉口的信控cross, 收集到self.midroad_cross_idset中 :return: """ for cross in self.cross_map.values(): if cross.type == CROSS_TYPE_LIGHT and len(cross.in_roadids) == 2: name_set = set() for rid in cross.in_roadids: road = self.query_road(rid) name_set.add(road.name) if len(name_set) == 1: self.midroad_cross_idset.add(cross.id) print('collect midroad_cross num: %d' % len(self.midroad_cross_idset)) def is_midroad_cross(self, crossid: str): """ 判断是不是道路中间的可跨越的信控路口 :param crossid: :return: """ return crossid in self.midroad_cross_idset def set_for_greenwave(self): self.is_for_greenwave = True def add_cross(self, cross): if self.bound_info and len(self.bound_info) > 0: bound_info = self.bound_info for item in bound_info: area_id = item['area_id'] area_name = item['area_name'] cross.area_id = area_id cross.area_name = area_name bound = item['bounds'] if check_point_in_bounds(bound, cross.location): self.cross_map[cross.id] = cross else: continue elif not self.bound_info: self.cross_map[cross.id] = cross def add_road(self, road): if road.crossid_from == road.crossid_to: return False if road.crossid_from not in self.cross_map.keys() and road.crossid_to not in self.cross_map.keys(): return False self.road_map[road.id] = road if (len(road.path) < 2): road.fill_coords(self.cross_map) # 更新路口的进出口道路 cross_from = self.cross_map.get(road.crossid_from) if cross_from: cross_from.add_outroad(road.id) cross_to = self.cross_map.get(road.crossid_to) if cross_to: cross_to.add_inroad(road.id) return True def add_bidir_road(self, road): reversed_road = Road(road.crossid_to, road.crossid_from) self.add_road(road) self.add_road(reversed_road) def add_flow(self, flow): self.flow_map[flow.getid()] = flow inroad = self.road_map.get(flow.roadid_in) if inroad: inroad.add_outflow(flow.getid()) # 自动生成所有的flow def fill_allflows(self): self.flow_map.clear() for road in self.road_map.values(): road.out_flows = [] add_num = 0 # 针对每个路口的每一组in-out路段,都生成一个flow for (id, cross) in self.cross_map.items(): in_roads = cross.in_roadids out_roads = cross.out_roadids # 遍历进口道 for in_roadid in in_roads: # 遍历出口道 for out_roadid in out_roads: flow = Flow(id, in_roadid, out_roadid) flow.turn_type = self.calc_turn_type(in_roadid, out_roadid) self.add_flow(flow) add_num += 1 print("add flow %d, final get flow %d" % (add_num, len(self.flow_map))) def query_cross(self, crossid) -> Cross: return self.cross_map.get(crossid) def query_road(self, roadid) -> Road: return self.road_map.get(roadid) def query_road_2(self, crossid, srcDir) -> Road: """查询路口指定来车方向的进口道""" inroad_ids = self.query_cross(crossid).in_roadids for rid in inroad_ids: road = self.query_road(rid) if road.srcDir == srcDir: return road return None def query_next_roads(self, roadid, srcDir) -> [str]: """查询下游路段""" res = [] road = self.query_road(roadid) next_roadids = self.query_cross(road.crossid_to).out_roadids for rid in next_roadids: next_road = self.query_road(rid) if next_road.srcDir == srcDir: res.append(rid) return res def query_next_staight_roads(self, roadid, angle_limit = 60): """ 查找路段的下游近似直行的road @param roadid: 输入roadid @param angle_limit: 转角限制,默认60度 @return: [roadid] """ res = [] this_road = self.query_road(roadid) tmp_cross = self.query_cross(this_road.crossid_to) direct_angle = this_road.get_direct_angle() for roadid in tmp_cross.out_roadids: road = self.query_road(roadid) # 找出转角小于60的road diff_angle = GeoFunc.diff_pole_angle(direct_angle, road.get_direct_angle()) if diff_angle <= angle_limit: res.append(road.id) return res def query_prev_roads(self, roadid, srcDir) -> [str]: """查询上游路段""" res = [] road = self.query_road(roadid) prev_roadids: list[str] = self.query_cross(road.crossid_from).in_roadids for rid in prev_roadids: prev_road = self.query_road(rid) if not prev_road: print('missing query_prev_roads roadid %s from road %s srcDir %s' % (rid, roadid, srcDir)) if prev_road.srcDir == srcDir: res.append(rid) return res def query_main_side_road_pair(self): """ 获取路网内包含的疑似主辅路的roadid的pair :return:[(roadid1, roadid2)] """ pair_list = [] # 遍历路口 for cross in self.cross_map.values(): if cross.at_edge: continue inroad_num = len(cross.in_roadids) for i in range(0, inroad_num): for j in range(i+1, inroad_num): rid1 = cross.in_roadids[i] rid2 = cross.in_roadids[j] road1 = self.query_road(rid1) road2 = self.query_road(rid2) if road1.crossid_from == road2.crossid_from and road1.srcDir == road2.srcDir: pair_list.append((rid1, rid2)) # return pair_list return pair_list def query_flow(self, flowid) -> Flow: return self.flow_map.get(flowid) def get_crossid_list(self): res = [] for id in self.cross_map.keys(): res.append(id) return res def get_valid_crossid_list(self) -> []: res = [] for (crossid, cross) in self.cross_map.items(): if cross.at_edge: continue else: res.append(crossid) return res # g_cross_list def fill_valid_crossids(self, crossid_list) -> int: fill_num = 0 for (crossid, cross) in self.cross_map.items(): if cross.at_edge: continue else: crossid_list.append(crossid) fill_num += 1 return fill_num def fill_valid_crossinfo(self, cross_list) -> int: fill_num = 0 for (crossid, cross) in self.cross_map.items(): if cross.at_edge: continue # elif len(cross.in_roadids) < 4: # continue else: cross_list.append({'crossid': crossid, 'type': cross.type, 'name': cross.name, 'location': cross.location.toStr()}) fill_num += 1 return fill_num def get_crossroads(self) -> []: """ 获取十字路口的信息数组 :return: """ cross_list = [] for (crossid, cross) in self.cross_map.items(): if cross.at_edge: continue else: # 进一步判断是否为十字路口 if len(cross.in_roadids) >= 4: cross_list.append([crossid, cross.name, cross.location.toStr()]) return cross_list def fill_crossroads(self, crossid_set) -> []: crossid_set.clear() for (crossid, cross) in self.cross_map.items(): if cross.at_edge: continue else: # 进一步判断是否为十字路口 if len(cross.in_roadids) >= 4: crossid_set.add(crossid) return crossid_set def get_cross_allflow(self, crossid) -> {}: flows = {} for (id, flow) in self.flow_map.items(): if flow.crossid == crossid: flows[id] = flow return flows def get_cross_topo(self, crossid) -> CrossTopo: cross = self.cross_map.get(crossid) if not cross: return False topo = CrossTopo() in_roads = cross.in_roadids # 遍历进口道 for in_roadid in in_roads: inroad = self.query_road(in_roadid) srcDir = inroad.srcDir for flowid in inroad.out_flows: turn_type = self.query_flow(flowid).turn_type topo.add_flow_delay(srcDir, turn_type) return topo def get_adjcross(self, crossid): cross = self.cross_map.get(crossid) adj_cross = {} for inroad_id in cross.in_roadids: road = self.query_road(inroad_id) adj_cross[road.srcDir] = self.query_cross(road.crossid_from) return adj_cross def get_adjcross_from(self, crossid, srcDir): cross = self.cross_map.get(crossid) adj_cross = None for inroad_id in cross.in_roadids: road = self.query_road(inroad_id) if road.srcDir == srcDir: adj_cross = self.query_cross(road.crossid_from) return adj_cross def search_cross(self, keyword): cross_list = [] for cross in self.cross_map.values(): if cross.name.find(keyword) >= 0: cross_list.append(cross) return cross_list # 根据位置查询路口 def hit_cross(self, lng, lat): pos = Pt(lng, lat) # 遍历map for (id, cross) in self.cross_map.items(): if (cross.location.dist(pos) < 0.001): console.log("hit_cross: " + id) return id else: console.log("hit_cross: falsse") return None def query_cross_all_srcDir(self, crossid): srcDir_list = [] cross = self.cross_map.get(crossid) for roadid in cross.in_roadids: road = self.road_map.get(roadid) srcDir_list.append(road.srcDir) return srcDir_list def toString(self): return ('RoadNet:%s cross_num:%d road_num:%d flow:%d' % (self.nodeid, len(self.cross_map), len(self.road_map), len(self.flow_map))) def serialize_as_txt(self): # 遍历路口 lines = [] cr_num = len(self.cross_map) road_num = len(self.road_map) flow_num = len(self.flow_map) headline = "cross %d road %d flow %d" % (cr_num, road_num, flow_num) lines.append(headline) # 遍历路口 for (id, cross) in self.cross_map.items(): lines.append(cross.serialize_as_txt()) # 遍历路段 for (id, road) in self.road_map.items(): lines.append(road.serialize_as_txt()) # 遍历流向 for (id, flow) in self.flow_map.items(): lines.append(flow.serialize_as_txt()) # 连接并返回 return '\n'.join(lines) def reset(self): self.cross_map.clear() self.road_map.clear() self.flow_map.clear() def calc_turn_type(self, in_roadid, out_roadid): in_road = self.query_road(in_roadid) out_road = self.query_road(out_roadid) if None == in_road or None == out_road: return None # # 简单地,根据入和出路段的首末点,来计算转角 # pA = MyPoint.makeOne(in_road.path[0]) # pB = MyPoint.makeOne(in_road.path[1]) # pC = MyPoint.makeOne(out_road.path[0]) # pD = MyPoint.makeOne(out_road.path[1]) # return GeoFunc.calc_turn_type(pA, pB, pC, pD) # 改为:基于进口道末端几何 和 出口道开头几何 的 转角计算 pA, pB = GeoFunc.pick_path_tail(in_road.path) pC, pD = GeoFunc.pick_path_head(out_road.path) return GeoFunc.calc_turn_type(pA, pB, pC, pD) def complete_roadname(self): # 遍历road add_roadname_num = 0 for road in self.road_map.values(): if road.name is not None and road.name != "unknown": continue for other in self.road_map.values(): if other.name is not None and other.name != "unknown": if (road.crossid_from == other.crossid_to and road.crossid_to == other.crossid_from): road.set_name('' + other.name) add_roadname_num += 1 msg = "add roadname num:" + str(add_roadname_num) console.log(msg) # alert(msg) def generate_crossname(self): generate_num = 0 for cross in self.cross_map.values(): if cross.at_edge: continue if len(cross.in_roadids) < 1 or len(cross.out_roadids) < 1: # console.log("bad cross: " + cross.id) continue if cross.name is not None and cross.name != "unkown": # console.log("had cross name: " + cross.name) continue in_roadname = self.query_road(cross.in_roadids[0]).name for roadid in cross.out_roadids: out_roadname = self.query_road(roadid).name if in_roadname != out_roadname: cross_name = in_roadname + out_roadname + '交叉口' cross.name = cross_name generate_num += 1 break console.log("generate cross name num:" + str(generate_num)) def load_from_file(self, filename, citycode: int = 0): # 在绿波模式下,需要累积加载多个城市的路网,因此,加载时,不执行重置操作 if not self.is_for_greenwave: self.reset() f = open(filename, 'r', encoding='utf-8') file_content = f.read() f.close() return self.load(file_content, citycode) def load_from_fix_road_dir_file(self, filename, citycode: int = 0): f = open(filename, 'r', encoding='utf-8') file_content = f.read() f.close() data = file_content.replace('\r', '') lines = data.split('\n') header_fields = lines[0].split(' ') road_num = int(header_fields[0]) line_offset = 1 for i in range(0, road_num): line = lines[i + line_offset] roadid = line.split(' ')[0] fix_dir = line.split(' ')[1] road = self.query_road(roadid) if road: road.srcDir = fix_dir # 传入的文件内容字符串 def load(self, data, citycode: int = 0): data = data.replace('\r', '') lines = data.split('\n') header_fields = lines[0].split(' ') cross_num = int(header_fields[1]) road_num = int(header_fields[3]) flow_num = int(header_fields[5]) # 读取所有路口 line_offset = 1 for i in range(0, cross_num): line = lines[i + line_offset] cross = Cross.deserialize_from_txt(line) # # 忽略非市区的路口 # if not cross.urban: # continue self.add_cross(cross) if cross.id == 'CR_11343097_2361427': print('debug load') if citycode > 0: self.cross2city[cross.id] = citycode # 读取所有进口道 line_offset += cross_num for i in range(0, road_num): line = lines[i + line_offset] self.add_road(Road.deserialize_from_txt(line)) # 读取所有流向 line_offset += road_num for i in range(0, flow_num): line = lines[i + line_offset] # 在绿波模式下,不加载flow数据 if not self.is_for_greenwave: self.add_flow(Flow.deserialize_from_txt(line)) if not self.is_for_greenwave: self.clear_inner_roads() def query_artery_crosses(self, crossid_list, is_reversed): """ 查询干线上的路口、路段列表 :param crossid_list: :return: """ if is_reversed: orig_list = crossid_list crossid_list = list(orig_list) crossid_list.reverse() cur_cross = self.query_cross(crossid_list[0]) cross_list = [cur_cross] road_list = [] head_srcDir = None for i in range(1, len(crossid_list)): next_crossid = crossid_list[i] find_road = None for roadid in cur_cross.out_roadids: road = self.query_road(roadid) if road.crossid_to == next_crossid: find_road = road break if not find_road: logging.warning("find road between [%s -> %s] fail" % (cur_cross.id, next_crossid)) return False, False if not head_srcDir: # 仅当找到第一个路段时,处理一次 head_srcDir = find_road.srcDir # 从第一个路口的进口道中寻找合适的进口道 for inroadid in cur_cross.in_roadids: road = self.query_road(inroadid) if road.srcDir == head_srcDir: road_list.append(road) break if len(road_list) <= 0: # 没有找到上游合适的进口道 road_list.append(None) # road_list.append(find_road) # update cur_cross = self.query_cross(next_crossid) cross_list.append(cur_cross) # 再反转回来 if is_reversed: cross_list.reverse() road_list.reverse() return cross_list, road_list def get_road_direct_angle(self, roadid): """""" road = self.query_road(roadid) if not road: return -1 else: return road.get_direct_angle() def try_connect_cross(self, crossid_from, crossid_to, ignore_sideway=True) -> dict: """ 尝试获得2个路口之间的连接信息 :param crossid_from: :param crossid_to: :param ignore_sideway 是否忽略辅路,默认: 忽略 :return: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'} """ # (0) 检查输入参数的有效性 cross_from = self.query_cross(crossid_from) if not cross_from: print('missing ' + crossid_from) return None next_cross = self.query_cross(crossid_to) if not next_cross: print('missing ' + crossid_to) return None # (1) 首先,尝试直接连通 find_road = None find_road_list = [] # 存储可能存在的多个road for roadid in cross_from.out_roadids: road = self.query_road(roadid) if ignore_sideway and road.is_sideway(): continue if road.crossid_to == crossid_to: find_road_list.append(road) # 选择1个road if len(find_road_list) == 1: find_road = find_road_list[0] elif len(find_road_list) > 1: # 使用第一个不是辅路的road for road in find_road_list: if not road.is_sideway(): find_road = road if not find_road: # 如果全是辅路,则选择第一个 find_road = find_road_list[0] if find_road: res = {'roadid': find_road.id, 'prev_roadids': [], 'mid_crossids': []} return res # (2)当不能直连时,开始拓展查找模式 # 计算两个路口之间的角度 pA = MyPoint(cross_from.location.lng, cross_from.location.lat) pB = MyPoint(next_cross.location.lng, next_cross.location.lat) direct_angle = GeoFunc.calc_pole_angle(pA, pB) # 尝试寻找 max_skip_cross_num = 5 # 最大允许跨过的非灯路口个数 find_road = None # 紧挨下游的road prev_roadids = [] # 从上游路口到find_road之间的roadid序列 mid_crossids = [] # 中间跨越的路口ID集合 tmp_cross = cross_from while True: min_diff_angle = 360 cur_direct_road = None for roadid in tmp_cross.out_roadids: road = self.query_road(roadid) # 按需,舍弃辅路的road if ignore_sideway and road.is_sideway(): continue if road.crossid_to == crossid_to: find_road = road break # 舍弃连接边缘路口的路段 tmp_next_cross = self.query_cross(road.crossid_to) if not tmp_next_cross or tmp_next_cross.at_edge: continue else: # 找出转角小于60的“最直”的road diff_angle = GeoFunc.diff_pole_angle(direct_angle, road.get_direct_angle()) if diff_angle <= 60 and diff_angle < min_diff_angle: cur_direct_road = road min_diff_angle = diff_angle # 找到了下游路口,直接返回结果 if find_road: res = {'roadid': find_road.id, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids} return res # 如果没有找到符合要求的“直行”road,则返回错误 if not cur_direct_road: # 没有找到直行的road print('break at cross: %s. connect[%s %s]' % (tmp_cross.id, crossid_from, crossid_to)) res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids, 'reason': 'no direct'} return res # 找到了“直行”road,但尚未找到下游路口, 则继续寻找 # 记录中间的路段ID和路口ID prev_roadids.append(cur_direct_road.id) mid_crossids.append(cur_direct_road.crossid_to) # 检查一下当前“跨越”情形,是否满足[继续寻找]的要求 tmp_cross = self.query_cross(cur_direct_road.crossid_to) # 跨越了信控路口,直接确认失败 if tmp_cross.type == CROSS_TYPE_LIGHT and not self.is_midroad_cross(tmp_cross.id): # 信控路口 res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids, 'reason': 'skip light'} print('%s - %s mid %s' % (crossid_from, crossid_to, tmp_cross.id)) return res # 超过最大跨路口个数限制,直接确认失败 if len(prev_roadids) > max_skip_cross_num: # print('too_far %s - %s' % (crossid_from, crossid_to)) res = {'roadid': None, 'prev_roadids': prev_roadids, 'mid_crossids': mid_crossids, 'reason': 'too far'} return res def query_artery_crosses_ex(self, crossid_list, is_reversed, ignore_sideway=True): """ 查询干线上的路口、路段列表 :param crossid_list: :param is_reversed: 是否为逆向 :param ignore_sideway : 是否忽略辅路,默认:忽略 :return: cross_list, road_list, prev_roadids_list, mid_crossids_list """ if is_reversed: orig_list = crossid_list crossid_list = list(orig_list) crossid_list.reverse() cur_cross = self.query_cross(crossid_list[0]) cross_list = [cur_cross] road_list = [] prev_roadids_list = [] mid_crossids_list = [] for i in range(1, len(crossid_list)): from_crossid = crossid_list[i - 1] to_crossid = crossid_list[i] find_res = self.try_connect_cross(from_crossid, to_crossid, ignore_sideway) find_road = None if find_res['roadid']: # 第一个路段,补全一下前向的路段 if i == 1: road_list.append(None) prev_roadids_list.append([]) find_road = self.query_road(find_res['roadid']) road_list.append(find_road) cross_list.append(self.query_cross(to_crossid)) prev_roadids_list.append(find_res['prev_roadids']) mid_crossids_list.append(find_res['mid_crossids']) else: # 中间中断了,则立刻返回错误 print('break with: ' + find_res['reason']) return False, False, False, False # 再反转回来 if is_reversed: cross_list.reverse() road_list.reverse() prev_roadids_list.reverse() mid_crossids_list.reverse() return cross_list, road_list, prev_roadids_list, mid_crossids_list def query_unidir_wave_crosses(self, crossid_list, roadinfo: 'RoadInfoManager', sideway_start = 0): """ 查询单向绿波上的各路口间的路段连接信息 @param crossid_list: @param roadinfo: 全局的路段属性管理器 @param sideway_start: 从1开始 @return: """ cross_num = len(crossid_list) # 设置主辅路连接标记 sideway_flags = [] for i in range(0, cross_num): if sideway_start > 1 and i >= (sideway_start-1): sideway_flags.append(True) # 辅路路口 else: sideway_flags.append(False) # 主路路口 cur_cross = self.query_cross(crossid_list[0]) cross_list = [cur_cross] road_list = [] prev_roadids_list = [] mid_crossids_list = [] for i in range(1, cross_num): from_crossid = crossid_list[i - 1] to_crossid = crossid_list[i] # find_res = self.try_connect_cross(from_crossid, to_crossid, ignore_sideway) find_res = self.super_connect_cross(from_crossid, to_crossid, roadinfo, sideway_flags[i-1], sideway_flags[i]) find_road = None if find_res['roadid']: # 第一个路段,补全一下前向的路段 if i == 1: road_list.append(None) prev_roadids_list.append([]) find_road = self.query_road(find_res['roadid']) road_list.append(find_road) cross_list.append(self.query_cross(to_crossid)) prev_roadids_list.append(find_res['prev_roadids']) mid_crossids_list.append(find_res['mid_crossids']) else: # 中间中断了,则立刻返回错误 print('break with: ' + find_res['reason']) return False, False, False, False return cross_list, road_list, prev_roadids_list, mid_crossids_list def find_path(self, in_road: Road, DRidset: set(), acc_len: int, road_list: [], len_limit: int, roadinfo: 'RoadInfoManager'): """ 两个路口之间的寻路 @param in_road: 将要检查的road对象 @param DRidset: 末端结果roadid集合 @param acc_len: 前序路径长度 @param road_list: 前序路段road数组 @param len_limit: 里程上限阈值 @param roadinfo: 公共数据,用于查询road的长度 @return: [road], reason """ # 命中末段road if in_road.id in DRidset: road_list.append(in_road) return road_list, 'ok' road_len = roadinfo.query_roadinfo(in_road.id).length # 里程超限,停止寻路 if acc_len + road_len >= len_limit: return False, 'too_long' else: # 禁止跨过信控路口 if self.query_cross(in_road.crossid_to).type == CROSS_TYPE_LIGHT: return False, 'skip light' # 展开节点 next_rid_list = self.query_next_staight_roads(in_road.id, ) # 无下游,停止寻路 if len(next_rid_list) == 0: return False, 'no next' tmp_list = list(road_list) tmp_list.append(in_road) for rid in next_rid_list: tmp_road = self.query_road(rid) res, reason = self.find_path(tmp_road, DRidset, acc_len + road_len, tmp_list, len_limit, roadinfo) # 找到了 if res: return res, 'ok' else: continue # 所有的下游都找不到,则停止寻路 return False, 'not_connect' def super_connect_cross(self, crossid_from, crossid_to, roadinfo: 'RoadInfoManager', from_sideway=False, to_sideway=True) -> dict: """ 尝试获得2个路口之间的连接信息,默认为:主路连接辅路 :param crossid_from: :param crossid_to: :param from_sideway: 前路口是否为辅路连接 :param to_sideway: 后路口是否为辅路连接 :return: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'} """ # (0) 检查输入参数的有效性 cross_from = self.query_cross(crossid_from) if not cross_from: print('missing ' + crossid_from) return None cross_to = self.query_cross(crossid_to) if not cross_to: print('missing ' + crossid_to) return None # (1) 找出Oset 和 Dset 和 len_limit ORoadset = set() DRidset = set() for roadid in cross_from.out_roadids: road = self.query_road(roadid) is_sideway = road.is_sideway() if is_sideway == from_sideway: ORoadset.add(road) for roadid in cross_to.in_roadids: road = self.query_road(roadid) is_sideway = road.is_sideway() if is_sideway == to_sideway: DRidset.add(roadid) # 应该改为计算球面距离 cross_dist = cross_from.location.earth_dist(cross_to.location) len_limit = cross_dist * 1.5 # (2) acc_len = 0 for in_road in ORoadset: road_list, reason = self.find_path(in_road, DRidset, acc_len, [], len_limit, roadinfo) if not road_list: logging.warning('crossid_from: ' + crossid_from + ', crossid_to: ' + crossid_to + ', reason: ' + reason) # print('crossid_from: ' + crossid_from + ', crossid_to: ' + crossid_to + ', reason: ' + reason) continue # TODO: 构造结果: {'roadid': xxx, 'prev_roadids': [], 'mid_crossids': [], 'reason':'xxx'} res = {'roadid': road_list[-1].id} prev_roadids = [] mid_crossids = [] for i in range(0, len(road_list)-1): prev_roadids.append(road_list[i].id) mid_crossids.append(road_list[i].crossid_to) res['prev_roadids'] = prev_roadids res['mid_crossids'] = mid_crossids return res # default end return {'roadid': None, 'prev_roadids': [], 'mid_crossids': [], 'reason':'no path'} def pick_symmetrical_roads(self) -> dict[str: int]: """ 从路网中提取出满足相邻路口互为上下游关系的路口和路段集合 :return: roadid_to_pairid """ crossid_set = set() # 已经被处理过的进口道ID集合 scaned_inroadid_set = set() pair_id = 0 roadid_to_pairid = {} road_pair_list = [] hit_debug = False bad_road_num = 0 # 遍历所有的路段 for (inrodid, road) in self.road_map.items(): if road.is_sideway(): # print('ignore sideway %s %s' % (inrodid, road.name)) continue if inrodid in scaned_inroadid_set: continue else: scaned_inroadid_set.add(inrodid) id_from = road.crossid_from id_to = road.crossid_to # 寻找连接 id_to -> id_from 的路段 cross_from = self.query_cross(id_from) cross_to = self.query_cross(id_to) # 在 cross_from 的进口道中寻找 find_reversed_road = False for tmp_inroad_id in cross_from.in_roadids: tmp_inroad = self.query_road(tmp_inroad_id) if not tmp_inroad: continue # 忽略辅路 if tmp_inroad.is_sideway(): continue if tmp_inroad.crossid_from == id_to: # 找到了对称的路段 # 构建pair scaned_inroadid_set.add(tmp_inroad_id) find_reversed_road = True pair_id += 1 roadid_to_pairid[inrodid] = pair_id roadid_to_pairid[tmp_inroad_id] = pair_id road_pair_list.append((inrodid, tmp_inroad_id)) # crossid_set.add(id_from) crossid_set.add(id_to) if hit_debug: line_geo = '%s,%s' % (self.query_cross(id_from).location.toStr(), self.query_cross(id_to).location.toStr()) print(line_geo) break if not find_reversed_road: bad_road_num += 1 if bad_road_num % 100 == 1: print('bad_road: ' + road.toString()) print('bad_road_geo: %s' % (road.get_geo())) continue print('pick paired_road %d from road count %d' % (len(roadid_to_pairid), len(self.road_map))) return roadid_to_pairid def pick_symmetrical_roads_list(self): """ 从路网中提取出满足相邻路口互为上下游关系的路口和路段集合 :return: roadid_to_pairid """ crossid_set = set() # 已经被处理过的进口道ID集合 scaned_inroadid_set = set() pair_id = 0 roadid_to_pairid = {} road_pair_list = [] hit_debug = False bad_road_num = 0 # 遍历所有的路段 for (inrodid, road) in self.road_map.items(): pair_id += 1 if inrodid in scaned_inroadid_set: continue else: scaned_inroadid_set.add(inrodid) id_from = road.crossid_from id_to = road.crossid_to # 寻找连接 id_to -> id_from 的路段 cross_from = self.query_cross(id_from) cross_to = self.query_cross(id_to) # 在 cross_from 的进口道中寻找 find_reversed_road = False for tmp_inroad_id in cross_from.in_roadids: tmp_inroad = self.query_road(tmp_inroad_id) if not tmp_inroad: continue if tmp_inroad.crossid_from == id_to: # 找到了对称的路段 # 构建pair scaned_inroadid_set.add(tmp_inroad_id) find_reversed_road = True pair_id += 1 roadid_to_pairid[inrodid] = pair_id roadid_to_pairid[tmp_inroad_id] = pair_id road_pair_list.append([inrodid, tmp_inroad_id]) # crossid_set.add(id_from) crossid_set.add(id_to) if hit_debug: line_geo = '%s,%s' % (self.query_cross(id_from).location.toStr(), self.query_cross(id_to).location.toStr()) print(line_geo) break if not find_reversed_road: bad_road_num += 1 if bad_road_num % 100 == 1: print('bad_road: ' + road.toString()) print('bad_road_geo: %s' % (road.get_geo())) continue return road_pair_list def get_road_inflows(self) -> dict[str, list]: """ 获取每个路段的上游各个入流向的路段信息 :return: {roadid => [直行进入的路段列表,左转进入的路段列表,右转进入的路段列表]} """ data = {} for flow in self.flow_map.values(): roadid_out = flow.roadid_out roadid_in = flow.roadid_in turn_type = flow.turn_type if turn_type > 2: continue if roadid_out not in data: data[roadid_out] = [[], [], []] data[roadid_out][turn_type].append(roadid_in) return data @classmethod def load_cross_profile(cls, filename: str) -> set: """ 加载路口画像文件,返回有效路口ID集合 """ crossid_set = set() f = open(filename, 'r') for line in f: line = line.rstrip('\r\n') ss = line.split(' ') crossid = ss[0] good_flag = int(ss[1]) if good_flag == 1: crossid_set.add(crossid) f.close() return crossid_set def pick_sub_roadnet(self, crossid_set) -> 'RoadNet': """提取并返回子路网""" subnet = RoadNet(self.nodeid) for (cid, cross) in self.cross_map.items(): if cid not in crossid_set: continue else: subnet.add_cross(cross) for (rid, road) in self.road_map.items(): if road.crossid_from not in crossid_set and road.crossid_to not in crossid_set: continue else: subnet.add_road(road) for (fid, flow) in self.flow_map.items(): if flow.crossid not in crossid_set: continue else: subnet.add_flow(flow) return subnet def clear_inner_roads(self): change_cross_num = 0 for cross in self.cross_map.values(): inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids)) if len(inter_set) > 0: new_in_roadids = [] for in_rid in cross.in_roadids: if in_rid not in inter_set: new_in_roadids.append(in_rid) cross.in_roadids = new_in_roadids new_out_roadids = [] for out_rid in cross.out_roadids: if out_rid not in inter_set: new_out_roadids.append(out_rid) cross.out_roadids = new_out_roadids change_cross_num += 1 # end if change_cross_num > 0: logging.info('clear inner road for cross num: %d' % change_cross_num) def get_other_inroad_info(self, crossid: str, main_inroadids: [], roadinfo: 'RoadInfoManager') -> []: """ 获取指定路口的其他相交进口道ID :param crossid: 路口ID :param main_inroadids: 已经确定的绿波主路的进口道ID列表 :return: [ [srcDir, inroadid] ] """ if crossid not in self.cross_map: print('missing cross %s' % crossid) return None cross = self.query_cross(crossid) main_srcDir_set = set() for rid in main_inroadids: road = self.query_road(rid) if not road: print('missing road %s of cross %s' % (rid, crossid)) return False main_srcDir_set.add(road.srcDir) # slg补充上对向的车流来向 if len(main_srcDir_set) == 1: only_srcDir = list(main_srcDir_set)[0] main_srcDir_set.add(g_reversed_srcDir_mapping[only_srcDir]) inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids)) srcDir_to_roads = {} for in_rid in cross.in_roadids: if in_rid in inter_set: continue in_road = self.query_road(in_rid) if in_road.srcDir in main_srcDir_set: continue # 按srcDir分组 if in_road.srcDir in srcDir_to_roads: srcDir_to_roads[in_road.srcDir].append(in_road) else: srcDir_to_roads[in_road.srcDir] = [in_road] # [ [srcDir, inroadid] ] other_inroad_infos = [] # 遍历 srcDir_to_roads for srcDir, roads in srcDir_to_roads.items(): if len(roads) == 1: other_inroad_infos.append([srcDir, roads[0].id]) else: # 在相同srcDir的多个inroad中,选择最优代表性的一个 # 如果有主路、有辅路,则 side_roads = [] main_roads = [] for road in roads: if road.is_sideway(): side_roads.append(road) else: main_roads.append(road) # 优先选择主路(非辅路)路段 selected_roads = [] if len(main_roads) > 0: selected_roads = main_roads else: selected_roads = side_roads # TODO: 从 selected_roads 选出车道个数多的 pick_road = None max_lane_num = 0 for road in selected_roads: rinfo = roadinfo.query_roadinfo(road.id) if rinfo.lane_num > max_lane_num: max_lane_num = rinfo.lane_num pick_road = road other_inroad_infos.append([srcDir, pick_road.id]) # return return other_inroad_infos def calc_road_turn_angle(self, road_from: Road, road_to: Road): pt0 = self.query_cross(road_from.crossid_from).location pt1 = self.query_cross(road_from.crossid_to).location pt2 = self.query_cross(road_to.crossid_from).location pt3 = self.query_cross(road_to.crossid_to).location pA = MyPoint(pt0.lng, pt0.lat) pB = MyPoint(pt1.lng, pt1.lat) pC = MyPoint(pt2.lng, pt2.lat) pD = MyPoint(pt3.lng, pt3.lat) turn_angle = abs(GeoFunc.radian2angle(GeoFunc.calc_turn_radian(pA, pB, pC, pD))) return turn_angle def calc_road_turn_angle_without_abs(self, road_from: Road, road_to: Road): # pt0 = self.query_cross(road_from.crossid_from).location # 第一个点,取一定距离的上游几何点 pt0 = road_from.get_tail_up_pt() # pt1 = self.query_cross(road_from.crossid_to).location pt1 = road_from.path[-1] # pt2 = self.query_cross(road_to.crossid_from).location pt2 = road_to.path[0] # 最后一个点,取一定距离的下游几何点 pt3 = road_to.get_head_down_pt() # pt3 = self.query_cross(road_to.crossid_to).location pA = MyPoint.makeOne(pt0) pB = MyPoint.makeOne(pt1) pC = MyPoint.makeOne(pt2) pD = MyPoint.makeOne(pt3) turn_angle = GeoFunc.radian2angle(GeoFunc.calc_turn_radian(pA, pB, pC, pD)) # print('%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f => %.2f' % (pA.X,pA.Y, pB.X,pB.Y, pC.X,pC.Y, pD.X,pD.Y, turn_angle)) return turn_angle def get_straight_road_pairs(self, crossid: str = None, consider_sideway = False): """ 查询指定路口的属于直行关系的in-out路段ID pair :param crossid: :return: """ road_pairs = [] for cross in self.cross_map.values(): if crossid and cross.id != crossid: continue inter_set = set(cross.in_roadids).intersection(set(cross.out_roadids)) # print(inter_set) candidate_list = [] for in_rid in cross.in_roadids: if in_rid in inter_set: continue in_road = self.query_road(in_rid) if not in_road or (not consider_sideway and in_road.is_sideway()): continue for out_rid in cross.out_roadids: if out_rid in inter_set: continue if in_rid == out_rid: continue out_road = self.query_road(out_rid) if not out_road or (not consider_sideway and out_road.is_sideway()): continue # 不支持主-辅路构成pair if in_road.is_sideway() != out_road.is_sideway(): continue # 比较前后两个路段的转角 turn_angle = self.calc_road_turn_angle(in_road, out_road) # print(in_road.name, out_road.name, turn_angle) if turn_angle <= 45: if in_road.name == out_road.name: # 将同名道路的转角减少,以便于排序,排到最前面 turn_angle = turn_angle - 50 candidate_list.append([in_rid, out_rid, turn_angle]) # 按照广义的转角排序 pair_list = sorted(candidate_list, key=lambda x: x[2]) # for info in pair_list: # print(info[0], info[1], round(info[2], 2)) # print(pair_list) done_inroadid_set = set() done_outroadid_set = set() inout_name_map = dict() # 依次选择全局最“直”的in-out,构成pair for info in pair_list: in_rid = info[0] out_rid = info[1] in_name = in_road.name out_name = out_road.name if in_name in inout_name_map and out_name != inout_name_map[in_name]: # 不符合已经确定的出入路名pair,则跳过 continue if out_name in inout_name_map and in_name != inout_name_map[out_name]: # 不符合已经确定的出入路名pair,则跳过 continue if in_name not in inout_name_map and out_name not in inout_name_map: # 构造出入路名pair inout_name_map[in_name] = out_name inout_name_map[out_name] = in_name # 已经成pair的路段不再参与 if in_rid in done_inroadid_set or out_rid in done_outroadid_set: continue road_pairs.append([in_rid, out_rid]) done_inroadid_set.add(in_rid) done_outroadid_set.add(out_rid) return road_pairs def get_straight_inroadid(self, crossid: str, out_roadid: str, consider_sideway = False): """ 获取指定路口的出口道的直行进口道ID """ inout_road_pairs = self.get_straight_road_pairs(crossid, consider_sideway) for pair in inout_road_pairs: if out_roadid == pair[1]: return pair[0] # end not find return None def get_straight_outroadid(self, crossid: str, in_roadid: str, consider_sideway = False): """ 获取指定路口的进口道的直行出口道ID """ inout_road_pairs = self.get_straight_road_pairs(crossid, consider_sideway) for pair in inout_road_pairs: if in_roadid == pair[0]: return pair[1] # end not find return None def get_turn_inroadid(self, crossid: str, out_roadid: str, want_turn_type = 1): """ 获取指定路口的出口道的指定左右转的进口道ID @param crossid: 路口ID @param out_roadid: 出口道ID @param want_turn_type: 要求的转向类型取值:1,2表示:左转 和 右转 @return: road or None """ out_road = self.query_road(out_roadid) pC = MyPoint.makeOne(out_road.path[0]) pD = MyPoint.makeOne(out_road.path[-1]) cur_min_diff = 180 find_roadid = None in_roadids = self.query_cross(crossid).in_roadids for in_rid in in_roadids: in_road = self.query_road(in_rid) pA = MyPoint.makeOne(in_road.path[0]) pB = MyPoint.makeOne(in_road.path[-1]) turn_type, diff = GeoFunc.calc_turn_type_ex(pA, pB, pC, pD) if turn_type != want_turn_type: continue if diff < cur_min_diff: cur_min_diff = diff find_roadid = in_road.id return find_roadid def get_turn_outroadid(self, crossid: str, in_roadid: str, want_turn_type = 1): """ 获取指定路口的进口道的指定左右转的出口道ID @param crossid: @param in_roadid: @param want_turn_type: @return: """ in_road = self.query_road(in_roadid) pA = MyPoint.makeOne(in_road.path[0]) pB = MyPoint.makeOne(in_road.path[-1]) cur_min_diff = 180 find_roadid = None out_roadids = self.query_cross(crossid).out_roadids for out_rid in out_roadids: out_road = self.query_road(out_rid) pC = MyPoint.makeOne(out_road.path[0]) pD = MyPoint.makeOne(out_road.path[-1]) turn_type, diff = GeoFunc.calc_turn_type_ex(pA, pB, pC, pD) if turn_type != want_turn_type: continue if diff < cur_min_diff: cur_min_diff = diff find_roadid = out_road.id return find_roadid def do_cross_classify(self, road_to_info: 'RoadInfoManager'): """ 内部执行路口的分级,返回各个等级的路口个数 """ self.cross_rank = dict() rank_stat = dict() for cross in self.cross_map.values(): if cross.at_edge: continue if cross.type != CROSS_TYPE_LIGHT: continue max_inroad_rank = -1 for rid in cross.in_roadids: roadinfo = road_to_info.query_roadinfo(rid) rank = (int)(roadinfo.get_rank() * 2) if rank > max_inroad_rank: max_inroad_rank = rank self.cross_rank[cross.id] = max_inroad_rank if max_inroad_rank in rank_stat: rank_stat[max_inroad_rank] += 1 else: rank_stat[max_inroad_rank] = 1 # 统计一下各个rank的数字 return rank_stat def query_cross_by_bound(self, bound: MapBound, road_to_info: 'RoadInfoManager') -> []: """ 查询指定范围内的信控路口信息 """ info_list = [] for cross in self.cross_map.values(): if cross.id == 'CR_11343097_2361427': print('debug') if cross.at_edge: continue if cross.type != CROSS_TYPE_LIGHT: continue loc: Pt = cross.location if bound.in_bound(loc.lng, loc.lat): corss_roads = cross.in_roadids + cross.out_roadids max_rc = 10 for ro in corss_roads: roadinfo = road_to_info.query_roadinfo(ro) rc = comparable_rank(roadinfo.roadclass) if rc < max_rc: max_rc = rc if max_rc > 8: cross_class = 2 else: cross_class = 1 info = {'crossid': cross.id, 'name': cross.name, 'location': loc.toStr(), 'type': cross.type, 'cross_class': cross_class} info_list.append(info) return info_list def query_cross_by_rank(self, min_rank: int) -> []: """ 查询不低于指定rank的信控路口信息 """ info_list = [] for cross in self.cross_map.values(): if cross.at_edge: continue if cross.type != CROSS_TYPE_LIGHT: continue if cross.id in self.cross_rank: rank = self.cross_rank[cross.id] if rank >= min_rank: loc: Pt = cross.location info = {'crossid': cross.id, 'name': cross.name, 'location': loc.toStr()} info_list.append(info) return info_list def update_cross_info(self, crossid, cross_info): cross = self.cross_map[crossid] cross.name = cross_info['name'] cross.location = Pt(float(cross_info['location'].split(',')[0]), float(cross_info['location'].split(',')[1])) def update_road_net(self, roadid, road_info): road = self.road_map[roadid] road.srcDir = road_info['src_direct'] class RoadInfo: def __init__(self): self.roadid = '' self.roadclass = 6 self.length = 0 self.lane_num = 1 self.turninfo = '1' self.ffs = 60 self.speed_limit = 60 self.capacity = 1800 def get_rank(self): rc_weight = 6 - get_comparable_rc(self.roadclass) return (rc_weight + self.lane_num) / 2 def get_lanenum_of_turn(self, turn_type: int) -> int: """ 获取指定转向类型的车道个数 :param turn_type: :return: """ if turn_type < 0 or turn_type > 3: return 0 hit_turnset = g_turnset_of_turn_type[turn_type] lanenum = 0 turn_list = self.turninfo.split('|') for item in turn_list: turn = int(item) if turn in hit_turnset: lanenum += 1 return lanenum def get_straight_left_lanenum(self) -> int: """ 获取直左共用的车道个数 :param turn_type: :return: """ hit_turnset_0 = g_turnset_of_turn_type[0] hit_turnset_1 = g_turnset_of_turn_type[1] hit_turnset = hit_turnset_0.intersection(hit_turnset_1) lanenum = 0 turn_list = self.turninfo.split('|') for item in turn_list: turn = int(item) if turn in hit_turnset: lanenum += 1 return lanenum def to_dict(self): info = {} info['roadclass'] = self.roadclass info['length'] = self.length info['lane_num'] = self.lane_num info['ffs'] = self.ffs info['capacity'] = self.capacity if self.turninfo != '': ss = self.turninfo.split('|') else: ss = [1] info['lane_turn_list'] = ss info['lane_turn_flags'] = turninfo2str(self.turninfo) return info @classmethod def deserialize_from_txt(cls, line): ss = line.split(' ') roadinfo = RoadInfo() roadinfo.roadid = ss[0] roadinfo.roadclass = int(ss[1]) roadinfo.length = int(ss[2]) roadinfo.lane_num = int(ss[3]) roadinfo.turninfo = ss[4] roadinfo.ffs = int(ss[5]) roadinfo.speed_limit = int(ss[6]) roadinfo.capacity = int(ss[7]) return roadinfo # 定义交管路网的路段属性管理器类 class RoadInfoManager: def __init__(self): self.roadinfo_map = {} def query_roadinfo(self, roadid) -> RoadInfo: return self.roadinfo_map.get(roadid) def load_from_file(self, filename): f = open(filename, 'r', encoding='utf-8') file_content = f.read() f.close() data = file_content.replace('\r', ' ') lines = data.split('\n') header_fields = lines[0].split(' ') road_num = int(header_fields[1]) line_offset = 1 for i in range(0, road_num): line = lines[i + line_offset] roadinfo = RoadInfo.deserialize_from_txt(line) self.roadinfo_map[roadinfo.roadid] = roadinfo def toString(self): # 统计一下,车道数分布 lane_stat = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] rc_stat = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] for roadinfo in self.roadinfo_map.values(): lane_num = roadinfo.lane_num lane_stat[lane_num] += 1 rc_stat[roadinfo.roadclass] += 1 res = '' for num in range(0, len(lane_stat)): res += "[lane_%d: %d]" % (num, lane_stat[num]) res += "\n" for rc in range(0, len(rc_stat)): res += "[rc_%d: %d]" % (rc, rc_stat[rc]) return res def update_road_info(self, roadid, road_info): roadInfo = self.query_roadinfo(roadid) roadInfo.roadclass = 6 if not road_info['from_road_class'] else road_info['from_road_class'] roadInfo.lane_num = road_info['lane_num'] roadInfo.length = road_info['from_road_length'] roadInfo.ffs = road_info['ffs'] roadInfo.capacity = road_info['capacity'] roadInfo.turninfo = road_info['lane_turn_info'] roadInfo.lane_turn_list = road_info['lane_turn_info'].split('|') if len(road_info['lane_turn_info']) > 1 else [1] roadInfo.lane_turn_flags = turninfo2str(road_info['lane_turn_info']) def check_point_in_bound(point1, point2, param_point): x1, y1 = float(point1.split(',')[0]), float(point1.split(',')[1]) x2, y2 = float(point2.split(',')[0]), float(point2.split(',')[1]) x3, y3 = param_point.lng, param_point.lat return (x1 <= x3 <= x2) and (y1 <= y3 <= y2) # 20250904 更新城市边框定义方式 支持一个城市多个框选范围的情况 def check_point_in_bounds(bounds, param_point): lng, lat = float(param_point.toStr().split(',')[0]), float(param_point.toStr().split(',')[1]) def _inside(poly): n, inside = len(poly), False j = n - 1 for i in range(n): xi, yi = poly[i] xj, yj = poly[j] if ((yi > lat) != (yj > lat)) and \ (lng < (xj - xi) * (lat - yi) / (yj - yi + 1e-12) + xi): inside = not inside j = i return inside for s in bounds: coords = list(map(float, s.split(','))) if len(coords) < 6 or len(coords) % 2: continue poly = [(coords[i], coords[i+1]) for i in range(0, len(coords), 2)] if _inside(poly): return True return False # 20250904 更新城市边框定义方式 支持一个城市多个框选范围的情况 (实现方式2) def point_in_any_bound(raw_bounds, param_point): """ pt: (lng, lat) raw_bounds: ["x1,y1,x2,y2,...", "..."] """ lng, lat = param_point.toStr().split(',')[0], param_point.toStr().split(',')[1] for s in raw_bounds: coords = list(map(float, s.split(','))) if len(coords) < 6 or len(coords) % 2: continue xy = [(coords[i], coords[i+1]) for i in range(0, len(coords), 2)] if Path(xy).contains_point((lng, lat)): return True return False if __name__ == '__main__': turninfo = '4|1|5' print(turninfo2str(turninfo)) # roadnet = RoadNet('Demo') # f = open('D:/slgwork/slgcode/importer_bd/roadnet_demo.txt', 'r', encoding='utf-8') # file_content = f.read() # f.close() # roadnet.load(file_content) # print("ok") # print(roadnet.toString()) # # save_data = roadnet.serialize_as_txt() # f = open("./tmp_roadnet.txt", "w", encoding='utf-8') # f.write(save_data) # f.close()