Compare commits

...

3 Commits

Author SHA1 Message Date
yinzijian 4219d99106 新增绿波优化任务相关查询绿波时段路口配时方案
Signed-off-by: yinzijian <yinzijian@haomozhixing.onaliyun.com>
2026-03-19 16:53:47 +08:00
wangxu a8d074b463 修复绿波数据库链接初始化问题 2026-03-19 14:49:51 +08:00
wangxu 6e1220ea80 提交绿波优化任务工作流相关代码,未完成 2026-03-19 14:26:36 +08:00
14 changed files with 594 additions and 67 deletions

View File

@ -241,6 +241,24 @@ def init_with_config():
if config.has_option('task_db', 'password'):
password = config.get('task_db', 'password')
g_task_db['password'] = password
# 新增绿波相关mysql链接信息
if config.has_option('wave_db', 'host'):
host = config.get('wave_db', 'host')
g_wave_tool_db['host'] = host
if config.has_option('wave_db', 'port'):
port = int(config.get('wave_db', 'port'))
g_wave_tool_db['port'] = port
if config.has_option('wave_db', 'dbname'):
dbname = config.get('wave_db', 'dbname')
g_wave_tool_db['db'] = dbname
if config.has_option('wave_db', 'user'):
user = config.get('wave_db', 'user')
g_wave_tool_db['user'] = user
if config.has_option('wave_db', 'password'):
password = config.get('wave_db', 'password')
g_wave_tool_db['password'] = password
# redis
if config.has_section('redis'):
if config.has_option('redis', 'ip'):
@ -279,6 +297,8 @@ def init_with_config():
g_cross_delay_db['host'] = config.get('cross_delay_db', 'host_inner')
if config.has_option('task_db', 'host_inner'):
g_task_db['host'] = config.get('task_db', 'host_inner')
if config.has_option('wave_db', 'host_inner'):
g_wave_db['host'] = config.get('wave_db', 'host_inner')
print(g_dbinfo)
print(g_roadnet_db)
print(g_cloud_db)
@ -291,6 +311,7 @@ def init_with_config():
g_user_pool.init_pool(g_user_db)
g_cross_delay_pool.init_pool(g_cross_delay_db)
g_task_pool.init_pool(g_task_db)
g_wave_db.init_pool(g_wave_tool_db)
# 本机服务配置
if config.has_section('server'):

View File

