提交绿波优化任务工作流相关代码,未完成

This commit is contained in:
wangxu 2026-03-19 14:26:36 +08:00
parent 1622eb5928
commit 6e1220ea80
10 changed files with 414 additions and 64 deletions

View File

@ -241,6 +241,24 @@ def init_with_config():
if config.has_option('task_db', 'password'):
password = config.get('task_db', 'password')
g_task_db['password'] = password
# 新增绿波相关mysql链接信息
if config.has_option('wave_db', 'host'):
host = config.get('wave_db', 'host')
g_wave_db['host'] = host
if config.has_option('wave_db', 'port'):
port = int(config.get('wave_db', 'port'))
g_wave_db['port'] = port
if config.has_option('wave_db', 'dbname'):
dbname = config.get('wave_db', 'dbname')
g_wave_db['db'] = dbname
if config.has_option('wave_db', 'user'):
user = config.get('wave_db', 'user')
g_wave_db['user'] = user
if config.has_option('wave_db', 'password'):
password = config.get('wave_db', 'password')
g_wave_db['password'] = password
# redis
if config.has_section('redis'):
if config.has_option('redis', 'ip'):
@ -279,6 +297,8 @@ def init_with_config():
g_cross_delay_db['host'] = config.get('cross_delay_db', 'host_inner')
if config.has_option('task_db', 'host_inner'):
g_task_db['host'] = config.get('task_db', 'host_inner')
if config.has_option('wave_db', 'host_inner'):
g_wave_db['host'] = config.get('wave_db', 'host_inner')
print(g_dbinfo)
print(g_roadnet_db)
print(g_cloud_db)

View File

