# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2026/3/17 12:01
# @Description:
import json

from app.db_func_base import *


class WaveDBFunction(TableDbHelperBase):
    """Query and update helpers for the green-wave tables.

    All statements are issued through ``self.do_select`` / ``self.do_execute``,
    which are not defined in this file (presumably inherited from
    ``TableDbHelperBase``). From the way results are used here, ``do_select``
    yields rows indexable by column name and ``do_execute`` returns a number
    that is compared against a row count.

    NOTE(review): every query below is built with ``%`` string interpolation.
    If any argument can come from an untrusted source this is an SQL-injection
    risk; switch to bound parameters if ``do_select`` / ``do_execute`` support
    them.
    """

    def __init__(self, pool):
        """Store the connection pool and the name of the target database."""
        self.db_pool = pool
        self.DB_Name = 'greenwave'

    def query_node_wave(self, nodeid):
        """Return the waves of an organisation, keyed by ``waveid``.

        Only ``greenwave`` rows with ``service_status = 1`` and ``status = 0``
        are considered. Each value holds the wave attributes plus
        ``cross_list``, the ``crossid`` values joined from ``wave_cross``.

        :param nodeid: value matched against ``greenwave.orgid`` (interpolated
            unquoted into the SQL).
        :return: dict mapping ``waveid`` to a dict with keys ``wave_name``,
            ``waveid``, ``src_dir``, ``left_coor``, ``service_status`` and
            ``cross_list``.
        """
        sql = """
            select t1.name as wave_name, t1.waveid, t2.crossid, t1.service_status,
                   t1.srcDir, t1.first_left, t1.last_left
            from wave_cross t2
            inner join (
                select name, waveid, service_status, srcDir, first_left, last_left
                from greenwave
                where orgid = %s and service_status = 1 and status = 0
            ) t1 on t1.waveid = t2.waveid
        """ % nodeid
        res = self.do_select(sql)

        wave_cross_dict = {}
        for row in res:
            waveid = row['waveid']
            crossid = row['crossid']
            if waveid in wave_cross_dict:
                wave_cross_dict[waveid]['cross_list'].append(crossid)
                continue
            # left_coor is 0 only when both first_left and last_left are 0.
            left_coor = 0 if row['first_left'] == 0 and row['last_left'] == 0 else 1
            wave_cross_dict[waveid] = {
                'wave_name': row['wave_name'],
                'waveid': waveid,
                'src_dir': row['srcDir'],
                'left_coor': left_coor,
                'service_status': row['service_status'],
                'cross_list': [crossid],
            }
        return wave_cross_dict

    def query_wave_tp_infos(self, waveid, srcDir):
        """Return the time-period rows of a wave with display strings added.

        :param waveid: value matched against ``wave_tps.waveid``.
        :param srcDir: direction key passed to ``gen_type_str`` to render the
            row's ``type`` as text.
        :return: list of dicts with keys ``wave_tp_id``, ``type_str``,
            ``weekday_str``, ``type``, ``weekday``, ``tp_start``, ``tp_end``.
        """
        sql = """
            select id, type, weekday, tp_start, tp_end
            from wave_tps
            where waveid = '%s'
        """ % waveid
        res = self.do_select(sql)

        wave_tp_infos = []
        for row in res:
            wave_tp_infos.append({
                'wave_tp_id': row['id'],
                # Use the row's own type value; the builtin ``type`` must not
                # be passed here, or every row would render as the fallback.
                'type_str': gen_type_str(row['type'], srcDir),
                'weekday_str': gen_weekday_str(row['weekday']),
                'type': row['type'],
                'weekday': row['weekday'],
                'tp_start': row['tp_start'],
                'tp_end': row['tp_end'],
            })
        return wave_tp_infos

    def query_wave_tp_optimize_records(self, waveid):
        """Return optimisation records of a wave, keyed by time period.

        Rows of ``wave_plan_params`` are read in ascending ``create_time``
        order, so when several rows share the same ``tp`` the last one read
        overwrites the earlier ones.

        :param waveid: value matched against ``wave_plan_params.waveid``.
        :return: dict mapping ``base_params['formState']['tp']`` to a dict
            with keys ``create_time`` (``'%Y-%m-%d %H:%M:%S'`` string) and
            ``comment``.
        """
        sql = """
            select * from wave_plan_params
            where waveid = '%s'
            order by create_time asc
        """ % waveid
        res = self.do_select(sql)

        tp_info_dict = {}
        for row in res:
            create_time = row['create_time'].strftime('%Y-%m-%d %H:%M:%S')
            # NOTE(review): tp is used as a dict key, so it is assumed to be a
            # hashable JSON scalar (e.g. a string) -- confirm against the data.
            tp_info = json.loads(row['base_params'])['formState']['tp']
            tp_info_dict[tp_info] = {
                'create_time': create_time,
                'comment': row['comment'],
            }
        return tp_info_dict

    def query_wave_info(self, waveid):
        """Return the first ``greenwave`` row for ``waveid``, or ``None``."""
        sql = """
            select * from greenwave where waveid = '%s'
        """ % waveid
        res = self.do_select(sql)
        if res:
            return res[0]
        return None

    def query_wave_crosses(self, waveid):
        """Return the list of ``crossid`` values linked to ``waveid``."""
        sql = """
            select crossid from wave_cross where waveid = '%s'
        """ % waveid
        res = self.do_select(sql)
        return [item['crossid'] for item in res]

    def open_wave_tp_monitor_status(self, wave_tp_id_list):
        """Set ``is_polling = 1`` on the given ``wave_tps`` rows.

        The number of rows that currently have ``is_polling = 0`` is counted
        first and then compared with the value returned by the update.

        :param wave_tp_id_list: iterable of ``wave_tps.id`` values; elements
            may be ints or strings.
        :return: ``True`` when the update result equals the number of rows
            that needed changing (or when the list is empty), else ``False``.
        """
        # An empty list would render as ``in ()``, which is invalid SQL;
        # there is nothing to update, so report success.
        if not wave_tp_id_list:
            return True

        # str() each element so integer ids do not break str.join.
        wave_tp_ids = ','.join(str(tp_id) for tp_id in wave_tp_id_list)

        check_sql = """
            select is_polling from wave_tps
            where id in (%s) and is_polling = 0
        """ % wave_tp_ids
        need_update_num = len(self.do_select(check_sql))

        sql = """
            update wave_tps set is_polling = 1 where id in (%s)
        """ % wave_tp_ids
        ret = self.do_execute(sql)
        return ret == need_update_num

    def open_wave_monitor_status(self, waveid, monitor_job_id):
        """Ensure a ``job_waves`` row links ``waveid`` to ``monitor_job_id``.

        :param waveid: wave identifier.
        :param monitor_job_id: value stored in / matched against ``jobid``.
        :return: ``True`` if the link already exists, otherwise whether the
            insert's result equals 1.
        """
        check_sql = """
            select waveid from job_waves
            where jobid = '%s' and waveid = '%s'
        """ % (monitor_job_id, waveid)
        if self.do_select(check_sql):
            return True

        # Fill the placeholders in column order (waveid, jobid); without this
        # the literal text '%s' would be sent to the database.
        insert_sql = """
            insert into job_waves (waveid, jobid, is_polling)
            value ('%s', '%s', 0)
        """ % (waveid, monitor_job_id)
        ret = self.do_execute(insert_sql)
        return ret == 1

    def query_usable_monitor_job_list(self, nodeid):
        """Return ``wave_supervision`` rows of ``nodeid`` with ``closed = 0``."""
        sql = """
            select * from greenwave.wave_supervision
            where orgid = %s and closed = 0
        """ % nodeid
        return self.do_select(sql)


