优化巡检页面查询逻辑

This commit is contained in:
wangxu 2025-12-09 09:38:55 +08:00
parent 33b0227f82
commit 5e0fe98325
7 changed files with 348 additions and 30 deletions

View File

@ -19,10 +19,10 @@ def load_roadnet_by_city(citycode: int):
logging.info('start load roadnet for %d' % citycode) logging.info('start load roadnet for %d' % citycode)
net_filename = './../common_roadnet/data/xl_roadnet_%d.txt' % citycode net_filename = './../common_roadnet/data/xl_roadnet_%d.txt' % citycode
roadinfo_filename = './../common_roadnet/data/xl_roadinfo_%d.txt' % citycode roadinfo_filename = './../common_roadnet/data/xl_roadinfo_%d.txt' % citycode
# roadlinks_filename = 'data/xl_roadlinks_%d.txt' % citycode roadlinks_filename = './../common_roadnet/data/xl_roadlinks_%d.txt' % citycode
g_roadnet.load_from_file(net_filename, citycode) g_roadnet.load_from_file(net_filename, citycode)
g_roadinfo_manager.load_from_file(roadinfo_filename) g_roadinfo_manager.load_from_file(roadinfo_filename)
# g_roadlinks_manager.load_from_file(roadlinks_filename) g_roadlinks_manager.load_from_file(roadlinks_filename)
logging.info('finish load roadnet for %d' % citycode) logging.info('finish load roadnet for %d' % citycode)

View File

@ -7,6 +7,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, request, jsonify, redirect from flask import Flask, request, jsonify, redirect
from flask_cors import CORS from flask_cors import CORS
from flask_caching import Cache from flask_caching import Cache
from flask_compress import Compress
from werkzeug.exceptions import UnsupportedMediaType from werkzeug.exceptions import UnsupportedMediaType
from app.cross_evaluate_worker import * from app.cross_evaluate_worker import *
@ -16,6 +17,7 @@ app = Flask(__name__)
cache = Cache(app, config={'CACHE_TYPE': 'simple'}) cache = Cache(app, config={'CACHE_TYPE': 'simple'})
CORS(app, resources={r"/api/*": {"origins": "*"}}) CORS(app, resources={r"/api/*": {"origins": "*"}})
cache_keys = ['userid'] cache_keys = ['userid']
Compress(app)
# @app.before_request # @app.before_request
# def middleware_manage(): # def middleware_manage():

View File

@ -3,6 +3,7 @@
# @Date: 2025/11/4 15:13 # @Date: 2025/11/4 15:13
# @Description: 路口巡检页面相关接口 # @Description: 路口巡检页面相关接口
import json import json
import logging
from app.monitor_common import * from app.monitor_common import *
@ -260,26 +261,24 @@ def query_monitor_problems(params):
'sample': {}, 'sample': {},
'phase': [] 'phase': []
} }
row_list = db_cross.query_monitor_data_sql(nodeid, area_id, date_type, query_date) row_list = db_cross.query_monitor_data_sql(nodeid, area_id, date_type, query_date)
if len(row_list) < 1: if len(row_list) < 1:
return json.dumps(make_common_res(9, '没有查询到数据')) return json.dumps(make_common_res(9, '没有查询到数据'))
routing_crosses = db_tmnet.query_routing_crosses(nodeid, area_id) routing_crosses = db_tmnet.query_routing_crosses(nodeid, area_id)
routing_crosses_dict = {item['crossid']: item for item in routing_crosses} routing_crosses_dict = {item['crossid']: item for item in routing_crosses}
cross_roads_dir_dict = gen_crossids_roads_dir_dict_by_mysql([item['crossid'] for item in routing_crosses], nodeid)
for crossid in routing_crosses_dict.keys(): for crossid in routing_crosses_dict.keys():
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) routing_crosses_dict[crossid]['roads_dir_dict'] = cross_roads_dir_dict[crossid]
if not cross_ledger_info_dict: # for crossid in routing_crosses_dict.keys():
logging.error('查询路口信息失败', crossid) # # cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
continue # # cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) # roads_dir_dict = gen_roads_dir_dict_by_mysql(crossid, nodeid)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info) # routing_crosses_dict[crossid]['roads_dir_dict'] = roads_dir_dict
routing_crosses_dict[crossid]['roads_dir_dict'] = roads_dir_dict # # cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
routing_crosses_dict[crossid]['cross_static_info'] = cross_static_info # # inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}
cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid) # # routing_crosses_dict[crossid]['inroad_static_info_dict'] = inroad_static_info_dict
inroad_static_info_dict = {item['roadid']: item for item in cross_inroads} # # routing_crosses_dict[crossid]['cross_ledger_info'] = cross_ledger_info
routing_crosses_dict[crossid]['inroad_static_info_dict'] = inroad_static_info_dict
routing_crosses_dict[crossid]['cross_ledger_info'] = cross_ledger_info
cross_report_pb = pb.xl_cross_report_t() cross_report_pb = pb.xl_cross_report_t()
cross_report_pb.ParseFromString(row_list[0]['data']) cross_report_pb.ParseFromString(row_list[0]['data'])
cross_problem_records = db_cross.query_cross_problem_record(area_id, query_date, date_type) cross_problem_records = db_cross.query_cross_problem_record(area_id, query_date, date_type)

