# cross_doctor/app/models_wave.py
# (web listing header: 109 lines, 3.4 KiB, Python)

from app.models import *
class RoadLinkManager:
    """
    Manager for the mapping between road segments and mlinkids.

    For every road id loaded, it remembers the first (head) and the last
    (tail) mlinkid of the road's comma-separated mlinkid list.
    """

    def __init__(self):
        # road id -> last mlinkid in the road's mlinkid list
        self.road_taillink_map = {}
        # road id -> first mlinkid in the road's mlinkid list
        self.road_headlink_map = {}

    def query_tail_mlinkid(self, roadid) -> str:
        """Return the tail mlinkid stored for ``roadid`` (None if the road is unknown)."""
        return self.road_taillink_map.get(roadid)

    def query_head_mlinkid(self, roadid) -> str:
        """Return the head mlinkid stored for ``roadid`` (None if the road is unknown)."""
        return self.road_headlink_map.get(roadid)

    def load_from_file(self, filename):
        """
        Load road -> mlinkid mappings from a UTF-8 text file.

        Each data line consists of space-separated fields: field 0 is the
        road id and field 2 is a comma-separated list of mlinkids, e.g.::

            RD_51213842743725921240_3 3 51213842743725922880,51213842743725922890,51213842743725921240

        Blank (or whitespace-only) lines are skipped. Entries are added to
        the existing maps; a road id that appears again overwrites the
        earlier entry.

        :param filename: path of the mapping file to read.
        :raises OSError: if the file cannot be opened.
        :raises IndexError: if a non-blank line has fewer than three fields.
        """
        # The context manager guarantees the file is closed even when a
        # malformed line raises in the middle of parsing.
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.rstrip('\r\n')
                if not line.strip():
                    # e.g. a trailing empty line at the end of the file
                    continue
                fields = line.split(' ')
                inroadid = fields[0]
                mlinkid_list = fields[2].split(',')
                self.road_taillink_map[inroadid] = mlinkid_list[-1]
                self.road_headlink_map[inroadid] = mlinkid_list[0]
class CrossOnWave:
    """
    Information about a single cross (intersection) on a green wave.
    """

    def __init__(self):
        # Identifier, location string and display name all start out as
        # empty strings; they are filled in by whoever builds the wave.
        self.crossid = self.location = self.name = ''
class GreenWave:
    """
    Static information describing a green wave.
    """

    def __init__(self):
        # Identity of the wave.
        self.waveid = self.wavename = ''
        self.srcDir = 'unknown'

        # One CrossOnWave entry per cross on the wave.
        self.crossinfo_list: list[CrossOnWave] = []

        # Tail mlinkids, kept separately for the two directions
        # (the attribute spelling "backword" is part of the existing interface).
        self.tail_mlinkid_list = []
        self.tail_mlinkid_list_backword = []

        # Road lengths, again one list per direction.
        self.road_len_list = []
        self.road_len_list_backword = []
def create_green_wave(crossid_list: list, roadnet: RoadNet, road2info: RoadInfoManager, road2link: RoadLinkManager):
    """
    Build the static GreenWave description for a list of cross ids.

    The artery is queried from ``roadnet`` twice, once with
    ``is_reversed=False`` and once with ``is_reversed=True``. The crosses of
    the non-reversed query populate ``crossinfo_list``; the road lengths of
    the two queries populate ``road_len_list`` and ``road_len_list_backword``
    respectively.

    :param crossid_list: cross ids passed to ``roadnet.query_artery_crosses_ex``.
    :param roadnet: road network providing ``query_artery_crosses_ex``.
    :param road2info: road info manager; ``query_roadinfo(road_id).length``
        supplies each road's length.
    :param road2link: road -> mlinkid manager. Not used yet: the inlink
        calculation below is still a TODO.
    :return: the populated GreenWave, or None when either artery query
        returns no crosses.
    """
    cross_list, road_list, _prev_roadids_list, _mid_crossids_list = roadnet.query_artery_crosses_ex(
        crossid_list, is_reversed=False)
    if not cross_list:
        return None
    cross_list_r, road_list_r, _prev_roadids_list_r, _mid_crossids_list_r = roadnet.query_artery_crosses_ex(
        crossid_list, is_reversed=True)
    if not cross_list_r:
        return None

    gw = GreenWave()

    # fill crossinfo list
    for cross in cross_list:
        cow = CrossOnWave()
        cow.name = cross.name
        cow.crossid = cross.id
        cow.location = cross.location.toStr()
        gw.crossinfo_list.append(cow)

    # fill road_len lists; falsy placeholders in the road lists are skipped.
    # The lengths come from the ``road2info`` argument rather than a
    # module-level global, so the function also works when imported.
    gw.road_len_list = [
        road2info.query_roadinfo(road.id).length
        for road in road_list
        if road
    ]
    gw.road_len_list_backword = [
        road2info.query_roadinfo(road.id).length
        for road in road_list_r
        if road
    ]

    # TODO: query inlink / calc first cross's inlink. The tail_mlinkid lists
    # of ``gw`` are not populated yet and ``road2link`` is still unused.
    return gw
if __name__ == '__main__':
    # Manual smoke test: load the demo data set, then build one green wave.

    # Road network.
    roadnet_path = 'data/xl_roadnet_110000.txt'
    roadnet = RoadNet('Demo')
    roadnet.load_from_file(roadnet_path)

    # Per-road information.
    roadinfo_path = 'data/xl_roadinfo_110000.txt'
    roadinfo = RoadInfoManager()
    roadinfo.load_from_file(roadinfo_path)

    # Road -> mlinkid mapping.
    roadlinks_path = 'data/xl_roadlinks_110000.txt'
    road2links = RoadLinkManager()
    road2links.load_from_file(roadlinks_path)

    # Crosses that make up the wave under test.
    crossid_list = [
        'CR_11626980_4004895',
        'CR_11627289_4004937',
        'CR_11628006_4005060',
        'CR_11628481_4005152',
    ]
    green_wave = create_green_wave(crossid_list, roadnet, roadinfo, road2links)
    print(green_wave)
    print('test ok')