@ -26,7 +26,7 @@ def query_monitor_task_usable_date_list(params):
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
day_list, week_list = db_cross.query_monitor_task_dates(nodeid, area_id)
day_list, week_list, month_list = db_cross.query_monitor_task_dates(nodeid, area_id)
tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
if not tp_desc:
tp_info = [
@ -52,6 +52,7 @@ def query_monitor_task_usable_date_list(params):
res['data'] = {
'days': day_list,
'weeks': week_list,
'months': month_list,
'tp_info': tp_info,
'peak_tp': peak_tp
}
@ -78,7 +79,7 @@ def query_monitor_data(params):
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
if date_type not in ['day', 'week', 'workday', 'weekend', 'month']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
special_time_range = check_param(params, 'special_time_range')
if not special_time_range or special_time_range == '00:00-23:59':
@ -129,7 +130,7 @@ def query_monitor_data_trend(params):
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
if date_type not in ['day', 'week', 'workday', 'weekend', 'month']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
query_date = check_param(params, 'query_date')
if not query_date:
@ -165,7 +166,7 @@ def query_cross_tp_data_trend(params):
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
if date_type not in ['day', 'week', 'workday', 'weekend', 'month']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
weekdays = check_param(params, 'weekdays')
if not weekdays:
@ -192,7 +193,7 @@ def query_cross_tp_data_trend(params):
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
if date_type == 'day':
if date_type in ('day', 'month'):
month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d')
now_prev_date = (datetime.now().date() - timedelta(days=1)).strftime('%Y%m%d')
date_list = generate_date_range(month_ago_date, now_prev_date)
@ -244,7 +245,7 @@ def query_monitor_problems(params):
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
if date_type not in ['day', 'week', 'workday', 'weekend', 'month']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
special_time_range = check_param(params, 'special_time_range')
if not special_time_range or special_time_range == '00:00-23:59':

View File

@ -67,11 +67,14 @@ class CrossDbHelper(TableDbHelperBase):
def query_monitor_task_dates(self, nodeid, area_id):
day_sql = f"select distinct day from traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'day'"
week_sql = f"select distinct day from traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'week'"
month_sql = f"select distinct day from traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'month'"
day_date_list = self.do_select(day_sql)
week_date_list = self.do_select(week_sql)
month_date_list = self.do_select(month_sql)
day_list = [item['day'] for item in day_date_list]
week_list = [item['day'] for item in week_date_list]
return day_list, week_list
month_list = [item['day'] for item in month_date_list]
return day_list, week_list, month_list
def query_monitor_data_sql(self, nodeid, area_id, date_type, query_date):
if date_type in ['week', 'workday', 'weekend']:
@ -86,6 +89,9 @@ class CrossDbHelper(TableDbHelperBase):
if date_type in ['week', 'workday', 'weekend']:
date_type = 'week'
limit_num = 10
elif date_type == 'month':
date_type = 'month'
limit_num = 12
sql = f"""
select * from traffic_{nodeid}.cross_inspect where area_id = {area_id} and type = '{date_type}' order by day desc limit {limit_num}
"""

View File

@ -6,6 +6,7 @@ from app.models import *
from app.models_wave import RoadLinkManager
from app.phase_db_func import PhaseTableDbHelper
from app.task_db_func import TaskDbHelper
from app.wave_db_func import WaveDBFunction
from app.workstation_db_function import WorkstationDbHelper
from tool.mysql_common_connector_pool import *
from app.user_db_func import *
@ -69,6 +70,14 @@ g_task_db = {
'db': 'task'
}
g_wave_tool_db = {
'host': '43.140.225.219',
'port': 3306,
'user': 'root',
'password': 'pmenJIn7EaK40oThn~~',
'db': 'greenwave'
}
# 服务配置
g_citycode_set = set()
g_config = {}
@ -89,6 +98,7 @@ g_cloud_pool = DatabaseManager(g_cloud_db)
g_cross_delay_pool = DatabaseManager(g_cross_delay_db)
g_user_pool = DatabaseManager(g_user_db)
g_task_pool = DatabaseManager(g_task_db)
g_wave_db = DatabaseManager(g_wave_tool_db)
# 全局的数据库对象
@ -102,6 +112,7 @@ db_task = TaskDbHelper(g_task_pool)
db_phasetable = PhaseTableDbHelper(g_db_pool)
db_tmnet = TmnetDbHelper(g_roadnet_pool)
db_workstation = WorkstationDbHelper(g_db_pool)
db_wave = WaveDBFunction(g_wave_db)
nodeid_list = []

View File

@ -1360,8 +1360,10 @@ def phase_tp_check_problems(routing_crosses, special_time_range, first_date, dat
6: '自动结束'
}
min_date = first_date
if date_type != 'day':
if date_type in ('week', 'workday', 'weekend'):
min_date = (datetime.strptime(first_date, "%Y%m%d") - timedelta(days=7)).strftime("%Y%m%d")
elif date_type == 'month':
min_date = datetime.strptime(first_date, '%Y%m%d').replace(day=1)
cross_examine_records = db_cross.query_crosses_examine_records(crossid_list, first_date, min_date)
for row in cross_examine_records:
start_hm = row['start_hm']
@ -1438,7 +1440,7 @@ def phase_tp_check_problems(routing_crosses, special_time_range, first_date, dat
# 配时方案相关异常
def monitor_phase_problems(nodeid, area_id, date_type, query_date, special_time_range, shield_info, routing_crosses, filter_shield):
weekdays = str(datetime.strptime(query_date, "%Y%m%d").weekday() + 1)
if date_type == 'week':
if date_type in ('week', 'month'):
weekdays = '1,2,3,4,5,6,7'
elif date_type == 'workday':
weekdays = '1,2,3,4,5'

View File

@ -175,13 +175,13 @@ class TaskDbHelper(TableDbHelperBase):
return count
def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
executor, progress, task_state, description, crossids, waveid, wave_name, comment,
record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review):
sql_insert = "insert into task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time," \
" executor, progress, task_state, description, crossids, sectionids, arteryids, comment," \
" executor, progress, task_state, description, crossids, waveid, wave_name, comment," \
" record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s, %s, %s)" % (
timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
executor, progress, task_state, description, crossids, waveid, wave_name, comment,
record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
count = self.do_execute(sql_insert)
@ -257,6 +257,34 @@ class TaskDbHelper(TableDbHelperBase):
""" % task_no
return self.do_select(sql)
def query_ledger_task_crosses_info_by_area_id(self, nodeid, area_id):
sql = """
select * from ledger_task_detail where nodeid = %s and area_id = %s
""" % (nodeid, area_id)
res = self.do_select(sql)
task_info_dict = {}
for row in res:
task_no = row['task_no']
if task_no not in task_info_dict.keys():
task_info_dict[task_no] = []
task_info_dict[task_no].append(row)
task_res_dict = {}
for task_no in task_info_dict.keys():
task_cross_num = len(task_info_dict[task_no])
entered_cross_num = len([cross for cross in task_info_dict[task_no] if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)])
approve_cross_num = len([cross for cross in task_info_dict[task_no] if cross['submit_status'] == 2])
entered_percent = int(entered_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
approve_percent = int(approve_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
task_res_dict[task_no] = {
'task_no': task_no,
'task_cross_num': task_cross_num,
'entered_cross_num': entered_cross_num,
'entered_percent': entered_percent,
'approve_cross_num': approve_cross_num,
'approve_percent': approve_percent
}
return task_res_dict
def query_ledger_task_crosses_pics(self, crossid_list):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
@ -358,6 +386,22 @@ class TaskDbHelper(TableDbHelperBase):
""" % taskno
return self.do_select(sql)
def query_cross_entering_status(self, crossid_list):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
select crossid, ledger_status, phase_status, update_time from ledger_task_detail where crossid in (%s) order by update_time asc
""" % crossids
return self.do_select(sql)
def query_greenwave_task_requirement_validation_info_sql(self, task_no, nodeid, area_id):
sql = """
select * from greenwave_task_detail where task_no = %s and nodeid = %s and area_id = %s
""" % (task_no, nodeid, area_id)
res = self.do_select(sql)
if res:
return res[0]['requirement_validation']
return None
#
# if __name__ == '__main__':
# tt_5min = get_latest_5min_timestamp()

View File

@ -91,14 +91,7 @@ def do_query_task_list(params):
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
#任务名称`task_name` varchar(300) DEFAULT NULL COMMENT '任务名称',
#任务类型`task_type` varchar(300) DEFAULT NULL COMMENT '任务类型',
#任务等级`task_class` varchar(300) DEFAULT NULL COMMENT '需求等级';
#计划开始时间`plan_begin_time` bigint DEFAULT NULL COMMENT '计划开始时间',
#计划结束时间`plan_end_time` bigint DEFAULT NULL COMMENT '计划结束时间',
#任务状态`task_state` int NOT NULL COMMENT '任务状态 0未开始1进行中2完成3挂起4废止',
#负责人`executor` varchar(300) DEFAULT NULL COMMENT '负责人',
#需求来源 `task_src` varchar(300) DEFAULT NULL COMMENT '需求来源';
all_task_list = db_task.query_all_tak_list(nodeid, area_id)
upload_file_dict = db_task.query_task_file(nodeid, area_id)
for task_info in all_task_list:
@ -137,52 +130,47 @@ def do_query_task_list(params):
for i in range(len(task_name)):
if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']) and task_name[i] == list(task_name_pinyin)[i][0] and task_info not in name_list:
name_list.append(task_info)
filter_conditions = []
if task_type and len(task_type) > 0:
type_list.extend([task_info for task_info in all_task_list if int(task_type) == task_info['task_type']])
if len(type_list) == 0:
# 用于多种筛选条件取交集使用
type_list.append({'arteryids': -1})
filter_conditions.append((True, type_list))
if task_class and len(task_class) > 0:
class_list.extend([task_info for task_info in all_task_list if int(task_class) == task_info['task_class']])
if len(class_list) == 0:
# 用于多种筛选条件取交集使用
class_list.append({'arteryids': -1})
filter_conditions.append((True, class_list))
if plan_begin_time and len(plan_begin_time) > 0:
begin_list.extend([task_info for task_info in all_task_list if int(plan_begin_time) <= task_info['plan_begin_time']])
if len(begin_list) == 0:
# 用于多种筛选条件取交集使用
begin_list.append({'arteryids': -1})
filter_conditions.append((True, begin_list))
if plan_end_time and len(plan_end_time) > 0:
end_list.extend([task_info for task_info in all_task_list if int(plan_end_time) >= task_info['plan_end_time']])
if len(end_list) == 0:
# 用于多种筛选条件取交集使用
end_list.append({'arteryids': -1})
filter_conditions.append((True, end_list))
if task_state and len(task_state) > 0:
state_list.extend([task_info for task_info in all_task_list if int(task_state) == task_info['task_state']])
if len(state_list) == 0:
# 用于多种筛选条件取交集使用
state_list.append({'arteryids': -1})
filter_conditions.append((True, state_list))
if executor and len(executor) > 0:
executor_list.extend([task_info for task_info in all_task_list if executor == task_info['executor']])
if len(executor_list) == 0:
# 用于多种筛选条件取交集使用
executor_list.append({'arteryids': -1})
filter_conditions.append((True, executor_list))
if task_src and len(task_src) > 0:
task_src_list.extend([task_info for task_info in all_task_list if task_src == task_info['task_src']])
if len(task_src_list) == 0:
# 用于多种筛选条件取交集使用
task_src_list.append({'arteryids': -1})
non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0]
if non_empty_lists:
res_list = intersect_dicts(non_empty_lists)
else:
filter_conditions.append((True, task_src_list))
has_empty_filter = any(has_condition and len(lst) == 0 for has_condition, lst in filter_conditions)
if has_empty_filter:
res_list = []
else:
non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0]
if non_empty_lists:
res_list = intersect_dicts(non_empty_lists)
else:
res_list = []
if task_no and len(task_no) > 0:
res_list = [task_info for task_info in all_task_list if int(task_no) == task_info['taskno']]
if not task_name and not task_type and not task_class and not plan_begin_time and not plan_end_time and not task_state and not executor and not task_src and not task_no:
res_list = all_task_list
# task_list = db_task.query_task_list(task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid)
filtered_list = [d for d in res_list if d['arteryids'] != -1]
task_entering_info_dict = db_task.query_ledger_task_crosses_info_by_area_id(nodeid, area_id)
filtered_list = res_list
for task_info in filtered_list:
comment = task_info['comment']
if '_split_suffix_for_query_info' in comment:
@ -193,7 +181,7 @@ def do_query_task_list(params):
task_info['comment'] = comment_list
if task_info['task_type_class'] == 1:
item_task_no = task_info['taskno']
ledger_task_add_info = query_ledger_task_additional_info(item_task_no, nodeid, area_id)
ledger_task_add_info = task_entering_info_dict[item_task_no]
task_info['entered_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['entered_percent']
task_info['approve_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['approve_percent']
sorted_list = sorted(filtered_list, key=sort_key)
@ -483,15 +471,16 @@ def do_add_task(params):
data_type = check_param(params, 'data_type')
if not data_type:
return json.dumps(make_common_res(2, '关联路网状态缺失,请检查后重试'))
crossids, arteryids, sectionids = '', '', ''
crossids, waveid, wave_name = '', '', ''
if int(data_type) == 1:
crossids = check_param(params, 'crossids')
if not crossids:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联路口,但关联路口信息缺失,请检查后重试'))
elif int(data_type) == 2:
arteryids = check_param(params, 'arteryids')
if not arteryids:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联干线,但关联干线信息缺失,请检查后重试'))
elif int(data_type) == 3:
waveid = check_param(params, 'waveid')
wave_name = check_param(params, 'wave_name')
if not waveid or not wave_name:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联绿波,但关联绿波信息缺失,请检查后重试'))
add_type = check_param(params, 'add_type')
if not add_type or add_type == 'normal':
plan_begin_time = check_param(params, 'plan_begin_time')
@ -540,7 +529,7 @@ def do_add_task(params):
if add_type and add_type != 'normal':
task_state = 0
count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
executor, progress, task_state, description, crossids, waveid, wave_name, comment,
record_task_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
if count != 1:
logging.error(str(params) + ' 添加任务报错!')
@ -575,6 +564,16 @@ def do_add_task(params):
if e:
logging.error(e)
return json.dumps(make_common_res(2, '台账任务创建成功,但路口列表配时方案状态更新失败,请反馈该情况至管理员!'))
elif task_type_class == 3:
if full_review != 1:
requirement_validation_json = check_param(params, 'requirement_validation_json')
if not requirement_validation_json:
return json.dumps(make_common_res(2, '缺少需求核验单信息,请检查后重试'))
predict_issue_time = check_param(params, 'predict_issue_time')
if not predict_issue_time:
return json.dumps(make_common_res(2, '计划下发时间信息,请检查后重试'))
pass
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
@ -966,9 +965,13 @@ def do_query_task_detail(params):
if task['task_type_class'] == 1:
ledger_task_additional_info = query_ledger_task_additional_info(taskno, nodeid, area_id)
task['ledger_task_additional_info'] = ledger_task_additional_info
if task['task_state'] == 4:
task['ledger_task_additional_info']['entered_percent'] = 100
task['ledger_task_additional_info']['approve_percent'] = 100
if task['task_state'] == 4:
task['ledger_task_additional_info']['entered_percent'] = 100
task['ledger_task_additional_info']['approve_percent'] = 100
elif task['task_type_class'] == 3:
# 绿波优化任务
greenwave_task_additional_info = gen_greenwave_task_additional_info(taskno, nodeid, area_id)
pass
task.pop('update_time')
res['desc'] = ''
res['data'] = task
@ -1013,10 +1016,11 @@ def gen_update_sql(params, task_old_info):
crossids = check_param(params, 'crossids')
if crossids and crossids != '' and crossids != task_old_info['crossids']:
modify_data += f", crossids = '{crossids}'"
elif int(data_type) == 2:
arteryids = check_param(params, 'arteryids')
if arteryids and arteryids != '' and arteryids != task_old_info['arteryids']:
modify_data += f", arteryids = '{arteryids}'"
elif int(data_type) == 3:
waveid = check_param(params, 'waveid')
wave_name = check_param(params, 'wave_name')
if (waveid and waveid != '' and waveid != task_old_info['waveid']) or (wave_name and wave_name != '' and wave_name != task_old_info['wave_name']):
modify_data += f", waveid = '{waveid}', wave_name = '{wave_name}'"
modify_item += prefix
modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}"
executor = check_param(params, 'executor')
@ -1337,3 +1341,157 @@ def get_year_week(time_input):
# 使用 ISO 标准获取年周
year, week, _ = time_input.isocalendar()
return f"{year}-{week:02d}"
# 下方为绿波优化任务工作流相关代码 20260317
def verify_cross_entering_status(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, 'area_id异常请检查后重试'))
crossid_list = check_param(params, 'crossid_list')
if not crossid_list or len(crossid_list) < 1:
return json.dumps(make_common_res(6, '缺少crossid_list 请刷新后重试'))
cross_enter_status = db_task.query_cross_entering_status(crossid_list)
cross_status_dict = {}
for row in cross_enter_status:
cross_status_dict[row['crossid']] = {
'ledger_status': row['ledger_status'],
'phase_status': row['phase_status']
}
verify_res = True
for crossid in crossid_list:
if crossid not in cross_status_dict.keys():
verify_res = False
break
if cross_status_dict[crossid]['ledger_status'] != 1 or cross_status_dict[crossid]['phase_status'] not in [1, 3]:
verify_res = False
break
if verify_res:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(1, '验证未通过'))
def query_usable_wave_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, 'area_id异常请检查后重试'))
area_cross_list = db_tmnet.query_cross_list_sql(nodeid, area_id)
area_crossid_list = [row['crossid'] for row in area_cross_list]
node_wave_cross_dict = db_wave.query_node_wave(nodeid)
usable_wave_list = []
for waveid in node_wave_cross_dict.keys():
wave_info = node_wave_cross_dict[waveid]
cross_list = wave_info['cross_list']
if set(cross_list) & set(area_crossid_list):
usable_wave_list.append(wave_info)
res = make_common_res(0, 'ok')
res['data'] = {
'wave_list': usable_wave_list,
'cross_list': area_cross_list
}
return json.dumps(res)
def query_wave_task_requirement_validation_detail(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, 'area_id异常请检查后重试'))
task_no = check_param(params, 'task_no')
if not task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
requirement_validation_res = db_task.query_greenwave_task_requirement_validation_info_sql(task_no, nodeid, area_id)
if not requirement_validation_res:
return json.dumps(make_common_res(7, '需求确认单信息异常'))
res = make_common_res(0, 'ok')
res['data'] = requirement_validation_res
return json.dumps(res)
def query_wave_task_params(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, 'area_id异常请检查后重试'))
waveid = check_param(params, 'waveid')
if not waveid:
waveid = ''
wave_name = check_param(params, 'wave_name')
if not wave_name:
wave_name = ''
wave_crosses = check_param(params, 'wave_crosses')
if not wave_crosses or len(wave_crosses) < 1:
return json.dumps(make_common_res(8, '缺少wave_crosses 请刷新后重试'))
srcDir = check_param(params, 'srcDir')
if not srcDir:
srcDir = ''
# 需要查明 路口名称、信号机品牌、 最大相位数量、绿波时段信息、配时方案信息
wave_tp_info = []
if waveid != '' and srcDir != '':
wave_tp_info = db_wave.query_wave_tp_infos(waveid, srcDir)
wave_crosses_infos = db_tmnet.query_cross_infos(wave_crosses)
wave_cross_info_dict = {row['crossid']: row for row in wave_crosses_infos}
phase_infos, e = None, None
for crossid in wave_cross_info_dict.keys():
pass
def gen_greenwave_task_additional_info(taskno, nodeid, area_id):
# additional_info =
pass

View File

@ -675,9 +675,10 @@ class TmnetDbHelper(TableDbHelperBase):
t1.crossid,
if (t2.location is not null, t2.location, t1.location) as location,
t1.nodeid,
t1.area_id
from (select name,crossid, location,nodeid, area_id from `cross` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t1
left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid
t1.area_id,
if(t2.slc_company is not null, t2.slc_company, t1.slc_company) as slc_company
from (select name,crossid, location,nodeid, area_id, slc_company from `cross` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t1
left join (select name,crossid, location,nodeid, area_id, slc_company from `cross_ledger_update_info` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid
""" % (crossids, crossids)
cross_list = self.do_select(sql)
virtual_cross_sql = f'select name, crossid, location, nodeid, area_id from user_defined_cross where crossid in ({crossids})'

100
app/wave_db_func.py Normal file
View File

@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2026/3/17 12:01
# @Description:
from app.db_func_base import *
class WaveDBFunction(TableDbHelperBase):
def __init__(self, pool):
self.db_pool = pool
self.DB_Name = 'greenwave'
def query_node_wave(self, nodeid):
sql = """
select t1.name as wave_name, t1.waveid, t2.crossid, t1.service_status, t1.srcDir, t1.first_left, t1.last_left from wave_cross t2
inner join
(select name, waveid, service_status, srcDir, first_left, last_left from greenwave where orgid = %s and service_status = 1 and status = 0) t1
on t1.waveid = t2.waveid
""" % nodeid
res = self.do_select(sql)
wave_cross_dict = {}
for wave_name, waveid, crossid, service_status, srcDir, first_left, last_left in res:
left_coor = 0 if first_left == 0 and last_left == 0 else 1
if waveid not in wave_cross_dict:
wave_cross_dict[waveid] = {
'wave_name': wave_name,
'waveid': waveid,
'src_dir': srcDir,
'left_coor': left_coor,
'service_status': service_status,
'cross_list': [crossid]
}
else:
wave_cross_dict[waveid]['cross_list'].append(crossid)
return wave_cross_dict
def query_wave_tp_infos(self, waveid, srcDir):
sql = """
select type, weekday, tp_start, tp_end from wave_tps where waveid = '%s'
""" % waveid
res = self.do_select(sql)
wave_tp_infos = []
for type, weekday, tp_start, tp_end in res:
type_str = gen_type_str(type, srcDir)
weekday_str = gen_weekday_str(weekday)
wave_tp_infos.append({
'type_str': type_str,
'weekday_str': weekday_str,
'type': type,
'weekday': weekday,
'tp_start': tp_start,
'tp_end': tp_end
})
return wave_tp_infos
sir2Str = {
'S': ['南向北', '北向南'],
'E': ['东向西', '西向东'],
'W': ['西向东', '东向西'],
'N': ['北向南', '南向北'],
'NE': ['东北向西南', '西南向东北'],
'NW': ['西北向东南', '东南向西北'],
'SE': ['东南向西北', '西北向东南'],
'SW': ['西南向东北', '东北向西南']
}
weekday2Str = {
1: '周一',
2: '周二',
3: '周三',
4: '周四',
5: '周五',
6: '周六',
7: '周日'
}
def gen_type_str(type_int, srcDir):
type_str = '双向'
if type_int == 0:
type_str = sir2Str[srcDir][0]
elif type_int == 1:
type_str = sir2Str[srcDir][1]
return type_str
def gen_weekday_str(weekday):
if weekday == '1,2,3,4,5':
weekday_str = '工作日'
elif weekday == '6,7':
weekday_str = '节假日'
elif weekday == '1,2,3,4,5,6,7':
weekday_str = '全周'
else:
weekday_list = weekday.split(',')
weekday_str = weekday2Str[int(weekday_list[0])]
for i in range(1, len(weekday_list)):
weekday_str += '' + weekday2Str[int(weekday_list[i])]
return weekday_str

View File

@ -67,6 +67,13 @@ user = root
password = pmenJIn7EaK40oThn~~~
dbname = tmnet
[wave_db]
host = 49.232.183.190
host_inner = 172.21.32.14
user = root
password = pmenJIn7EaK40oThn~~
db_name = greenwave
[executive]
userid = 15836903493,18518481331

View File

@ -256,3 +256,75 @@ def LedgerTaskDetailPhaseState(citycode: int, crossids: List[str]):
return None, e
finally:
channel.close()
def TaskWaveCrossTpPhaseRPC(citycode: int, crossids: List[str], tps: List):
"""
绿波优化任务相关根据路口时段查询配时方案
citycode = 130200
crossids = ['crossid1','crossid2']
tps = [{'weekday': '1,2,3,4,5','tp_start':'09:00', 'tp_end':'10:00'}, {'weekday': '6,7', 'tp_start':'11:00', 'tp_end':'12:00'}]
"""
stub, channel = channel_stub()
try:
return_data = []
return_data_map = {}
request_params = phase_server_pb2.TaskWaveCrossTpPhaseRequest(citycode=citycode,crossids=crossids,tps=tps)
response = stub.TaskWaveCrossTpPhase(request_params, timeout=30)
if response.code != 0:
raise Exception(response.msg)
if len(response.data) > 0:
for item_data in response.data:
if len(item_data.details) <= 0:
continue
for item_detail in item_data.details:
if len(item_detail.stages) <= 0:
continue
if item_data.crossid not in return_data_map:
return_data_map[item_data.crossid] = {}
key = f"{item_detail.search_tp_start}-{item_detail.search_tp_end}"
if key not in return_data_map[item_data.crossid]:
return_data_map[item_data.crossid][key] = {}
return_data_map[item_data.crossid][key] = {
'max_duration': item_detail.max_duration,
'search_tp_start': item_detail.search_tp_start,
'search_tp_end': item_detail.search_tp_end,
'schedule_id': item_detail.schedule_id,
'schedule_week': item_detail.schedule_week,
'tp_start': item_detail.tp_start,
'tp_end': item_detail.tp_end,
'planid': item_detail.planid,
'plan_name': item_detail.plan_name,
'cycle': item_detail.cycle,
'offset': item_detail.offset,
'phase_list': [],
}
for item_stage in item_detail.stages:
return_data_map[item_data.crossid][key]['phase_list'].append({
'phase_name': item_stage.stage_name,
'phase_time': item_stage.stage_duration,
})
for item_crossid in crossids:
if item_crossid in return_data_map:
item_return_data = {
'crossid': item_crossid,
'tp_infos': [],
}
for item_tp in tps:
tp_key = f"{item_tp['tp_start']}-{item_tp['tp_end']}"
if tp_key not in return_data_map[item_crossid]:
continue
item_return_data['tp_infos'].append({
'tp_start': return_data_map[item_crossid][tp_key]['search_tp_start'],
'tp_end': return_data_map[item_crossid][tp_key]['search_tp_end'],
'offset': return_data_map[item_crossid][tp_key]['offset'],
'phase_list': return_data_map[item_crossid][tp_key]['phase_list']
})
if len(item_return_data['tp_infos']) > 0:
return_data.append(item_return_data)
return return_data, None
except Exception as e:
return None, e
finally:
channel.close()

View File

@ -34,6 +34,8 @@ service PhaseService {
rpc CrossPhaseDiagnosisByCity (CrossPhaseDiagnosisByCityRequest) returns (CrossPhaseDiagnosisByCityResponse);
//
rpc LedgerTaskDetailPhaseStateUpdate (LedgerTaskDetailPhaseStateUpdateRequest) returns (LedgerTaskDetailPhaseStateUpdateResponse);
//绿
rpc TaskWaveCrossTpPhase (TaskWaveCrossTpPhaseRequest) returns (TaskWaveCrossTpPhaseResponse);
}
message EmptyRequest {}
@ -428,3 +430,49 @@ message LedgerTaskDetailPhaseStateUpdateResponse {
string msg = 2;
}
message TaskWaveCrossTpPhaseRequest {
int32 citycode = 1;
repeated string crossids = 2;
repeated TpInfo tps = 3;
message TpInfo {
string weekday = 1;
string tp_start = 2;
string tp_end = 3;
}
}
message TaskWaveCrossTpPhaseResponse {
int32 code = 1;
string msg = 2;
repeated List data = 3;
message List {
string crossid = 1;
repeated detail_info details = 2;
}
message detail_info {
int32 max_duration = 1; //
string search_tp_start = 2; //
string search_tp_end = 3; //
int32 schedule_id = 4;
string schedule_week = 5;
string tp_start = 6;
string tp_end = 7;
int32 planid = 8;
string plan_name = 9;
int32 cycle = 10;
int32 coord_phaseid = 11;
int32 offset = 12;
repeated stage_data stages = 13;
}
message stage_data {
int32 stageid = 1;
string stage_name = 2;
int32 stage_duration = 3;
int32 green = 4;
int32 yellow = 5;
int32 allred = 6;
int32 redyellow = 7;
string phaseids = 8;
string phase_name = 9;
}
}

File diff suppressed because one or more lines are too long

View File

@ -114,6 +114,11 @@ class PhaseServiceStub(object):
request_serializer=phase__server__pb2.LedgerTaskDetailPhaseStateUpdateRequest.SerializeToString,
response_deserializer=phase__server__pb2.LedgerTaskDetailPhaseStateUpdateResponse.FromString,
_registered_method=True)
self.TaskWaveCrossTpPhase = channel.unary_unary(
'/phase_server.PhaseService/TaskWaveCrossTpPhase',
request_serializer=phase__server__pb2.TaskWaveCrossTpPhaseRequest.SerializeToString,
response_deserializer=phase__server__pb2.TaskWaveCrossTpPhaseResponse.FromString,
_registered_method=True)
class PhaseServiceServicer(object):
@ -231,6 +236,13 @@ class PhaseServiceServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def TaskWaveCrossTpPhase(self, request, context):
"""绿波优化任务相关根据时段路口配时方案接口
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_PhaseServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@ -314,6 +326,11 @@ def add_PhaseServiceServicer_to_server(servicer, server):
request_deserializer=phase__server__pb2.LedgerTaskDetailPhaseStateUpdateRequest.FromString,
response_serializer=phase__server__pb2.LedgerTaskDetailPhaseStateUpdateResponse.SerializeToString,
),
'TaskWaveCrossTpPhase': grpc.unary_unary_rpc_method_handler(
servicer.TaskWaveCrossTpPhase,
request_deserializer=phase__server__pb2.TaskWaveCrossTpPhaseRequest.FromString,
response_serializer=phase__server__pb2.TaskWaveCrossTpPhaseResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'phase_server.PhaseService', rpc_method_handlers)
@ -756,3 +773,30 @@ class PhaseService(object):
timeout,
metadata,
_registered_method=True)
@staticmethod
def TaskWaveCrossTpPhase(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/phase_server.PhaseService/TaskWaveCrossTpPhase',
phase__server__pb2.TaskWaveCrossTpPhaseRequest.SerializeToString,
phase__server__pb2.TaskWaveCrossTpPhaseResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)