View File

@ -521,6 +521,195 @@ def gen_road_dir_dict(cross_ledger_info_dict):
return roads_dir_dict return roads_dir_dict
def gen_crossids_roads_dir_dict_by_mysql(crossids, nodeid):
inroad_res_list = db_tmnet.query_crosses_inroads(crossids, nodeid)
outroad_res_list = db_tmnet.query_road_by_crosses_from(nodeid, crossids)
user_defined_inroads_res_list = db_tmnet.query_user_definded_road_by_cross_list(crossids, nodeid)
user_defined_outroads_res_list = db_tmnet.query_user_defined_raod_by_cross_list_from(crossids, nodeid)
inroad_res_list.extend(user_defined_inroads_res_list)
outroad_res_list.extend(user_defined_outroads_res_list)
cross_roads_dict = {}
for row in inroad_res_list:
crossid = row['to_crossid']
if crossid in cross_roads_dict.keys():
cross_roads_dict[crossid]['inroads'][row['roadid']] = row
else:
cross_roads_dict[crossid] = {
'inroads': {
row['roadid']: row
},
'outroads': {}
}
for row in outroad_res_list:
crossid = row['from_crossid']
if crossid in cross_roads_dict.keys():
cross_roads_dict[crossid]['outroads'][row['roadid']] = row
else:
cross_roads_dict[crossid] = {
'inroads': {},
'outroads': {
row['roadid']: row
}
}
crosses_roads_dir_dict = {}
for crossid in cross_roads_dict.keys():
inroad_info_dict = cross_roads_dict[crossid]['inroads']
inroadids = list(inroad_info_dict.keys())
outroad_info_dict = cross_roads_dict[crossid]['outroads']
outroadids = list(outroad_info_dict.keys())
outroadid_angle_dict = calc_outroad_angle(outroadids)
roads_dir_dict = {}
for roadid in inroadids:
road = g_roadnet.query_road(roadid)
if not road:
inroad_angle = calc_user_defined_inroad_angle(roadid)
else:
inroad_angle = road.get_angle_from_head()
min_angle_diff, min_angle_outroadid = 360, ''
for outroadid in outroadid_angle_dict.keys():
if outroadid not in outroadids:
continue
outroad = g_roadnet.query_road(outroadid)
if outroad and road and outroad.crossid_from == road.crossid_to and outroad.crossid_to == road.crossid_from:
min_angle_diff = 0
min_angle_outroadid = outroadid
break
outroad_angle = outroadid_angle_dict[outroadid]
angle_diff = GeoFunc.diff_pole_angle(inroad_angle, outroad_angle)
if angle_diff < min_angle_diff:
min_angle_diff = angle_diff
min_angle_outroadid = outroadid
if min_angle_diff > 30:
min_angle_outroadid = ''
if min_angle_outroadid != '':
outroadid_angle_dict.pop(min_angle_outroadid)
src_direct = inroad_info_dict[roadid]['src_direct']
roads_dir_dict[src_direct] = {
'in': roadid,
'out': min_angle_outroadid if min_angle_outroadid != '' else '-',
'entry_lane_num': len(inroad_info_dict[roadid]['lane_turn_info'].split('|')),
'exit_lane_num': '-' if min_angle_outroadid == '' else outroad_info_dict[min_angle_outroadid]['lane_num']
}
for outroadid in outroadid_angle_dict.keys():
out_road = g_roadnet.query_road(outroadid)
out_src_direct = out_road.get_headfrom_srcDir() if out_road else get_headfrom_srcDir_user_defined(outroadid)
roads_dir_dict[src_reverse[out_src_direct]] = {
'in': '-',
'out': outroadid,
'entry_lane_num': '-',
'exit_lane_num': outroad_info_dict[outroadid]['lane_num']
}
crosses_roads_dir_dict[crossid] = roads_dir_dict
return crosses_roads_dir_dict
def get_headfrom_srcDir_user_defined(roadid):
road_info = db_tmnet.query_user_definded_raod_by_roadid(roadid)
coords = road_info[0]['coords']
pt_str_arr, pt_list = [], []
pts_array = coords.split(',')
for i in range(0, len(pts_array), 2):
pt_str_arr.append({
'lon': pts_array[i],
'lat': pts_array[i + 1]
})
for pt_str in pt_str_arr:
pt_list.append(Pt(float(pt_str['lon']), float(pt_str['lat'])))
srcDir = GeoFunc.calc_srcDir_head(pt_list)
return srcDir
def gen_roads_dir_dict_by_mysql(crossid, nodeid):
roads_dir_dict = {}
inroads_res = db_tmnet.query_cross_inroads(crossid, nodeid)
outroad_res = db_tmnet.query_road_by_cross_from(nodeid, crossid)
# 20250919 新增用户定义新增进出口道信息
user_defined_inroads_res = db_tmnet.query_user_definded_raod(crossid, nodeid)
user_defined_outroads_res = db_tmnet.query_user_definded_raod_by_cross_from(crossid, nodeid)
inroads_res.extend(user_defined_inroads_res)
inroadids = [item['roadid'] for item in inroads_res]
outroad_res.extend(user_defined_outroads_res)
outroadids = [item['roadid'] for item in outroad_res]
outroadid_angle_dict = calc_outroad_angle(outroadids)
inroad_info_dict = {item['roadid']: item for item in inroads_res}
outroad_info_dict = {item['roadid']: item for item in outroad_res}
for roadid in inroadids:
road = g_roadnet.query_road(roadid)
if not road:
inroad_angle = calc_user_defined_inroad_angle(roadid)
else:
inroad_angle = road.get_angle_from_head()
min_angle_diff, min_angle_outroadid = 360, ''
for outroadid in outroadid_angle_dict.keys():
if outroadid not in outroadids:
continue
outroad = g_roadnet.query_road(outroadid)
if outroad and road and outroad.crossid_from == road.crossid_to and outroad.crossid_to == road.crossid_from:
min_angle_diff = 0
min_angle_outroadid = outroadid
break
outroad_angle = outroadid_angle_dict[outroadid]
angle_diff = GeoFunc.diff_pole_angle(inroad_angle, outroad_angle)
if angle_diff < min_angle_diff:
min_angle_diff = angle_diff
min_angle_outroadid = outroadid
if min_angle_diff > 30:
min_angle_outroadid = ''
src_direct = inroad_info_dict[roadid]['src_direct']
roads_dir_dict[src_direct] = {
'in': roadid,
'out': min_angle_outroadid if min_angle_outroadid != '' else '-',
'entry_lane_num': len(inroad_info_dict[roadid]['lane_turn_info'].split('|')),
'exit_lane_num': '-' if min_angle_outroadid == '' else outroad_info_dict[min_angle_outroadid]['lane_num']
}
return roads_dir_dict
def calc_outroad_angle(outroadids):
outroadid_angle_dict = {}
for outroadid in outroadids:
road = g_roadnet.query_road(outroadid)
if not road:
angle = calc_user_defined_outroad_angle(outroadid)
else:
angle = road.get_angle_from_head()
filp_angle = GeoFunc.flip_gps_angle(angle)
outroadid_angle_dict[outroadid] = filp_angle
return outroadid_angle_dict
def calc_user_defined_outroad_angle(roadid):
road_info = db_tmnet.query_user_definded_raod_by_roadid(roadid)
coords = road_info[0]['coords']
pt_str_arr, pt_list = [], []
pts_array = coords.split(',')
for i in range(0, len(pts_array), 2):
pt_str_arr.append({
'lon': pts_array[i],
'lat': pts_array[i + 1]
})
for pt_str in pt_str_arr:
pt_list.append(Pt(float(pt_str['lon']), float(pt_str['lat'])))
angle = GeoFunc.calc_angle_head(pt_list)
return angle
def calc_user_defined_inroad_angle(roadid):
road_info = db_tmnet.query_user_definded_raod_by_roadid(roadid)
coords = road_info[0]['coords']
pt_str_arr, pt_list = [], []
pts_array = coords.split(',')
for i in range(0, len(pts_array), 2):
pt_str_arr.append({
'lon': pts_array[i],
'lat': pts_array[i + 1]
})
for pt_str in pt_str_arr:
pt_list.append(Pt(float(pt_str['lon']), float(pt_str['lat'])))
angle = GeoFunc.calc_angle_tail(pt_list)
return angle
def calc_inroad_imbalance_index(inroad_delay_pb_list): def calc_inroad_imbalance_index(inroad_delay_pb_list):
tmp_list = copy.deepcopy(inroad_delay_pb_list) tmp_list = copy.deepcopy(inroad_delay_pb_list)
for item in tmp_list: for item in tmp_list:

