# Source: cross_doctor/daily_cross_pahse_problems.py (279 lines, 10 KiB, Python)

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/28 16:25
# @Description:
import argparse
# from app.gw_db_func import GreenWaveDbHelper
from app.gwperf_cloud_db_func import *
from app.eva_common import *
from tool.mysql_common_connector_pool import DatabaseManager
# Connection settings handed to DatabaseManager for the 'gwperf' database.
# NOTE(review): credentials are hard-coded in source; consider moving them to
# environment variables or a secret store.
g_cloud_db = dict(
    host='bj-cdb-64eqw2oe.sql.tencentcdb.com',
    port=26807,
    user='root',
    password='pmenJIn7EaK40oThn',
    db='gwperf',
)
# Connection settings handed to DatabaseManager for the
# 'cross_doctor_matedata' database.
g_dbinfo = dict(
    host='120.53.125.169',
    port=3306,
    user='root',
    password='pmenJIn7EaK40oThn~~~',
    db='cross_doctor_matedata',
)
# Maps a Chinese compass-direction label to its English abbreviation.
# The three single-character keys for E, S and N had been garbled into empty
# strings, which collapsed them into a single "" -> "N" entry; they are
# restored here to match the intact "西" and compound-direction keys.
src_str2_eng = {
    "东": "E",
    "南": "S",
    "西": "W",
    "北": "N",
    "东南": "SE",
    "东北": "NE",
    "西南": "SW",
    "西北": "NW",
}
# Module-level database objects, created at import time.
# DatabaseManager comes from tool.mysql_common_connector_pool; the two helper
# classes are presumably provided by the star imports above (verify).
# g_cloud_pool / g_db_cloud are built from the g_cloud_db settings.
g_cloud_pool = DatabaseManager(g_cloud_db)
g_db_cloud = GWPerfCloudDbHelper(g_cloud_pool)
# g_db_pool / g_db_cross are built from the g_dbinfo settings.
g_db_pool = DatabaseManager(g_dbinfo)
g_db_cross = TmnetDbHelper(g_db_pool)
def dev_args():
    """Parse the ``--dev`` and ``--day`` command-line options.

    Both options default to the integer ``0``; values supplied on the command
    line are returned as strings, exactly as argparse delivers them.

    Returns:
        The parsed ``argparse.Namespace``, or ``None`` when argparse aborts
        by raising ``SystemExit`` (unknown/invalid arguments, ``--help``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--dev', default=0, help='开发环境参数0否1是')
    # The help text used to be a copy of the --dev one; it now matches the
    # description used by the __main__ entry point.
    parser.add_argument('--day', default=0, help='执行日期')
    try:
        return parser.parse_args()
    except SystemExit:
        # Best effort by design: callers receive None instead of the process
        # exiting on a parse failure.
        return None
def init(args):
    """Run the daily cross-phase problem sync for one day and print the result.

    Steps: pick the target day, load that day's problems through
    ``g_db_cloud``, load the stored examine records through ``g_db_cross``,
    merge both with ``calc_final_state_color`` and write the resulting
    insert/update rows back through ``g_db_cross``.

    Args:
        args: Namespace-like object with ``dev`` and ``day`` attributes, or a
            falsy value to use the defaults (``dev=0``, ``day=0``). A ``day``
            of ``0``/``'0'``/empty means "the day before today"; otherwise it
            must be a ``YYYYMMDD`` value.
    """
    dev, day = 0, 0
    if args:
        dev = int(args.dev)
        day = args.day
    if dev == 0:
        # Not a development run (--dev: 0 = no, 1 = yes): point both
        # connection configs at these hosts before the pools are initialised.
        g_cloud_db['host'] = '172.21.32.41'
        g_cloud_db['port'] = 3306
        g_dbinfo['host'] = '172.21.32.21'
    # NOTE(review): indentation was lost in the source this was recovered
    # from; the pools are (re)initialised unconditionally here so that both
    # dev and non-dev runs have a usable pool. Confirm against the original.
    g_db_pool.init_pool(g_dbinfo)
    g_cloud_pool.init_pool(g_cloud_db)
    # argparse yields the int default 0 when --day is omitted but the string
    # '0' when it is passed explicitly; treat both as "not given".
    if day in (0, '0', '', None):
        day = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
    day = str(day)
    today_data = g_db_cloud.query_all_cross_examine_problems(day)
    yes_str = datetime.strptime(day, '%Y%m%d').strftime('%Y-%m-%d')
    yesterday_records = g_db_cross.query_all_cross_examine_records(yes_str)
    yesterday_data_dict = parse_records2dict(yesterday_records)
    today_records = {}
    # Tolerate a None result set from the query helper.
    for row in today_data or ():
        xxlight_detail = row['xxlight_details']
        if not xxlight_detail:
            # Rows without xxlight details (None or '') are not problems.
            continue
        key = '{}-{}-{}'.format(row['start_hm'], row['end_hm'], row['crossid'])
        today_records[key] = {
            'cont_times': 1,
            'phase_type': xxlight_detail,
            'phase_detail': row['xxlight_reason'],
            'index_detail': '',
            'index_detail_r': ''
        }
    insert_list, update_list = calc_final_state_color(yesterday_data_dict, today_records, day)
    insert_ret, update_ret, update_e, insert_e = 0, 0, None, None
    # NOTE(review): the original comment at this point said "first clear the
    # current day's data", but no such step exists in this function.
    if insert_list:
        insert_ret, insert_e = g_db_cross.insert_cross_examine_records(insert_list)
    if update_list:
        update_ret, update_e = g_db_cross.update_cross_examine_records(update_list)
    all_written = insert_ret == len(insert_list) and update_ret == len(update_list)
    if all_written and not insert_e and not update_e:
        print('success')
    else:
        print('insert_ret: {}, update_ret: {}'.format(insert_ret, update_ret))
        print('insert_list: {}, update_list: {}'.format(len(insert_list), len(update_list)))
        print(update_e)
        print(insert_e)
        print('failed')
def parse_records2dict(records):
    """Index examine-record rows by ``'start_hm-end_hm-crossid'``.

    Args:
        records: Iterable of mapping-like rows. Each row must provide
            ``start_hm``, ``end_hm``, ``crossid`` and every field listed in
            ``copied_fields`` below. ``None`` is treated as an empty result.

    Returns:
        dict mapping the composed key to a dict holding only the copied
        fields. When several rows share a key, the last one wins.

    Raises:
        KeyError: if a row lacks one of the required fields.
    """
    copied_fields = (
        'final_state',
        'level_color',
        'first_date',
        'change_red_daynums',
        'phase_type',
        'phase_detail',
        'cont_times',
    )
    record_dict = {}
    # A failed/empty query may hand back None; treat it as "no rows".
    for row in records or ():
        key = '{}-{}-{}'.format(row['start_hm'], row['end_hm'], row['crossid'])
        record_dict[key] = {field: row[field] for field in copied_fields}
    return record_dict
def _split_record_key(key):
    """Split a ``'start_hm-end_hm-crossid'`` key into its three parts."""
    # maxsplit=2 keeps any '-' inside the crossid instead of truncating it.
    parts = key.split('-', 2)
    return parts[0], parts[1], parts[2]


def _new_problem_record(key, today, day, change_red_daynums):
    """Build the insert row for a key that starts a fresh record on *day*."""
    start_hm, end_hm, crossid = _split_record_key(key)
    return {
        "start_hm": start_hm,
        "end_hm": end_hm,
        "crossid": crossid,
        "phase_type": str(today['phase_type']),
        "phase_detail": today['phase_detail'],
        "cont_times": today['cont_times'],
        "final_state": 3,
        "level_color": 3,
        "first_date": day,
        "change_red_daynums": change_red_daynums,
    }


def calc_final_state_color(yesterday_data_dict, today_records, day):
    """Merge today's problems with the stored records into insert/update rows.

    Keys of both dicts have the form ``'start_hm-end_hm-crossid'``.

    Key present today:
      * no stored record -> insert with state/colour (3, 3) and
        ``change_red_daynums`` 0;
      * stored state 4, 5 or 6 -> insert a fresh (3, 3) record that carries
        the stored ``change_red_daynums``;
      * otherwise -> update: ``cont_times`` are summed, ``phase_type`` and
        ``phase_detail`` get today's value appended after ``'^'``, and the
        new state/colour is chosen from the stored state, stored colour,
        ``change_red_daynums`` and today's ``cont_times`` (see inline rules).

    Key present only in the stored records:
      * stored state 4, 5 or 6 -> left untouched;
      * stored state 3 -> update to state 6 with ``end_date = day`` and
        ``cont_times`` 0;
      * stored state 2 -> ``cont_times + 1``; colour 2 stays (2, 2),
        otherwise ``change_red_daynums`` is incremented and the record
        becomes (1, 1) once the stored counter was already >= 2, else (2, 1);
      * any other state -> ``cont_times + 1``, state and colour unchanged.

    Args:
        yesterday_data_dict: Stored records as produced by
            ``parse_records2dict``. It is no longer modified by this call.
        today_records: Today's problems keyed the same way; each value
            provides ``cont_times``, ``phase_type`` and ``phase_detail``.
        day: Value used as ``first_date`` for inserts and ``end_date`` for
            records moved to state 6.

    Returns:
        ``(insert_list, update_list)`` - two lists of row dicts.
    """
    insert_list, update_list = [], []
    # Records in these states are never updated here: a reappearing key gets
    # a fresh insert and an absent key is skipped.
    terminal_states = (4, 5, 6)

    for key, today in today_records.items():
        if key not in yesterday_data_dict:
            insert_list.append(_new_problem_record(key, today, day, 0))
            continue

        prev = yesterday_data_dict[key]
        change_red_daynums = prev['change_red_daynums']
        prev_state = int(prev['final_state'])
        if prev_state in terminal_states:
            insert_list.append(
                _new_problem_record(key, today, day, change_red_daynums))
            continue

        seen_once = today['cont_times'] == 1
        if prev_state == 1:
            final_state, level_color = 1, 1
        elif prev_state == 2:
            if prev['level_color'] == 2:
                if seen_once:
                    final_state, level_color, change_red_daynums = 2, 1, 0
                else:
                    final_state, level_color = 2, 2
            elif change_red_daynums >= 2:
                final_state, level_color = 1, 1
            else:
                final_state, level_color = 2, 1
        elif seen_once:
            final_state, level_color = 2, 1
        else:
            final_state, level_color = 2, 2

        start_hm, end_hm, crossid = _split_record_key(key)
        update_list.append({
            "start_hm": start_hm,
            "end_hm": end_hm,
            "crossid": crossid,
            "phase_type": str(prev['phase_type']) + '^' + str(today['phase_type']),
            "phase_detail": prev['phase_detail'] + '^' + today['phase_detail'],
            "cont_times": today['cont_times'] + prev['cont_times'],
            "final_state": final_state,
            "level_color": level_color,
            "first_date": prev['first_date'],
            "change_red_daynums": change_red_daynums,
            "end_date": None
        })

    for key, prev in yesterday_data_dict.items():
        if key in today_records:
            # Already handled above; skipping here replaces the former
            # in-place pop() on the caller's dict.
            continue
        # Same int() normalisation as the first loop, so a state stored as
        # '5' is recognised as terminal here too.
        prev_state = int(prev['final_state'])
        if prev_state in terminal_states:
            continue

        end_date = None
        change_red_daynums = prev['change_red_daynums']
        if prev_state == 3:
            end_date = day
            final_state = 6
            level_color = prev['level_color']
            phase_cont_times = 0
        elif prev_state == 2:
            phase_cont_times = prev['cont_times'] + 1
            if prev['level_color'] == 2:
                final_state, level_color = 2, 2
            else:
                # The threshold is checked against the stored counter, i.e.
                # the value before this increment.
                change_red_daynums = prev['change_red_daynums'] + 1
                if prev['change_red_daynums'] >= 2:
                    final_state, level_color = 1, 1
                else:
                    final_state, level_color = 2, 1
        else:
            phase_cont_times = prev['cont_times'] + 1
            final_state = prev['final_state']
            level_color = prev['level_color']

        start_hm, end_hm, crossid = _split_record_key(key)
        update_list.append({
            "start_hm": start_hm,
            "end_hm": end_hm,
            "crossid": crossid,
            # Trailing '^' marks "nothing reported today".
            "phase_type": str(prev['phase_type']) + '^',
            "phase_detail": prev['phase_detail'] + '^',
            "cont_times": phase_cont_times,
            "final_state": final_state,
            "level_color": level_color,
            "first_date": prev['first_date'],
            "change_red_daynums": change_red_daynums,
            "end_date": end_date
        })
    return insert_list, update_list
if __name__ == '__main__':
    # Script entry point: read --dev / --day from the command line and run
    # the daily sync with them.
    parser = argparse.ArgumentParser()
    for flag, help_text in (('--dev', '开发环境参数0否1是'), ('--day', '执行日期')):
        parser.add_argument(flag, default=0, help=help_text)
    args = parser.parse_args()
    init(args)