# Direction key -> [label used for type 0, label used for type 1].
# The labels are Chinese "X to Y" travel directions, e.g. 'S' maps to
# "south to north" and "north to south".
sir2Str = {
    'S': ['南向北', '北向南'],
    'E': ['东向西', '西向东'],
    'W': ['西向东', '东向西'],
    'N': ['北向南', '南向北'],
    'NE': ['东北向西南', '西南向东北'],
    'NW': ['西北向东南', '东南向西北'],
    'SE': ['东南向西北', '西北向东南'],
    'SW': ['西南向东北', '东北向西南']
}

# Weekday number (1 = Monday ... 7 = Sunday) -> Chinese weekday name.
weekday2Str = {
    1: '周一',
    2: '周二',
    3: '周三',
    4: '周四',
    5: '周五',
    6: '周六',
    7: '周日'
}


def gen_type_str(type_int, srcDir):
    """Render a wave type as a direction label.

    :param type_int: 0 or 1 selects the first or second label of
        ``sir2Str[srcDir]``; any other value yields the bidirectional label.
    :param srcDir: key into ``sir2Str``; only looked up for types 0 and 1.
    :return: the label string.
    :raises KeyError: if ``type_int`` is 0 or 1 and ``srcDir`` is unknown.
    """
    if type_int == 0:
        return sir2Str[srcDir][0]
    if type_int == 1:
        return sir2Str[srcDir][1]
    return '双向'  # "bidirectional"


def gen_weekday_str(weekday):
    """Render a comma-separated weekday string as display text.

    Three exact values have dedicated labels: ``'1,2,3,4,5'`` (workdays),
    ``'6,7'`` (holidays) and ``'1,2,3,4,5,6,7'`` (whole week). Anything else
    is rendered as the individual weekday names joined by ``'、'``.

    :param weekday: comma-separated weekday numbers, e.g. ``'1,3,5'``.
    :return: the display string.
    :raises ValueError: if a part is not an integer literal.
    :raises KeyError: if a number is not in ``weekday2Str``.
    """
    if weekday == '1,2,3,4,5':
        return '工作日'
    if weekday == '6,7':
        return '节假日'
    if weekday == '1,2,3,4,5,6,7':
        return '全周'
    return '、'.join(weekday2Str[int(day)] for day in weekday.split(','))