View File

@ -341,10 +341,16 @@ def gen_cross_delay_info_list(userid, area_id, nodeid, date_type, cross_report_p
all_cross_index_dict[crossid]['internet'] = internet_name all_cross_index_dict[crossid]['internet'] = internet_name
all_cross_index_dict[crossid]['slc_company'] = slc_company_name all_cross_index_dict[crossid]['slc_company'] = slc_company_name
all_cross_index_dict[crossid]['worst_service_level'] = 'A' all_cross_index_dict[crossid]['worst_service_level'] = 'A'
if crossid not in all_cross_index_dict_prev:
continue
delay_infos = all_cross_index_dict[crossid]['delay_infos'] delay_infos = all_cross_index_dict[crossid]['delay_infos']
for weekdays in delay_infos.keys():
tp_delay_infos = delay_infos[weekdays]
for tp_info in tp_delay_infos:
if tp_info['service_level'] != '-':
if tp_info['service_level'] > all_cross_index_dict[crossid]['worst_service_level']:
all_cross_index_dict[crossid]['worst_service_level'] = tp_info['service_level']
if crossid not in all_cross_index_dict_prev:
continue
delay_infos_prev = all_cross_index_dict_prev[crossid]['delay_infos'] delay_infos_prev = all_cross_index_dict_prev[crossid]['delay_infos']
weekdays_list = list(delay_infos.keys()) weekdays_list = list(delay_infos.keys())
for weekdays in weekdays_list: for weekdays in weekdays_list:
@ -357,8 +363,6 @@ def gen_cross_delay_info_list(userid, area_id, nodeid, date_type, cross_report_p
tp_start, tp_end = tp_info['start_time'], tp_info['end_time'] tp_start, tp_end = tp_info['start_time'], tp_info['end_time']
if tp_start == tp_info_prev['start_time'] and tp_end == tp_info_prev['end_time']: if tp_start == tp_info_prev['start_time'] and tp_end == tp_info_prev['end_time']:
if tp_info['service_level'] != '-': if tp_info['service_level'] != '-':
if tp_info['service_level'] > all_cross_index_dict[crossid]['worst_service_level']:
all_cross_index_dict[crossid]['worst_service_level'] = tp_info['service_level']
if tp_info_prev['service_level'] != '-': if tp_info_prev['service_level'] != '-':
if tp_info['service_level'] < tp_info_prev['service_level']: if tp_info['service_level'] < tp_info_prev['service_level']:
tp_info['service_level_color'] = 2 tp_info['service_level_color'] = 2
@ -523,6 +527,8 @@ def gen_week_str(tp_info):
week_str = '全周' week_str = '全周'
elif weekdays == '1,2,3,4,5': elif weekdays == '1,2,3,4,5':
week_str = '工作日' week_str = '工作日'
elif weekdays == '6,7':
week_str = '周末'
else: else:
weekday_list = str(weekdays).split(',') weekday_list = str(weekdays).split(',')
weekday_str_list = [] weekday_str_list = []
@ -1295,14 +1301,13 @@ def unmatched_lane_num_problems(routing_crosses, shield_info, filter_shield):
for crossid in routing_crosses.keys(): for crossid in routing_crosses.keys():
cross_name = routing_crosses[crossid]['name'] cross_name = routing_crosses[crossid]['name']
crossno = routing_crosses[crossid]['crossno'] crossno = routing_crosses[crossid]['crossno']
cross_ledger_info = routing_crosses[crossid]['cross_ledger_info'] roads_dir_dict = routing_crosses[crossid]['roads_dir_dict']
road_infos = cross_ledger_info['roads']
err_src_dict = [] err_src_dict = []
for src_dir in road_infos.keys(): for src_dir in roads_dir_dict.keys():
entry_lane_num = road_infos[src_dir]['entry_lane_num'] entry_lane_num = roads_dir_dict[src_dir]['entry_lane_num']
exit_lane_num = '-' exit_lane_num = '-'
if src_reverse[src_dir] in road_infos.keys(): if src_reverse[src_dir] in roads_dir_dict.keys():
exit_lane_num = road_infos[src_reverse[src_dir]]['exit_lane_num'] exit_lane_num = roads_dir_dict[src_reverse[src_dir]]['exit_lane_num']
if exit_lane_num != '-' and entry_lane_num != '-' and entry_lane_num - exit_lane_num > 2: if exit_lane_num != '-' and entry_lane_num != '-' and entry_lane_num - exit_lane_num > 2:
err_src_dict.append(srcDir_toStr(src_dir) + '进口 - ' + srcDir_toStr(src_reverse[src_dir]) + '出口') err_src_dict.append(srcDir_toStr(src_dir) + '进口 - ' + srcDir_toStr(src_reverse[src_dir]) + '出口')
key = crossid key = crossid

View File

@ -2,7 +2,9 @@
# @Author: Owl # @Author: Owl
# @Date: 2025/11/18 9:50 # @Date: 2025/11/18 9:50
# @Description: 巡检相关接口入口类 # @Description: 巡检相关接口入口类
from flask import request import gzip
from flask import request, jsonify, Response
from app.cross_monitor_worker import * from app.cross_monitor_worker import *
from app.cross_eva_views import app from app.cross_eva_views import app
@ -15,22 +17,66 @@ def get_monitor_usable_dates():
@app.route('/api/query_monitor_data', methods=['POST']) @app.route('/api/query_monitor_data', methods=['POST'])
def query_monitor_data_api(): def query_monitor_data_api():
return query_monitor_data(request.json) res = query_monitor_data(request.json)
# 手动gzip压缩
compressed_data = gzip.compress(res.encode('utf-8'), compresslevel=6)
return Response(
compressed_data,
headers={
'Content-Encoding': 'gzip',
'Content-Type': 'application/json',
'Content-Length': str(len(compressed_data))
}
)
@app.route('/api/query_monitor_data_trend', methods=['POST']) @app.route('/api/query_monitor_data_trend', methods=['POST'])
def query_monitor_data_trend_api(): def query_monitor_data_trend_api():
return query_monitor_data_trend(request.json) res = query_monitor_data_trend(request.json)
# 手动gzip压缩
compressed_data = gzip.compress(res.encode('utf-8'), compresslevel=6)
return Response(
compressed_data,
headers={
'Content-Encoding': 'gzip',
'Content-Type': 'application/json',
'Content-Length': str(len(compressed_data))
}
)
@app.route('/api/query_cross_tp_data_trend', methods=['POST']) @app.route('/api/query_cross_tp_data_trend', methods=['POST'])
def query_cross_tp_data_trend_api(): def query_cross_tp_data_trend_api():
return query_cross_tp_data_trend(request.json) res = query_cross_tp_data_trend(request.json)
# 手动gzip压缩
compressed_data = gzip.compress(res.encode('utf-8'), compresslevel=6)
return Response(
compressed_data,
headers={
'Content-Encoding': 'gzip',
'Content-Type': 'application/json',
'Content-Length': str(len(compressed_data))
}
)
@app.route('/api/query_monitor_problems', methods=['POST']) @app.route('/api/query_monitor_problems', methods=['POST'])
def query_monitor_problems_api(): def query_monitor_problems_api():
return query_monitor_problems(request.json) res = query_monitor_problems(request.json)
# 手动gzip压缩
compressed_data = gzip.compress(res.encode('utf-8'), compresslevel=6)
return Response(
compressed_data,
headers={
'Content-Encoding': 'gzip',
'Content-Type': 'application/json',
'Content-Length': str(len(compressed_data))
}
)
@app.route('/api/update_cross_problem_shield_state', methods=['POST']) @app.route('/api/update_cross_problem_shield_state', methods=['POST'])

View File

@ -95,6 +95,83 @@ class TmnetDbHelper(TableDbHelperBase):
""" % (nodeid, crossid, nodeid, crossid) """ % (nodeid, crossid, nodeid, crossid)
return self.do_select(sql) return self.do_select(sql)
def query_crosses_inroads(self, crossids, nodeid):
crossids_str = "'" + "', '".join(item for item in crossids) + "'"
sql = """
select
t1.roadid,
if(t2.from_crossid is not null, t2.from_crossid, t1.from_crossid) as from_crossid,
if(t2.to_crossid is not null, t2.to_crossid, t1.to_crossid) as to_crossid,
if(t2.name is not null, t2.name, t1.name) as name,
if(t2.src_direct is not null, t2.src_direct, t1.src_direct) as src_direct,
if(t2.lane_turn_info is not null, t2.lane_turn_info, t1.lane_turn_info) as lane_turn_info
from
(select * from road where nodeid = '%s' and recordstate=0 and to_crossid in (%s) and (is_sup_road is null or is_sup_road<>1)) t1
left join
(select * from road_ledger_update_info where nodeid = '%s' and recordstate=0 and to_crossid in (%s) and (is_sup_road is null or is_sup_road<>1)) t2
on t1.roadid = t2.roadid;
""" % (nodeid, crossids_str, nodeid, crossids_str)
return self.do_select(sql)
def query_road_by_crosses_from(self, nodeid, crossids):
crossids_str = "'" + "', '".join(item for item in crossids) + "'"
sql_query = """
select
t1.roadid,
if(t2.from_crossid is not null, t2.from_crossid, t1.from_crossid) as from_crossid,
if(t2.to_crossid is not null, t2.to_crossid, t1.to_crossid) as to_crossid,
if(t2.name is not null, t2.name, t1.name) as name,
if(t2.coords is not null, t2.coords, t1.coords) as coords,
if(t2.lane_num is not null, t2.lane_num, t1.lane_num) as lane_num,
if(t2.lane_turn_info is not null, t2.lane_turn_info, t1.lane_turn_info) as lane_turn_info
from
(select * from road where nodeid='%s' and recordstate=0 and from_crossid in (%s) and (is_sup_road is null or is_sup_road<>1)) t1
left join
(select * from road_ledger_update_info where nodeid = '%s' and recordstate=0 and from_crossid in (%s) and (is_sup_road is null or is_sup_road<>1)) t2
on t1.roadid = t2.roadid;
""" % (nodeid, crossids_str, nodeid, crossids_str)
return self.do_select(sql_query)
def query_road_by_cross_from(self, nodeid, crossid):
sql_query = """
select
t1.roadid,
if(t2.from_crossid is not null, t2.from_crossid, t1.from_crossid) as from_crossid,
if(t2.to_crossid is not null, t2.to_crossid, t1.to_crossid) as to_crossid,
if(t2.name is not null, t2.name, t1.name) as name,
if(t2.coords is not null, t2.coords, t1.coords) as coords,
if(t2.lane_num is not null, t2.lane_num, t1.lane_num) as lane_num,
if(t2.lane_turn_info is not null, t2.lane_turn_info, t1.lane_turn_info) as lane_turn_info
from
(select * from road where nodeid='%s' and recordstate=0 and from_crossid='%s' and (is_sup_road is null or is_sup_road<>1)) t1
left join
(select * from road_ledger_update_info where nodeid = '%s' and recordstate=0 and from_crossid='%s' and (is_sup_road is null or is_sup_road<>1)) t2
on t1.roadid = t2.roadid;
""" % (nodeid, crossid, nodeid, crossid)
return self.do_select(sql_query)
def query_user_definded_raod_by_roadid(self, roadid):
sql = "select coords from user_defined_roads where roadid = '%s'"
return self.do_select(sql % (roadid))
def query_user_definded_raod(self, crossid, nodeid):
sql = "select * from user_defined_roads where nodeid = '%s' and to_crossid = '%s' and recordstate=0 and (is_sup_road is null or is_sup_road<>1)"
return self.do_select(sql % (nodeid, crossid))
def query_user_definded_road_by_cross_list(self, crossid_list, nodeid):
crossid_str = "'" + "', '".join(item for item in crossid_list) + "'"
sql = "select * from user_defined_roads where nodeid = '%s' and to_crossid in (%s) and recordstate=0 and (is_sup_road is null or is_sup_road<>1)"
return self.do_select(sql % (nodeid, crossid_str))
def query_user_defined_raod_by_cross_list_from(self, crossid_list, nodeid):
crossid_str = "'" + "', '".join(item for item in crossid_list) + "'"
sql = "select * from user_defined_roads where nodeid = '%s' and from_crossid in (%s) and recordstate=0 and (is_sup_road is null or is_sup_road<>1)"
return self.do_select(sql % (nodeid, crossid_str))
def query_user_definded_raod_by_cross_from(self, crossid, nodeid):
sql = "select * from user_defined_roads where nodeid = '%s' and from_crossid = '%s' and recordstate=0 and (is_sup_road is null or is_sup_road<>1)"
return self.do_select(sql % (nodeid, crossid))
def query_base_info(self, nodeid, area_id): def query_base_info(self, nodeid, area_id):
sql = "select * from ledger.leger_base_info where nodeid= %d and area_id = %s" % (int(nodeid), int(area_id)) sql = "select * from ledger.leger_base_info where nodeid= %d and area_id = %s" % (int(nodeid), int(area_id))
return self.do_select(sql) return self.do_select(sql)