cross_doctor/app/wave_db_func.py

174 lines
5.7 KiB
Python

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2026/3/17 12:01
# @Description:
import json
from app.db_func_base import *
class WaveDBFunction(TableDbHelperBase):
def __init__(self, pool):
self.db_pool = pool
self.DB_Name = 'greenwave'
def query_node_wave(self, nodeid):
sql = """
select t1.name as wave_name, t1.waveid, t2.crossid, t1.service_status, t1.srcDir, t1.first_left, t1.last_left from wave_cross t2
inner join
(select name, waveid, service_status, srcDir, first_left, last_left from greenwave where orgid = %s and service_status = 1 and status = 0) t1
on t1.waveid = t2.waveid
""" % nodeid
res = self.do_select(sql)
wave_cross_dict = {}
for row in res:
wave_name, waveid, crossid, service_status, srcDir, first_left, last_left = row['wave_name'], row['waveid'], row['crossid'], row['service_status'], row['srcDir'], row['first_left'], row['last_left']
left_coor = 0 if first_left == 0 and last_left == 0 else 1
if waveid not in wave_cross_dict:
wave_cross_dict[waveid] = {
'wave_name': wave_name,
'waveid': waveid,
'src_dir': srcDir,
'left_coor': left_coor,
'service_status': service_status,
'cross_list': [crossid]
}
else:
wave_cross_dict[waveid]['cross_list'].append(crossid)
return wave_cross_dict
def query_wave_tp_infos(self, waveid, srcDir):
sql = """
select id, type, weekday, tp_start, tp_end from wave_tps where waveid = '%s'
""" % waveid
res = self.do_select(sql)
wave_tp_infos = []
for row in res:
type_str = gen_type_str(type, srcDir)
weekday_str = gen_weekday_str(row['weekday'])
wave_tp_infos.append({
'wave_tp_id': row['id'],
'type_str': type_str,
'weekday_str': weekday_str,
'type': row['type'],
'weekday': row['weekday'],
'tp_start': row['tp_start'],
'tp_end': row['tp_end']
})
return wave_tp_infos
def query_wave_tp_optimize_records(self, waveid):
sql = """
select * from wave_plan_params where waveid = '%s' order by create_time asc
""" % waveid
res = self.do_select(sql)
tp_info_dict = {}
for row in res:
create_time = row['create_time'].strftime('%Y-%m-%d %H:%M:%S')
base_params = row['base_params']
tp_info = json.loads(base_params)['formState']['tp']
tp_info_dict[tp_info] = {
'create_time': create_time,
'comment': row['comment']
}
return tp_info_dict
def query_wave_info(self, waveid):
sql = """
select * from greenwave where waveid = '%s'
""" % waveid
res = self.do_select(sql)
if res:
return res[0]
return None
def query_wave_crosses(self, waveid):
sql = """
select crossid from wave_cross where waveid = '%s'
""" % waveid
res = self.do_select(sql)
cross_list = [item['crossid'] for item in res]
return cross_list
def open_wave_tp_monitor_status(self, wave_tp_id_list):
wave_tp_ids = ','.join(wave_tp_id_list)
check_sql = """
select is_polling from wave_tps where id in (%s) and is_polling = 0
""" % wave_tp_ids
res = self.do_select(check_sql)
need_update_num = len(res)
sql = """
update wave_tps set is_polling = 1 where id in (%s)
""" % wave_tp_ids
ret = self.do_execute(sql)
if ret == need_update_num:
return True
else:
return False
def open_wave_monitor_status(self, waveid, monitor_job_id):
check_sql = """
select waveid from job_waves where jobid = '%s' and waveid = '%s'
""" % (monitor_job_id, waveid)
res = self.do_select(check_sql)
if res:
return True
else:
insert_sql = """
insert into job_waves (waveid, jobid, is_polling) value ('%s', '%s', 0)
"""
ret = self.do_execute(insert_sql)
return ret == 1
def query_usable_monitor_job_list(self, nodeid):
sql = """
select * from greenwave.wave_supervision where orgid = %s and closed = 0
""" % nodeid
return self.do_select(sql)
sir2Str = {
'S': ['南向北', '北向南'],
'E': ['东向西', '西向东'],
'W': ['西向东', '东向西'],
'N': ['北向南', '南向北'],
'NE': ['东北向西南', '西南向东北'],
'NW': ['西北向东南', '东南向西北'],
'SE': ['东南向西北', '西北向东南'],
'SW': ['西南向东北', '东北向西南']
}
weekday2Str = {
1: '周一',
2: '周二',
3: '周三',
4: '周四',
5: '周五',
6: '周六',
7: '周日'
}
def gen_type_str(type_int, srcDir):
type_str = '双向'
if type_int == 0:
type_str = sir2Str[srcDir][0]
elif type_int == 1:
type_str = sir2Str[srcDir][1]
return type_str
def gen_weekday_str(weekday):
if weekday == '1,2,3,4,5':
weekday_str = '工作日'
elif weekday == '6,7':
weekday_str = '节假日'
elif weekday == '1,2,3,4,5,6,7':
weekday_str = '全周'
else:
weekday_list = weekday.split(',')
weekday_str = weekday2Str[int(weekday_list[0])]
for i in range(1, len(weekday_list)):
weekday_str += '' + weekday2Str[int(weekday_list[i])]
return weekday_str