@ -26,7 +26,7 @@ def query_monitor_task_usable_date_list(params):
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
day_list, week_list = db_cross.query_monitor_task_dates(nodeid, area_id)
day_list, week_list, month_list = db_cross.query_monitor_task_dates(nodeid, area_id)
tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
if not tp_desc:
tp_info = [
@ -52,6 +52,7 @@ def query_monitor_task_usable_date_list(params):
res['data'] = {
'days': day_list,
'weeks': week_list,
'months': month_list,
'tp_info': tp_info,
'peak_tp': peak_tp
}
@ -78,7 +79,7 @@ def query_monitor_data(params):
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
if date_type not in ['day', 'week', 'workday', 'weekend', 'month']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
special_time_range = check_param(params, 'special_time_range')
if not special_time_range or special_time_range == '00:00-23:59':
@ -129,7 +130,7 @@ def query_monitor_data_trend(params):
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
if date_type not in ['day', 'week', 'workday', 'weekend', 'month']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
query_date = check_param(params, 'query_date')
if not query_date:
@ -165,7 +166,7 @@ def query_cross_tp_data_trend(params):
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
if date_type not in ['day', 'week', 'workday', 'weekend', 'month']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
weekdays = check_param(params, 'weekdays')
if not weekdays:
@ -192,7 +193,7 @@ def query_cross_tp_data_trend(params):
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
if date_type == 'day':
if date_type in ('day', 'month'):
month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d')
now_prev_date = (datetime.now().date() - timedelta(days=1)).strftime('%Y%m%d')
date_list = generate_date_range(month_ago_date, now_prev_date)
@ -244,7 +245,7 @@ def query_monitor_problems(params):
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
if date_type not in ['day', 'week', 'workday', 'weekend', 'month']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
special_time_range = check_param(params, 'special_time_range')
if not special_time_range or special_time_range == '00:00-23:59':

View File

@ -67,11 +67,14 @@ class CrossDbHelper(TableDbHelperBase):
def query_monitor_task_dates(self, nodeid, area_id):
day_sql = f"select distinct day from traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'day'"
week_sql = f"select distinct day from traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'week'"
month_sql = f"select distinct day from traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'month'"
day_date_list = self.do_select(day_sql)
week_date_list = self.do_select(week_sql)
month_date_list = self.do_select(month_sql)
day_list = [item['day'] for item in day_date_list]
week_list = [item['day'] for item in week_date_list]
return day_list, week_list
month_list = [item['day'] for item in month_date_list]
return day_list, week_list, month_list
def query_monitor_data_sql(self, nodeid, area_id, date_type, query_date):
if date_type in ['week', 'workday', 'weekend']:
@ -86,6 +89,9 @@ class CrossDbHelper(TableDbHelperBase):
if date_type in ['week', 'workday', 'weekend']:
date_type = 'week'
limit_num = 10
elif date_type == 'month':
date_type = 'month'
limit_num = 12
sql = f"""
select * from traffic_{nodeid}.cross_inspect where area_id = {area_id} and type = '{date_type}' order by day desc limit {limit_num}
"""

View File

@ -6,6 +6,7 @@ from app.models import *
from app.models_wave import RoadLinkManager
from app.phase_db_func import PhaseTableDbHelper
from app.task_db_func import TaskDbHelper
from app.wave_db_func import WaveDBFunction
from app.workstation_db_function import WorkstationDbHelper
from tool.mysql_common_connector_pool import *
from app.user_db_func import *
@ -69,6 +70,14 @@ g_task_db = {
'db': 'task'
}
g_wave_tool_db = {
'host': '43.140.225.219',
'port': 3306,
'user': 'root',
'password': 'pmenJIn7EaK40oThn~~',
'db': 'greenwave'
}
# 服务配置
g_citycode_set = set()
g_config = {}
@ -89,6 +98,7 @@ g_cloud_pool = DatabaseManager(g_cloud_db)
g_cross_delay_pool = DatabaseManager(g_cross_delay_db)
g_user_pool = DatabaseManager(g_user_db)
g_task_pool = DatabaseManager(g_task_db)
g_wave_db = DatabaseManager(g_wave_tool_db)
# 全局的数据库对象
@ -102,6 +112,7 @@ db_task = TaskDbHelper(g_task_pool)
db_phasetable = PhaseTableDbHelper(g_db_pool)
db_tmnet = TmnetDbHelper(g_roadnet_pool)
db_workstation = WorkstationDbHelper(g_db_pool)
db_wave = WaveDBFunction(g_wave_db)
nodeid_list = []

View File

@ -1360,8 +1360,10 @@ def phase_tp_check_problems(routing_crosses, special_time_range, first_date, dat
6: '自动结束'
}
min_date = first_date
if date_type != 'day':
if date_type in ('week', 'workday', 'weekend'):
min_date = (datetime.strptime(first_date, "%Y%m%d") - timedelta(days=7)).strftime("%Y%m%d")
elif date_type == 'month':
min_date = datetime.strptime(first_date, '%Y%m%d').replace(day=1)
cross_examine_records = db_cross.query_crosses_examine_records(crossid_list, first_date, min_date)
for row in cross_examine_records:
start_hm = row['start_hm']
@ -1438,7 +1440,7 @@ def phase_tp_check_problems(routing_crosses, special_time_range, first_date, dat
# 配时方案相关异常
def monitor_phase_problems(nodeid, area_id, date_type, query_date, special_time_range, shield_info, routing_crosses, filter_shield):
weekdays = str(datetime.strptime(query_date, "%Y%m%d").weekday() + 1)
if date_type == 'week':
if date_type in ('week', 'month'):
weekdays = '1,2,3,4,5,6,7'
elif date_type == 'workday':
weekdays = '1,2,3,4,5'

View File

@ -175,13 +175,13 @@ class TaskDbHelper(TableDbHelperBase):
return count
def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
executor, progress, task_state, description, crossids, waveid, wave_name, comment,
record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review):
sql_insert = "insert into task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time," \
" executor, progress, task_state, description, crossids, sectionids, arteryids, comment," \
" executor, progress, task_state, description, crossids, waveid, wave_name, comment," \
" record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s, %s, %s)" % (
timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
executor, progress, task_state, description, crossids, waveid, wave_name, comment,
record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
count = self.do_execute(sql_insert)
@ -257,6 +257,34 @@ class TaskDbHelper(TableDbHelperBase):
""" % task_no
return self.do_select(sql)
def query_ledger_task_crosses_info_by_area_id(self, nodeid, area_id):
sql = """
select * from ledger_task_detail where nodeid = %s and area_id = %s
""" % (nodeid, area_id)
res = self.do_select(sql)
task_info_dict = {}
for row in res:
task_no = row['task_no']
if task_no not in task_info_dict.keys():
task_info_dict[task_no] = []
task_info_dict[task_no].append(row)
task_res_dict = {}
for task_no in task_info_dict.keys():
task_cross_num = len(task_info_dict[task_no])
entered_cross_num = len([cross for cross in task_info_dict[task_no] if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)])
approve_cross_num = len([cross for cross in task_info_dict[task_no] if cross['submit_status'] == 2])
entered_percent = int(entered_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
approve_percent = int(approve_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
task_res_dict[task_no] = {
'task_no': task_no,
'task_cross_num': task_cross_num,
'entered_cross_num': entered_cross_num,
'entered_percent': entered_percent,
'approve_cross_num': approve_cross_num,
'approve_percent': approve_percent
}
return task_res_dict
def query_ledger_task_crosses_pics(self, crossid_list):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
@ -358,6 +386,22 @@ class TaskDbHelper(TableDbHelperBase):
""" % taskno
return self.do_select(sql)
def query_cross_entering_status(self, crossid_list):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
select crossid, ledger_status, phase_status, update_time from ledger_task_detail where crossid in (%s) order by update_time asc
""" % crossids
return self.do_select(sql)
def query_greenwave_task_requirement_validation_info_sql(self, task_no, nodeid, area_id):
sql = """
select * from greenwave_task_detail where task_no = %s and nodeid = %s and area_id = %s
""" % (task_no, nodeid, area_id)
res = self.do_select(sql)
if res:
return res[0]['requirement_validation']
return None
#
# if __name__ == '__main__':
# tt_5min = get_latest_5min_timestamp()

View File

@ -91,14 +91,7 @@ def do_query_task_list(params):
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
#任务名称`task_name` varchar(300) DEFAULT NULL COMMENT '任务名称',
#任务类型`task_type` varchar(300) DEFAULT NULL COMMENT '任务类型',
#任务等级`task_class` varchar(300) DEFAULT NULL COMMENT '需求等级';
#计划开始时间`plan_begin_time` bigint DEFAULT NULL COMMENT '计划开始时间',
#计划结束时间`plan_end_time` bigint DEFAULT NULL COMMENT '计划结束时间',
#任务状态`task_state` int NOT NULL COMMENT '任务状态 0未开始1进行中2完成3挂起4废止',
#负责人`executor` varchar(300) DEFAULT NULL COMMENT '负责人',
#需求来源 `task_src` varchar(300) DEFAULT NULL COMMENT '需求来源';
all_task_list = db_task.query_all_tak_list(nodeid, area_id)
upload_file_dict = db_task.query_task_file(nodeid, area_id)
for task_info in all_task_list:
@ -137,52 +130,47 @@ def do_query_task_list(params):
for i in range(len(task_name)):
if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']) and task_name[i] == list(task_name_pinyin)[i][0] and task_info not in name_list:
name_list.append(task_info)
filter_conditions = []
if task_type and len(task_type) > 0:
type_list.extend([task_info for task_info in all_task_list if int(task_type) == task_info['task_type']])
if len(type_list) == 0:
# 用于多种筛选条件取交集使用
type_list.append({'arteryids': -1})
filter_conditions.append((True, type_list))
if task_class and len(task_class) > 0:
class_list.extend([task_info for task_info in all_task_list if int(task_class) == task_info['task_class']])
if len(class_list) == 0:
# 用于多种筛选条件取交集使用
class_list.append({'arteryids': -1})
filter_conditions.append((True, class_list))
if plan_begin_time and len(plan_begin_time) > 0:
begin_list.extend([task_info for task_info in all_task_list if int(plan_begin_time) <= task_info['plan_begin_time']])
if len(begin_list) == 0:
# 用于多种筛选条件取交集使用
begin_list.append({'arteryids': -1})
filter_conditions.append((True, begin_list))
if plan_end_time and len(plan_end_time) > 0:
end_list.extend([task_info for task_info in all_task_list if int(plan_end_time) >= task_info['plan_end_time']])
if len(end_list) == 0:
# 用于多种筛选条件取交集使用
end_list.append({'arteryids': -1})
filter_conditions.append((True, end_list))
if task_state and len(task_state) > 0:
state_list.extend([task_info for task_info in all_task_list if int(task_state) == task_info['task_state']])
if len(state_list) == 0:
# 用于多种筛选条件取交集使用
state_list.append({'arteryids': -1})
filter_conditions.append((True, state_list))
if executor and len(executor) > 0:
executor_list.extend([task_info for task_info in all_task_list if executor == task_info['executor']])
if len(executor_list) == 0:
# 用于多种筛选条件取交集使用
executor_list.append({'arteryids': -1})
filter_conditions.append((True, executor_list))
if task_src and len(task_src) > 0:
task_src_list.extend([task_info for task_info in all_task_list if task_src == task_info['task_src']])
if len(task_src_list) == 0:
# 用于多种筛选条件取交集使用
task_src_list.append({'arteryids': -1})
non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0]
if non_empty_lists:
res_list = intersect_dicts(non_empty_lists)
else:
filter_conditions.append((True, task_src_list))
has_empty_filter = any(has_condition and len(lst) == 0 for has_condition, lst in filter_conditions)
if has_empty_filter:
res_list = []
else:
non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0]
if non_empty_lists:
res_list = intersect_dicts(non_empty_lists)
else:
res_list = []
if task_no and len(task_no) > 0:
res_list = [task_info for task_info in all_task_list if int(task_no) == task_info['taskno']]
if not task_name and not task_type and not task_class and not plan_begin_time and not plan_end_time and not task_state and not executor and not task_src and not task_no:
res_list = all_task_list
# task_list = db_task.query_task_list(task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid)
filtered_list = [d for d in res_list if d['arteryids'] != -1]
task_entering_info_dict = db_task.query_ledger_task_crosses_info_by_area_id(nodeid, area_id)
filtered_list = res_list
for task_info in filtered_list:
comment = task_info['comment']
if '_split_suffix_for_query_info' in comment:
@ -193,7 +181,7 @@ def do_query_task_list(params):
task_info['comment'] = comment_list
if task_info['task_type_class'] == 1:
item_task_no = task_info['taskno']
ledger_task_add_info = query_ledger_task_additional_info(item_task_no, nodeid, area_id)
ledger_task_add_info = task_entering_info_dict[item_task_no]
task_info['entered_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['entered_percent']
task_info['approve_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['approve_percent']
sorted_list = sorted(filtered_list, key=sort_key)
@ -483,15 +471,16 @@ def do_add_task(params):
data_type = check_param(params, 'data_type')
if not data_type:
return json.dumps(make_common_res(2, '关联路网状态缺失,请检查后重试'))
crossids, arteryids, sectionids = '', '', ''
crossids, waveid, wave_name = '', '', ''
if int(data_type) == 1:
crossids = check_param(params, 'crossids')
if not crossids:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联路口,但关联路口信息缺失,请检查后重试'))
elif int(data_type) == 2:
arteryids = check_param(params, 'arteryids')
if not arteryids:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联干线,但关联干线信息缺失,请检查后重试'))
elif int(data_type) == 3:
waveid = check_param(params, 'waveid')
wave_name = check_param(params, 'wave_name')
if not waveid or not wave_name:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联绿波,但关联绿波信息缺失,请检查后重试'))
add_type = check_param(params, 'add_type')
if not add_type or add_type == 'normal':
plan_begin_time = check_param(params, 'plan_begin_time')
@ -540,7 +529,7 @@ def do_add_task(params):
if add_type and add_type != 'normal':
task_state = 0
count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
executor, progress, task_state, description, crossids, waveid, wave_name, comment,
record_task_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
if count != 1:
logging.error(str(params) + ' 添加任务报错!')
@ -575,6 +564,16 @@ def do_add_task(params):
if e:
logging.error(e)
return json.dumps(make_common_res(2, '台账任务创建成功,但路口列表配时方案状态更新失败,请反馈该情况至管理员!'))
elif task_type_class == 3:
if full_review != 1:
requirement_validation_json = check_param(params, 'requirement_validation_json')
if not requirement_validation_json:
return json.dumps(make_common_res(2, '缺少需求核验单信息,请检查后重试'))
predict_issue_time = check_param(params, 'predict_issue_time')
if not predict_issue_time:
return json.dumps(make_common_res(2, '计划下发时间信息,请检查后重试'))
pass
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
@ -966,9 +965,13 @@ def do_query_task_detail(params):
if task['task_type_class'] == 1:
ledger_task_additional_info = query_ledger_task_additional_info(taskno, nodeid, area_id)
task['ledger_task_additional_info'] = ledger_task_additional_info
if task['task_state'] == 4:
task['ledger_task_additional_info']['entered_percent'] = 100
task['ledger_task_additional_info']['approve_percent'] = 100
if task['task_state'] == 4:
task['ledger_task_additional_info']['entered_percent'] = 100
task['ledger_task_additional_info']['approve_percent'] = 100
elif task['task_type_class'] == 3:
# 绿波优化任务
greenwave_task_additional_info = gen_greenwave_task_additional_info(taskno, nodeid, area_id)
pass
task.pop('update_time')
res['desc'] = ''
res['data'] = task
@ -1013,10 +1016,11 @@ def gen_update_sql(params, task_old_info):
crossids = check_param(params, 'crossids')
if crossids and crossids != '' and crossids != task_old_info['crossids']:
modify_data += f", crossids = '{crossids}'"
elif int(data_type) == 2:
arteryids = check_param(params, 'arteryids')
if arteryids and arteryids != '' and arteryids != task_old_info['arteryids']:
modify_data += f", arteryids = '{arteryids}'"
elif int(data_type) == 3:
waveid = check_param(params, 'waveid')
wave_name = check_param(params, 'wave_name')
if (waveid and waveid != '' and waveid != task_old_info['waveid']) or (wave_name and wave_name != '' and wave_name != task_old_info['wave_name']):
modify_data += f", waveid = '{waveid}', wave_name = '{wave_name}'"
modify_item += prefix
modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}"
executor = check_param(params, 'executor')
@ -1337,3 +1341,157 @@ def get_year_week(time_input):
# 使用 ISO 标准获取年周
year, week, _ = time_input.isocalendar()
return f"{year}-{week:02d}"
# 下方为绿波优化任务工作流相关代码 20260317
def verify_cross_entering_status(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, 'area_id异常请检查后重试'))
crossid_list = check_param(params, 'crossid_list')
if not crossid_list or len(crossid_list) < 1:
return json.dumps(make_common_res(6, '缺少crossid_list 请刷新后重试'))
cross_enter_status = db_task.query_cross_entering_status(crossid_list)
cross_status_dict = {}
for row in cross_enter_status:
cross_status_dict[row['crossid']] = {
'ledger_status': row['ledger_status'],
'phase_status': row['phase_status']
}
verify_res = True
for crossid in crossid_list:
if crossid not in cross_status_dict.keys():
verify_res = False
break
if cross_status_dict[crossid]['ledger_status'] != 1 or cross_status_dict[crossid]['phase_status'] not in [1, 3]:
verify_res = False
break
if verify_res:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(1, '验证未通过'))
def query_usable_wave_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, 'area_id异常请检查后重试'))
area_cross_list = db_tmnet.query_cross_list_sql(nodeid, area_id)
area_crossid_list = [row['crossid'] for row in area_cross_list]
node_wave_cross_dict = db_wave.query_node_wave(nodeid)
usable_wave_list = []
for waveid in node_wave_cross_dict.keys():
wave_info = node_wave_cross_dict[waveid]
cross_list = wave_info['cross_list']
if set(cross_list) & set(area_crossid_list):
usable_wave_list.append(wave_info)
res = make_common_res(0, 'ok')
res['data'] = {
'wave_list': usable_wave_list,
'cross_list': area_cross_list
}
return json.dumps(res)
def query_wave_task_requirement_validation_detail(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, 'area_id异常请检查后重试'))
task_no = check_param(params, 'task_no')
if not task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
requirement_validation_res = db_task.query_greenwave_task_requirement_validation_info_sql(task_no, nodeid, area_id)
if not requirement_validation_res:
return json.dumps(make_common_res(7, '需求确认单信息异常'))
res = make_common_res(0, 'ok')
res['data'] = requirement_validation_res
return json.dumps(res)
def query_wave_task_params(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, 'area_id异常请检查后重试'))
waveid = check_param(params, 'waveid')
if not waveid:
waveid = ''
wave_name = check_param(params, 'wave_name')
if not wave_name:
wave_name = ''
wave_crosses = check_param(params, 'wave_crosses')
if not wave_crosses or len(wave_crosses) < 1:
return json.dumps(make_common_res(8, '缺少wave_crosses 请刷新后重试'))
srcDir = check_param(params, 'srcDir')
if not srcDir:
srcDir = ''
# 需要查明 路口名称、信号机品牌、 最大相位数量、绿波时段信息、配时方案信息
wave_tp_info = []
if waveid != '' and srcDir != '':
wave_tp_info = db_wave.query_wave_tp_infos(waveid, srcDir)
wave_crosses_infos = db_tmnet.query_cross_infos(wave_crosses)
wave_cross_info_dict = {row['crossid']: row for row in wave_crosses_infos}
phase_infos, e = None, None
for crossid in wave_cross_info_dict.keys():
pass
def gen_greenwave_task_additional_info(taskno, nodeid, area_id):
# additional_info =
pass

View File

@ -675,9 +675,10 @@ class TmnetDbHelper(TableDbHelperBase):
t1.crossid,
if (t2.location is not null, t2.location, t1.location) as location,
t1.nodeid,
t1.area_id
from (select name,crossid, location,nodeid, area_id from `cross` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t1
left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid
t1.area_id,
if(t2.slc_company is not null, t2.slc_company, t1.slc_company) as slc_company
from (select name,crossid, location,nodeid, area_id, slc_company from `cross` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t1
left join (select name,crossid, location,nodeid, area_id, slc_company from `cross_ledger_update_info` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid
""" % (crossids, crossids)
cross_list = self.do_select(sql)
virtual_cross_sql = f'select name, crossid, location, nodeid, area_id from user_defined_cross where crossid in ({crossids})'

100
app/wave_db_func.py Normal file
View File

@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2026/3/17 12:01
# @Description:
from app.db_func_base import *
class WaveDBFunction(TableDbHelperBase):
def __init__(self, pool):
self.db_pool = pool
self.DB_Name = 'greenwave'
def query_node_wave(self, nodeid):
sql = """
select t1.name as wave_name, t1.waveid, t2.crossid, t1.service_status, t1.srcDir, t1.first_left, t1.last_left from wave_cross t2
inner join
(select name, waveid, service_status, srcDir, first_left, last_left from greenwave where orgid = %s and service_status = 1 and status = 0) t1
on t1.waveid = t2.waveid
""" % nodeid
res = self.do_select(sql)
wave_cross_dict = {}
for wave_name, waveid, crossid, service_status, srcDir, first_left, last_left in res:
left_coor = 0 if first_left == 0 and last_left == 0 else 1
if waveid not in wave_cross_dict:
wave_cross_dict[waveid] = {
'wave_name': wave_name,
'waveid': waveid,
'src_dir': srcDir,
'left_coor': left_coor,
'service_status': service_status,
'cross_list': [crossid]
}
else:
wave_cross_dict[waveid]['cross_list'].append(crossid)
return wave_cross_dict
def query_wave_tp_infos(self, waveid, srcDir):
sql = """
select type, weekday, tp_start, tp_end from wave_tps where waveid = '%s'
""" % waveid
res = self.do_select(sql)
wave_tp_infos = []
for type, weekday, tp_start, tp_end in res:
type_str = gen_type_str(type, srcDir)
weekday_str = gen_weekday_str(weekday)
wave_tp_infos.append({
'type_str': type_str,
'weekday_str': weekday_str,
'type': type,
'weekday': weekday,
'tp_start': tp_start,
'tp_end': tp_end
})
return wave_tp_infos
sir2Str = {
'S': ['南向北', '北向南'],
'E': ['东向西', '西向东'],
'W': ['西向东', '东向西'],
'N': ['北向南', '南向北'],
'NE': ['东北向西南', '西南向东北'],
'NW': ['西北向东南', '东南向西北'],
'SE': ['东南向西北', '西北向东南'],
'SW': ['西南向东北', '东北向西南']
}
weekday2Str = {
1: '周一',
2: '周二',
3: '周三',
4: '周四',
5: '周五',
6: '周六',
7: '周日'
}
def gen_type_str(type_int, srcDir):
type_str = '双向'
if type_int == 0:
type_str = sir2Str[srcDir][0]
elif type_int == 1:
type_str = sir2Str[srcDir][1]
return type_str
def gen_weekday_str(weekday):
if weekday == '1,2,3,4,5':
weekday_str = '工作日'
elif weekday == '6,7':
weekday_str = '节假日'
elif weekday == '1,2,3,4,5,6,7':
weekday_str = '全周'
else:
weekday_list = weekday.split(',')
weekday_str = weekday2Str[int(weekday_list[0])]
for i in range(1, len(weekday_list)):
weekday_str += '' + weekday2Str[int(weekday_list[i])]
return weekday_str

View File

@ -67,6 +67,13 @@ user = root
password = pmenJIn7EaK40oThn~~~
dbname = tmnet
[wave_db]
host = 43.140.225.219
host_inner = 172.21.32.10
user = root
password = pmenJIn7EaK40oThn~~
db_name = greenwave
[executive]
userid = 15836903493,18518481331