cross_doctor/app/comm.py

1156 lines
33 KiB
Python
Raw Normal View History

2025-10-10 14:38:22 +08:00
import time
import logging
from collections import defaultdict
from datetime import datetime, timedelta
import pandas as pd
import re
from tool.excel import t_sheet_cols_new
def save_str2file(content, filename):
    """Write ``content`` to ``filename`` as UTF-8 text, replacing any existing file.

    :param content: text to write
    :param filename: path of the file to create or overwrite
    :return: None
    """
    # The context manager closes the handle even when write() raises.
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(content)
def load_from_file(filename):
    """Read the whole of ``filename`` as UTF-8 text.

    :param filename: path of the file to read
    :return: the file content as a string
    """
    # The context manager closes the handle even when read() raises.
    with open(filename, 'r', encoding='utf-8') as f:
        return f.read()
def str2timestamp(time_string):
    """Convert a date-time string into a Unix timestamp in seconds.

    :param time_string: text formatted as "YYYY-MM-DD HH:MM:SS",
        e.g. "2024-03-09 14:30:00"
    :return: the value of ``datetime.timestamp()`` for the parsed (naive) datetime
    """
    return datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S").timestamp()
def timestamp2str(timestamp):
    """Format a Unix timestamp as a local-time 'YYYY-MM-DD HH:MM:SS' string.

    :param timestamp: seconds since the epoch
    :return: formatted local-time string
    """
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
def timestamp2HMS(timestamp):
    """Format a Unix timestamp as a local-time 'HH:MM:SS' string.

    :param timestamp: seconds since the epoch
    :return: time-of-day string in local time
    """
    return time.strftime('%H:%M:%S', time.localtime(timestamp))
def timestamp2int(timestamp):
    """Convert a Unix timestamp into its local date as a yyyymmdd integer.

    :param timestamp: seconds since the epoch
    :return: int such as 20240309
    """
    local_time_tuple = time.localtime(timestamp)
    day_text = time.strftime('%Y%m%d', local_time_tuple)
    return int(day_text)
def daynum_to_timestamp(day):
    """Convert a date number into the Unix timestamp of that day's midnight.

    :param day: date as a yyyymmdd number or string
    :return: timestamp in seconds, from ``datetime.timestamp()`` on the parsed date
    """
    return datetime.strptime(str(day), "%Y%m%d").timestamp()
def get_day_offset(day, offset):
    """Return the date that lies ``offset`` days away from ``day``.

    :param day: date as a yyyymmdd number or string
    :param offset: number of days to shift; positive for later dates,
        negative for earlier dates
    :return: the shifted date as a yyyymmdd integer
    """
    # Calendar arithmetic instead of adding 86400-second multiples to an epoch
    # timestamp: a local day is not always 24 hours long (DST changes), so the
    # seconds-based shift can land on the wrong date.
    shifted = datetime.strptime(str(day), "%Y%m%d") + timedelta(days=offset)
    return int(shifted.strftime('%Y%m%d'))
def calc_day_diff(day0, day1):
    """Return how many days ``day1`` lies after ``day0``.

    :param day0: start date as a yyyymmdd number or string
    :param day1: end date as a yyyymmdd number or string
    :return: signed whole number of days (negative when day1 precedes day0)
    """
    # Subtract calendar dates rather than dividing an epoch difference by 86400,
    # which truncates to the wrong value when a DST change falls in between.
    start = datetime.strptime(str(day0), "%Y%m%d")
    end = datetime.strptime(str(day1), "%Y%m%d")
    return (end - start).days
def calc_day_diff_YMD(day1, day2):
    """Return how many days ``day2`` lies after ``day1``.

    :param day1: start date formatted as 'YYYY-MM-DD'
    :param day2: end date formatted as 'YYYY-MM-DD'
    :return: signed whole number of days (negative when day2 precedes day1)
    """
    # Subtract calendar dates rather than dividing an epoch difference by 86400,
    # which truncates to the wrong value when a DST change falls in between.
    start = datetime.strptime(str(day1), "%Y-%m-%d")
    end = datetime.strptime(str(day2), "%Y-%m-%d")
    return (end - start).days
def get_day_of_week(day):
    """Return the weekday of a date as 1 (Monday) through 7 (Sunday).

    :param day: date as a yyyymmdd number or string
    :return: int in the range 1..7
    """
    # Read the weekday straight from the parsed date; going through an epoch
    # timestamp and time.localtime() adds nothing and can fail for dates the
    # platform cannot express as a timestamp.
    return datetime.strptime(str(day), "%Y%m%d").isoweekday()
def test_day_offset():
    """Print a few get_day_offset results for manual inspection."""
    samples = (
        (20240325, -7),
        (20240325, 7),
        (20240401, -7),
        (20240408, -7),
    )
    for day, offset in samples:
        print(get_day_offset(day, offset))
def weekday2str(day_of_week):
    """Return the Chinese numeral label for a weekday number.

    :param day_of_week: 1 (Monday) through 7 (Sunday)
    :return: '一'..'六' for Monday..Saturday, '日' for Sunday, 'X' for anything else
    """
    # NOTE(review): every valid branch previously returned an empty string, which
    # looks like lost characters; the labels below are the conventional ones —
    # confirm against callers that prepend a prefix to this value.
    labels = {1: '一', 2: '二', 3: '三', 4: '四', 5: '五', 6: '六', 7: '日'}
    return labels.get(day_of_week, 'X')
def get_today_str():
    """Return today's local date as a 'yyyymmdd' string."""
    now_seconds = int(time.time())
    return time.strftime('%Y%m%d', time.localtime(now_seconds))
def get_today_YMD():
    """Return today's local date as a 'YYYY-MM-DD' string."""
    now_seconds = int(time.time())
    return time.strftime('%Y-%m-%d', time.localtime(now_seconds))
def get_hhmm():
    """Return the current local time of day as an 'HH:MM' string."""
    return datetime.now().strftime("%H:%M")
def get_weekday():
    """Return today's weekday as 1 (Monday) through 7 (Sunday)."""
    # isoweekday() is weekday() + 1, i.e. already in the 1..7 range.
    return datetime.today().date().isoweekday()
def filter_last_week_days(filter_week_days_str):
    """Pick, from the seven days before today, those falling on selected weekdays.

    :param filter_week_days_str: comma-separated weekday numbers, e.g. '1,2,3'
        (1 = Monday ... 7 = Sunday, as returned by get_day_of_week)
    :return: list of yyyymmdd integers, ordered from yesterday backwards
    """
    wanted_weekdays = {int(token) for token in filter_week_days_str.split(',')}
    today_num = int(get_today_str())
    # Yesterday (offset -1) back to seven days ago (offset -7); today is excluded.
    candidates = (get_day_offset(today_num, -back) for back in range(1, 8))
    return [day for day in candidates if get_day_of_week(day) in wanted_weekdays]
def filter_last_week_by_days(filter_week_days_str, day, type):
    """Pick days from the week ending at ``day`` by weekday and day category.

    :param filter_week_days_str: comma-separated weekday numbers, e.g. '1,2,3'
        (1 = Monday ... 7 = Sunday)
    :param day: reference date as a yyyymmdd number or string; it is included
    :param type: 'workday' keeps weekdays 1..5, 'weekend' keeps weekdays 6..7;
        any other value keeps nothing
    :return: list of yyyymmdd integers, ordered from ``day`` backwards
    """
    wanted_weekdays = {int(token) for token in filter_week_days_str.split(',')}
    valid_days = []
    # Offsets 0, -1, ..., -6: the reference day and the six days before it.
    for back in range(7):
        last_day = get_day_offset(day, -back)
        week_day = get_day_of_week(last_day)
        if week_day not in wanted_weekdays:
            continue
        is_workday = int(week_day) <= 5
        if (type == 'workday' and is_workday) or (type == 'weekend' and not is_workday):
            valid_days.append(last_day)
    return valid_days
def srcDir_toStr(srcDir):
    """Convert a standard eight-direction code into its Chinese name.

    :param srcDir: one of 'N', 'S', 'E', 'W', 'NE', 'NW', 'SE', 'SW'
    :return: the Chinese direction name, or '未知' for any other value
    """
    # 'N', 'S' and 'E' previously mapped to empty strings; the names below are
    # the ones the other direction helpers in this file use for those codes.
    names = {
        'N': '北',
        'S': '南',
        'E': '东',
        'W': '西',
        'NE': '东北',
        'NW': '西北',
        'SE': '东南',
        'SW': '西南',
    }
    return names.get(srcDir, '未知')
def dirname_to_srcDir(dirname: str):
    """Convert a Chinese direction name into its standard eight-direction code.

    :param dirname: Chinese direction name such as '北' or '东北'
    :return: one of 'N', 'S', 'E', 'W', 'NE', 'NW', 'SE', 'SW', or None when
        the name is not recognised
    """
    # The first three branches all compared against '' before, so 'S' and 'E'
    # could never be returned and an empty name mapped to 'N'.
    codes = {
        '北': 'N',
        '南': 'S',
        '东': 'E',
        '西': 'W',
        '东北': 'NE',
        '西北': 'NW',
        '东南': 'SE',
        '西南': 'SW',
    }
    return codes.get(dirname)
# The eight direction codes accepted as a traffic-flow source direction.
g_valid_srcDir_set = {'E', 'N', 'NE', 'NW', 'S', 'SE', 'SW', 'W'}
# Pairs of opposite source directions. Each key is the two codes in sorted
# order joined by '-', which is how is_crossed_srcDir / is_reversed_srcDir
# build their lookup key.
g_reversed_srcDir_couples = {'E-W', 'N-S', 'NE-SW', 'NW-SE'}
# Maps every direction code to the code of its opposite direction.
g_reversed_srcDir_mapping = {'E': 'W', 'W': 'E', 'N': 'S', 'S': 'N', 'NE': 'SW', 'SW': 'NE', 'NW': 'SE', 'SE': 'NW'}
def is_crossed_srcDir(src0, src1):
    """Tell whether two directions cross, i.e. are neither equal nor opposite.

    :param src0: first direction code
    :param src1: second direction code
    :return: False when the codes are equal or form an opposite pair, else True
    """
    if src0 == src1:
        return False
    # Opposite pairs are stored with the two codes in sorted order.
    first, second = sorted((src0, src1))
    return (first + '-' + second) not in g_reversed_srcDir_couples
def is_reversed_srcDir(src0, src1):
    """Tell whether two directions are opposite to each other.

    :param src0: first direction code
    :param src1: second direction code
    :return: True when the codes form an opposite pair, else False
    """
    if src0 == src1:
        return False
    # Opposite pairs are stored with the two codes in sorted order.
    first, second = sorted((src0, src1))
    return (first + '-' + second) in g_reversed_srcDir_couples
def test_srcDir_judge():
    """Print the crossed/reversed verdict for every pair of valid directions."""
    for src0 in g_valid_srcDir_set:
        for src1 in g_valid_srcDir_set:
            crossed = is_crossed_srcDir(src0, src1)
            reversed_ = is_reversed_srcDir(src0, src1)
            print("%s %s == crossed:%s, reversed:%s" % (src0, src1, crossed, reversed_))
def srcDir_to_flowStr(srcDir):
    """Describe the flow of traffic arriving from ``srcDir`` as 'from-to' text.

    :param srcDir: one of the eight direction codes
    :return: Chinese 'X向Y' description, or '未知' for any other value
    """
    flow_names = {
        'N': '北向南',
        'S': '南向北',
        'E': '东向西',
        'W': '西向东',
        'NE': '东北向西南',
        'NW': '西北向东南',
        'SE': '东南向西北',
        'SW': '西南向东北',
    }
    return flow_names.get(srcDir, '未知')
def srcDir_toCarDirectionStr(srcDir):
    """Describe the two-way axis that a single source direction belongs to.

    Opposite source directions share one description (e.g. 'N' and 'S').

    :param srcDir: one of the eight direction codes
    :return: Chinese axis description, or '未知' for any other value
    """
    axis_names = {
        'N': '南-北',
        'S': '南-北',
        'E': '东-西',
        'W': '东-西',
        'NE': '东北-西南',
        'SW': '东北-西南',
        'NW': '西北-东南',
        'SE': '西北-东南',
    }
    return axis_names.get(srcDir, '未知')
def turn_type_toStr(turn_type):
    """Return the Chinese name of a numeric turn type.

    :param turn_type: 0 straight, 1 left turn, 2 right turn, 3 U-turn
    :return: the turn name, or '未知转向' for any other value
    """
    turn_names = {0: '直行', 1: '左转', 2: '右转', 3: '掉头'}
    return turn_names.get(turn_type, '未知转向')
def get_turn_type_toStr(turn_type):
    """Return the Chinese name of a single-letter turn type.

    :param turn_type: 'l' left turn, 's' straight, 'r' right turn
    :return: the turn name, or '未知转向' for any other value
    """
    turn_names = {'l': '左转', 's': '直行', 'r': '右转'}
    return turn_names.get(turn_type, '未知转向')
def direction_toStr(direction):
    """Render a '<srcDir>|<turn_type>' key as a readable Chinese description.

    :param direction: text such as 'N|1'; the part after '|' must parse as an int
    :return: '<direction name>进口道-<turn name>'
    """
    parts = direction.split('|')
    src_name = srcDir_toStr(parts[0])
    turn_name = turn_type_toStr(int(parts[1]))
    return "%s进口道-%s" % (src_name, turn_name)
def get_default_ffs(rc):
    """Return the default value used as ``ffs`` for a road class.

    :param rc: road class number (1..9 have dedicated values)
    :return: the default for that class, 30 for any other value
        (presumably a free-flow speed in km/h — TODO confirm the unit)
    """
    defaults = {1: 120, 2: 80, 3: 70, 4: 60, 5: 50, 6: 90, 7: 70, 8: 60, 9: 50}
    return defaults.get(rc, 30)
def cal_lane_capacity(v):
    """Look up the capacity of a single lane from a speed value.

    :param v: speed value (unit not stated here; presumably km/h — TODO confirm)
    :return: capacity of the highest speed band whose lower bound is <= ``v``;
        1380 when ``v`` is below 30
    """
    # (lower bound of the band, capacity), from the fastest band downwards.
    bands = (
        (120, 2000),
        (100, 1900),
        (80, 1800),
        (60, 1730),
        (50, 1690),
        (40, 1640),
        (30, 1550),
    )
    for lower_bound, capacity in bands:
        if v >= lower_bound:
            return capacity
    return 1380
def cal_lane_rate(lane_num):
    """Return the per-lane reduction factor for a given number of lanes.

    :param lane_num: number of lanes
    :return: 1.0, 0.9, 0.8, 0.7 for 1..4 lanes; 0.6 for any other value
    """
    rates = {1: 1.0, 2: 0.9, 3: 0.8, 4: 0.7}
    return rates.get(lane_num, 0.6)
def cal_default_capacity(rc, lane_num, ffs):
    """Estimate the capacity of a road from its lane count and ``ffs`` value.

    :param rc: road class; accepted for interface compatibility but not used
    :param lane_num: number of lanes
    :param ffs: speed value passed to cal_lane_capacity
    :return: single-lane capacity * lane count * per-lane reduction factor
    """
    per_lane = cal_lane_capacity(ffs)
    reduction = cal_lane_rate(lane_num)
    return per_lane * lane_num * reduction
def calc_service_level(delay_time):
    """Grade a delay time as service level 'A'..'F'.

    Switched to the domestic standard as of 2024-07-31.

    :param delay_time: delay value compared against the upper bounds
        10 / 20 / 35 / 55 / 80 (inclusive)
    :return: 'A'..'E' for the first bound not exceeded, otherwise 'F'
    """
    upper_bounds = ((10, 'A'), (20, 'B'), (35, 'C'), (55, 'D'), (80, 'E'))
    for bound, level in upper_bounds:
        if delay_time <= bound:
            return level
    return 'F'
def calc_service_level_artery(delay_time, i):
    """Grade a delay time as service level 'A'..'F' with thresholds scaled by ``i``.

    :param delay_time: delay value to grade
    :param i: multiplier applied to the base bounds 5 / 15 / 25 / 40 / 60
        (presumably the number of intersections on the artery — TODO confirm)
    :return: 'A'..'E' for the first scaled bound not exceeded, otherwise 'F'
    """
    base_bounds = ((5, 'A'), (15, 'B'), (25, 'C'), (40, 'D'), (60, 'E'))
    for bound, level in base_bounds:
        if delay_time <= bound * i:
            return level
    return 'F'
def jam_service_level_artery(jam_index):
    """Grade a jam index as a service level.

    Only the levels 'A', 'D', 'E' and 'F' are produced.

    :param jam_index: jam index compared against the exclusive bounds 1.5 / 2 / 4
    :return: 'A' below 1.5, 'D' below 2, 'E' below 4, otherwise 'F'
    """
    exclusive_bounds = ((1.5, 'A'), (2, 'D'), (4, 'E'))
    for bound, level in exclusive_bounds:
        if jam_index < bound:
            return level
    return 'F'
def tp_to_str(tp):
    """Return the Chinese name of a predefined time-period code.

    :param tp: one of 100, 200, 300, 400, 500, 600, 1000
    :return: the period name, or '未知' for any other value
    """
    period_names = {
        100: '早高峰',
        200: '晚高峰',
        300: '早晚高峰',
        400: '平峰',
        500: '凌晨',
        600: '夜间',
        1000: '全天',
    }
    return period_names.get(tp, '未知')
def get_intervals_from_tp(tp):
    """Return the time intervals covered by a predefined time-period code.

    :param tp: time-period code (100, 200, 300, 400, 500, 600 or 1000)
    :return: list of [start, end] pairs of 'HH:MM' strings; empty for an
        unknown code. A fresh list is built on every call.
    """
    spans_by_tp = {
        100: (('07:00', '09:00'),),
        200: (('17:00', '19:00'),),
        300: (('07:00', '09:00'), ('17:00', '19:00')),
        400: (('06:00', '07:00'), ('09:00', '17:00'), ('19:00', '22:00')),
        500: (('00:00', '06:00'),),
        600: (('22:00', '24:00'),),
        1000: (('00:00', '24:00'),),
    }
    return [list(span) for span in spans_by_tp.get(tp, ())]
def get_intervals_from_tp_in(tp):
    """Return the time intervals covered by a predefined time-period code.

    :param tp: time-period code (100, 200, 300, 400, 500, 600 or 1000)
    :return: list of [start, end] pairs of 'HH:MM' strings; empty for an
        unknown code. A fresh list is built on every call.
    """
    spans_by_tp = {
        100: (('07:00', '09:00'),),
        200: (('17:00', '19:00'),),
        300: (('07:00', '09:00'), ('17:00', '19:00')),
        400: (('06:00', '07:00'), ('09:00', '17:00'), ('19:00', '22:00')),
        500: (('00:00', '06:00'),),
        600: (('22:00', '24:00'),),
        1000: (('00:00', '24:00'),),
    }
    return [list(span) for span in spans_by_tp.get(tp, ())]
def get_intervals_from_tp_slice(tp):
    """Return the time intervals covered by a predefined time-period code.

    :param tp: time-period code (100, 200, 300, 400, 500, 600 or 1000)
    :return: list of [start, end] pairs of 'HH:MM' strings; empty for an
        unknown code. A fresh list is built on every call.
    """
    spans_by_tp = {
        100: (('07:00', '09:00'),),
        200: (('17:00', '19:00'),),
        300: (('07:00', '09:00'), ('17:00', '19:00')),
        400: (('06:00', '07:00'), ('09:00', '17:00'), ('19:00', '22:00')),
        500: (('00:00', '06:00'),),
        600: (('22:00', '24:00'),),
        1000: (('00:00', '24:00'),),
    }
    return [list(span) for span in spans_by_tp.get(tp, ())]
def has_intersection(one, other):
    """Tell whether two [start, end] intervals overlap in more than one point.

    :param one: first interval, indexable as [start, end]
    :param other: second interval, indexable as [start, end]
    :return: True when the intervals overlap; touching end points do not count
    """
    if not other[0] < one[1]:
        return False
    return other[1] > one[0]
def has_intersection_with_list(one, other_interval_list):
    """Tell whether an interval overlaps any interval of a list.

    :param one: interval, indexable as [start, end]
    :param other_interval_list: iterable of [start, end] intervals
    :return: True as soon as one overlap is found, otherwise False
    """
    return any(has_intersection(one, other) for other in other_interval_list)
def fall_in_intervals(hm, interval_list):
    """Tell whether a moment falls inside any interval of a list.

    Each interval is open at its start and closed at its end.

    :param hm: moment to test, e.g. an 'HH:MM' string
    :param interval_list: iterable of [start, end] intervals
    :return: True when ``start < hm <= end`` holds for some interval
    """
    return any(span[0] < hm <= span[1] for span in interval_list)
def if_now_fit_weektp(week_days, tp, tp_start, tp_end):
    """Tell whether the current moment lies in a time period on selected weekdays.

    :param week_days: comma-separated weekday numbers, e.g. '1,2,3,4,5'
    :param tp: 0 for a custom period, otherwise a predefined code
        (100 / 200 / 300 / 400 / 500 / 600 / 1000)
    :param tp_start: start of the period, used only when ``tp == 0``
    :param tp_end: end of the period, used only when ``tp == 0``
    :return: bool
    """
    allowed_weekdays = {int(token) for token in week_days.split(',')}
    if get_weekday() not in allowed_weekdays:
        return False
    now_hm = get_hhmm()
    # Resolve the period into its list of [start, end] intervals.
    if tp == 0:
        interval_list = [[tp_start, tp_end]]
    else:
        interval_list = get_intervals_from_tp(tp)
    return fall_in_intervals(now_hm, interval_list)
def calc_intersection(one, other):
    """Compute the overlap of two [start, end] intervals.

    :param one: first interval
    :param other: second interval
    :return: [start, end] of the overlap, or None when has_intersection
        reports no overlap
    """
    if not has_intersection(one, other):
        return None
    overlap_start = max(one[0], other[0])
    overlap_end = min(one[1], other[1])
    return [overlap_start, overlap_end]
def calc_intersection_of_two_intervals(one_list, other_list):
    """Compute the pairwise overlaps between two lists of intervals.

    :param one_list: iterable of [start, end] intervals
    :param other_list: iterable of [start, end] intervals; it is iterated once
        per element of ``one_list``
    :return: list of overlap intervals, or None when there is no overlap at all
    """
    candidates = (
        calc_intersection(one, other)
        for one in one_list
        for other in other_list
    )
    intersection_list = [inter for inter in candidates if inter]
    return intersection_list or None
def interval_list_to_str(interval_list):
    """Render a list of intervals as 'start - end, start - end, ...'.

    :param interval_list: iterable of [start, end] intervals
    :return: the joined text; an empty string for an empty list
    """
    return ', '.join('%s - %s' % (span[0], span[1]) for span in interval_list)
def timer_decorator(func):
    """Decorate ``func`` so every call reports how long it took.

    The elapsed wall-clock time is logged at DEBUG level and also printed.

    :param func: the callable to wrap
    :return: a wrapper that forwards all arguments and returns func's result
    """
    from functools import wraps

    # wraps() keeps func's __name__/__doc__ on the wrapper, so introspection
    # and frameworks that register callables by name keep working.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        msg = f"{func.__name__} time_spent {end_time - start_time:.6f} s"
        logging.debug(msg)
        print(msg)
        return result

    return wrapper
def make_common_res(status, msg):
    """Build a response dict with the keys 'status' and 'msg'.

    :param status: value stored under 'status'
    :param msg: value stored under 'msg'
    :return: a new dict
    """
    return {'status': status, 'msg': msg}
def make_res(status, msg, desc):
    """Build a response dict with the keys 'status', 'msg' and 'desc'.

    :param status: value stored under 'status'
    :param msg: value stored under 'msg'
    :param desc: value stored under 'desc'
    :return: a new dict
    """
    return {'status': status, 'msg': msg, 'desc': desc}
def roadclass2str(rc):
    """Return the Chinese name of a road class.

    :param rc: road class, anything ``int()`` accepts
    :return: the class name for 1..10, otherwise '未知道路等级'
    """
    class_names = {
        1: '高速公路',
        2: '国道',
        3: '省道',
        4: '县道',
        5: '其他低等级道路',
        6: '城市快速路',
        7: '城市主干道',
        8: '城市次干道',
        9: '城市支路',
        10: '郊区小路',
    }
    return class_names.get(int(rc), '未知道路等级')
def comparable_rank(rc):
    """Map a road class onto a rank that can be compared numerically.

    For classes 1..10 the result lies in 1..10; a smaller rank means a
    higher-grade road.

    :param rc: road class number
    :return: 2*rc for classes 1..4, 2*rc - 11 for classes 6..9, 9 for class 5,
        and ``int(rc)`` for anything else
    """
    if rc == 5:
        return 9
    if 1 <= rc <= 4:
        return rc * 2
    if 6 <= rc <= 9:
        return rc * 2 - 11
    return int(rc)
def make_tp_list(tp_start_list):
    """Turn a list of period start times into [start, end] intervals.

    Each period ends where the next one starts; the last one ends at '24:00'.

    :param tp_start_list: sequence of start times, e.g. ['00:00', '07:00']
    :return: list of [start, end] pairs; empty for an empty input
    """
    end_times = list(tp_start_list[1:]) + ['24:00']
    return [[begin, finish] for begin, finish in zip(tp_start_list, end_times)]
def make_map_from_list(info_list, key_name):
    """Index a list of records by one of their fields.

    :param info_list: iterable of mappings
    :param key_name: field whose value becomes the dict key
    :return: dict from field value to record; a later record replaces an
        earlier one with the same key
    """
    return {info[key_name]: info for info in info_list}
def parse_int_list(num_list_str):
    """Parse a comma-separated string into a list of ints.

    :param num_list_str: text such as '1,2,3'
    :return: list of ints
    :raises ValueError: when a piece is not a valid int (including an empty string)
    """
    return [int(piece) for piece in num_list_str.split(',')]
def parse_float_list(num_list_str):
    """Parse a comma-separated string into a list of floats.

    :param num_list_str: text such as '1.5,2,3.25'
    :return: list of floats
    :raises ValueError: when a piece is not a valid float (including an empty string)
    """
    return [float(piece) for piece in num_list_str.split(',')]
def parse_phase_name(phase_name):
    """Split a phase description into its direction part and its turn part.

    The turn part is the last two characters; the direction part is
    everything before them.

    :param phase_name: phase description text
    :return: tuple (srcDir_part, turn_part)
    """
    return phase_name[:-2], phase_name[-2:]
def list_to_str(item_list, spliter):
    """Join the string form of every item of a list.

    :param item_list: items to join; None or an empty list yields ''
    :param spliter: separator placed between the items
    :return: the joined text
    """
    # A falsy list already covers the empty case, so no separate length check.
    if not item_list:
        return ''
    return spliter.join(str(item) for item in item_list)
def time_str_to_seconds(time_str):
    """Convert an 'HH:MM' time of day into seconds since midnight.

    :param time_str: time of day such as '07:30'; the end-of-day marker
        '24:00' used by the interval helpers in this file is accepted too
    :return: number of seconds since midnight (86400 for '24:00')
    :raises ValueError: when the text is not a valid 'HH:MM' time
    """
    # strptime() rejects hour 24, but '24:00' is this module's end-of-day marker.
    if time_str == '24:00':
        return 24 * 3600
    time_obj = datetime.strptime(time_str, '%H:%M')
    return time_obj.hour * 3600 + time_obj.minute * 60
def time_str_to_minutes(time_str):
    """Convert an 'HH:MM' time of day into minutes since midnight.

    :param time_str: time of day such as '07:30'; the end-of-day marker
        '24:00' used by the interval helpers in this file is accepted too
    :return: number of minutes since midnight (1440 for '24:00')
    :raises ValueError: when the text is not a valid 'HH:MM' time
    """
    # strptime() rejects hour 24, but '24:00' is this module's end-of-day marker.
    if time_str == '24:00':
        return 24 * 60
    time_obj = datetime.strptime(time_str, '%H:%M')
    return time_obj.hour * 60 + time_obj.minute
def allowed_file(filename):
    """Tell whether a file name carries an Excel extension.

    :param filename: file name to check
    :return: True when the text after the last '.' is 'xlsx' or 'xls',
        compared case-insensitively; False when there is no '.'
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in {'xlsx', 'xls'}
def allowed_file_suffix(filename, suffix):
    """Tell whether a file name's extension is among the allowed ones.

    :param filename: file name to check
    :param suffix: container of allowed lower-case extensions; the test is
        ``extension in suffix``
    :return: True when the lower-cased text after the last '.' is in
        ``suffix``; False when there is no '.'
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in suffix
# Layout of the sheets accepted by read_and_filter_excel, keyed by sheet name.
# For every sheet:
#   'head'   - header cells expected in the leading columns, in this order
#   'column' - key under which each of those columns is stored in a parsed row
#   'rex'    - regular expression (applied with re.match) each cell must satisfy
# The three lists of a sheet are parallel: index i describes the same column.
# Patterns of the form r'^(...|)$' also accept an empty cell; r'.' requires at
# least one character.
t_sheet_cols = {
    # Scheduling plan sheet.
    # NOTE(review): r'^\d+|$' parses as (^\d+)|($), so with re.match any value
    # that merely starts with digits passes — confirm whether r'^(\d+|)$' was meant.
    '调度计划表': {
        'head': ['序号', '调度类型', '月份', '日期', '星期', '日计划号'],
        'column': ['priority', 'type', 'month', 'day', 'weekday', 'scheduleid'],
        'rex': [r'^\d+$', r'^\d+|$', r'^([0-9]|1[0-2]|)$', r'^(?:[0-9]|[12][0-9]|3[01]|)$', r'^([0-7]|)$', r'^\d+$'],
    },
    # Day plan sheet.
    # NOTE(review): r'^[0-2]\d:[0-5]\d$' also accepts hours 24..29 — confirm.
    '日计划表': {
        'head': ['序号', '日计划号', '执行时段', '配时方案号', '控制模式'],
        'column': ['eg', 'scheduleid', 'tp_start', 'planid', 'control_mode'],
        'rex': [r'^\d+$', r'^\d+$', r'^[0-2]\d:[0-5]\d$', r'^\d+$', r'.'],
    },
    # Timing plan sheet.
    '配时方案表': {
        'head': ['序号', '方案号', '方案名称', '周期长', '阶段序号', '阶段号', '阶段时间', '协调相位号', '协调相位差'],
        'column': ['eg', 'planid', 'name', 'cycle', 'stage_seq', 'stageid', 'stage_duration', 'coord_phaseid',
                   'offset'],
        'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$', r'^\d+$', r'^\d+$', r'^(\d+|)$', r'^(\d+|)$'],
    },
    # Stage sheet; 'phases' is a comma-separated list of phase numbers.
    '阶段表': {
        'head': ['序号', '阶段号', '阶段名称', '黄灯时间', '全红时间', '关联相位'],
        'column': ['eg', 'stageid', 'name', 'yellow', 'allred', 'phases'],
        'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$', r'^(\d+,)*\d+$'],
    },
    # Phase sheet.
    '相位表': {
        'head': ['序号', '相位号', '相位名称', '最小绿', '最大绿'],
        'column': ['eg', 'phaseid', 'name', 'min_green', 'max_green'],
        'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$'],
    }
}
def read_and_filter_excel(file, sheet_names, is_new=False):
    """Read the requested sheets of an Excel workbook and validate every cell.

    :param file: workbook path or file-like object accepted by ``pd.ExcelFile``
    :param sheet_names: names of the sheets that must be read
    :param is_new: use the ``t_sheet_cols_new`` layout instead of ``t_sheet_cols``
    :return: tuple ``(data, error)``. On success ``data`` maps each sheet name
        to a list of row dicts keyed by the layout's 'column' names and
        ``error`` is ''. On failure ``data`` is {} and ``error`` is the message.
    """
    sheet_cols = t_sheet_cols_new if is_new else t_sheet_cols
    # The context manager releases the workbook handle on every return path.
    with pd.ExcelFile(file) as xl:
        # Every sheet of the workbook must be known, and every requested sheet present.
        if not all(sheet_name in sheet_cols for sheet_name in xl.sheet_names):
            return {}, '存在不可用sheet,请检查后重新上传'
        if not all(sheet_name in xl.sheet_names for sheet_name in sheet_names):
            return {}, 'sheet不匹配,请检查后重新上传'
        sheets = {}
        for sheet_name in sheet_names:
            spec = sheet_cols[sheet_name]
            heads = spec['head']
            # Read everything as text and keep empty cells as '' rather than NaN.
            df = pd.read_excel(xl, sheet_name, dtype=str, na_filter=False)
            if len(df.columns) < len(heads):
                return {}, '表头不匹配,请检查后重新上传'
            for index, column_name in enumerate(heads):
                if column_name != df.columns[index]:
                    return {}, '表头错误,请检查后重新上传'
            # Optional extra column of the scheduling sheet.
            # NOTE(review): it is read by position (first column after the
            # required ones), not by name — confirm it always sits there.
            has_schedule_name = sheet_name == "调度计划表" and '日计划名称' in df.columns
            rows = []
            for index, row in df.iterrows():
                line = {}
                cleaned_row = row.apply(lambda x: x.strip() if isinstance(x, str) else x)
                for i in range(len(heads)):
                    cell_text = str(cleaned_row.iloc[i])
                    # index + 2: one for the header row, one for 1-based numbering.
                    if not re.match(spec['rex'][i], cell_text):
                        return {}, f'{sheet_name} {df.columns[i]}列 第{index + 2}行 格式错误'
                    if len(cell_text) > 100:
                        return {}, f'{sheet_name} {df.columns[i]}列 第{index + 2}行 长度超过100个字符'
                    line[spec['column'][i]] = cleaned_row.iloc[i]
                if has_schedule_name:
                    line['schedule_name'] = cleaned_row.iloc[len(heads)]
                rows.append(line)
            if not rows:
                return {}, '数据不能为空'
            sheets[sheet_name] = rows
    return sheets, ''
def check_request(value):
    """Screen a request parameter against a blacklist of SQL keywords.

    The value is lower-cased and rejected when it contains one of the listed
    keywords as a whole word, or the characters '*' or ';'.

    NOTE(review): a keyword blacklist is only a coarse filter; queries built
    from request data should still use parameter binding.

    :param value: request parameter; it is converted with ``str()``
    :return: None when the value passes, otherwise an ``Exception`` instance
        describing the problem (returned, not raised)
    """
    # 'declare' was misspelled as 'delclare', so it was never matched.
    pattern = r"\b(and|like|exec|insert|select|drop|grant|alter|delete|update|count|chr|mid|master|truncate|char|declare|or)\b|(\*|;)"
    try:
        text = str(value).lower()
    except Exception as error:
        # Keep the historical contract: conversion problems are returned too.
        return error
    if re.search(pattern, text):
        return Exception("request params is wrong")
    return None
def get_time_stamp_list_by_duration(timestamp, duration):
    """List the 5-minute timestamps going back from ``timestamp``.

    :param timestamp: newest timestamp, in seconds; it is the first element
    :param duration: 1 -> 3 points (15 minutes), 2 -> 12 points (1 hour),
        3 -> 24 points (2 hours); any other value yields no points
    :return: list of timestamps spaced 300 seconds apart, newest first
    """
    batch_size = {1: 3, 2: 12, 3: 24}.get(duration, 0)
    return [timestamp - 300 * step for step in range(batch_size)]
# Resolve the usable days for a weekday selection and a day type.
def get_days_from_week_daytype(week, day, daytype):
    """Return the days to use for a weekday selection and a day type.

    :param week: comma-separated weekday numbers, e.g. '1,2,3,4,5'
    :param day: reference date as a yyyymmdd number or string
    :param daytype: 'today' or 'hisday' -> ``[day]`` when its weekday is
        selected; 'workdayofweek' / 'weekendofweek' -> matching days of the
        week ending at ``day``; anything else -> no days
    :return: list of days, possibly empty
    """
    weeks = week.split(',')
    week_day = get_day_of_week(day)
    if daytype in ('today', 'hisday'):
        return [day] if str(week_day) in weeks else []
    if daytype == 'workdayofweek':
        return filter_last_week_by_days(week, day, 'workday')
    if daytype == 'weekendofweek':
        return filter_last_week_by_days(week, day, 'weekend')
    return []
def get_time_str_slice_by_dur(start, end):
    """Cut the span from ``start`` to ``end`` into one-hour 'HH:MM-HH:MM' slices.

    Slices are produced while the slice start is (textually) before ``end``,
    so the last slice may reach past ``end`` when the span is not a whole
    number of hours. A slice never runs past midnight: it is cut at '24:00'.

    :param start: zero-padded 'HH:MM' start time
    :param end: zero-padded 'HH:MM' end time; '24:00' means end of day
    :return: list of 'start-end' strings; empty when ``start`` is not before ``end``
    """
    result = []
    while start < end:
        current = datetime.strptime(start, '%H:%M')
        following = current + timedelta(hours=1)
        if following.date() != current.date():
            # Crossing midnight: clamp to the end-of-day marker. Without this a
            # start such as '23:30' wrapped to '00:30' and the loop never ended.
            new_time_str = '24:00'
        else:
            new_time_str = following.strftime('%H:%M')
        result.append("{}-{}".format(start, new_time_str))
        start = new_time_str
    return result
# Collect the hourly time slices covered by the green-wave configurations.
def get_green_waves_tp(green_waves_info):
    """Collect the distinct one-hour slices covered by green-wave records.

    Records with ``launch_mode == 'manual'`` or an empty ``week_days`` are
    skipped. A positive ``tp`` is expanded through its predefined intervals;
    otherwise the record's own ``tp_start`` / ``tp_end`` are used, with a
    ``tp_end`` of '00:00' read as '24:00'.

    :param green_waves_info: iterable of green-wave record dicts
    :return: list of distinct 'HH:MM-HH:MM' slices, in no particular order
    """
    tps = []
    for wave in green_waves_info:
        if wave['launch_mode'] == 'manual' or wave['week_days'] == '':
            continue
        if wave['tp'] > 0:
            for span in get_intervals_from_tp_in(wave['tp']):
                hourly = get_time_str_slice_by_dur(span[0], span[1])
                tps.extend(piece for piece in hourly if piece)
        if wave['tp'] <= 0:
            tp_end = wave['tp_end']
            if tp_end == '00:00':
                tp_end = '24:00'
            hourly = get_time_str_slice_by_dur(wave['tp_start'], tp_end)
            tps.extend(piece for piece in hourly if piece)
    return list(set(tps))
# Index the green-wave records by wave id and time span.
def get_green_waves_tp_speed(green_waves_info):
    """Index green-wave records by wave id and by 'start-end' time span.

    Records with ``launch_mode == 'manual'`` or an empty ``week_days`` are
    skipped. A positive ``tp`` yields one entry per predefined interval;
    otherwise one entry is made from ``tp_start`` / ``tp_end``, with a
    ``tp_end`` of '00:00' read as '24:00'.

    :param green_waves_info: iterable of green-wave record dicts
    :return: nested defaultdict: waveid -> 'HH:MM-HH:MM' -> record
    """
    result = defaultdict(lambda: defaultdict(list))
    for wave in green_waves_info:
        if wave['launch_mode'] == 'manual' or wave['week_days'] == '':
            continue
        if wave['tp'] > 0:
            for span in get_intervals_from_tp_in(wave['tp']):
                result[wave['waveid']]['-'.join(span)] = wave
        if wave['tp'] <= 0:
            tp_end = wave['tp_end']
            if tp_end == '00:00':
                tp_end = '24:00'
            result[wave['waveid']][wave['tp_start'] + '-' + tp_end] = wave
    return result
# Map the inbound roads of a green wave to their coordination speeds.
def get_green_waves_road(wave_info, cross_info):
    """Map the inbound road ids along a green wave to their speeds.

    For type 0 or 2 (forward), crossings after ``start_cross_seq`` are paired
    with the entries of ``speed_list``. For type 1 or 2 (reversed), crossings
    from ``start_cross_seq`` onwards are paired with ``speed_list_reversed``,
    or with 0 when that list is empty. In both passes only the first
    ``cross_num - 1`` qualifying crossings are used.

    :param wave_info: green-wave record with 'type', 'speed_list',
        'speed_list_reversed', 'start_cross_seq' and 'cross_num'
    :param cross_info: iterable of crossing dicts with 'cross_seq',
        'inroadid' and 'inroadid_reversed'
    :return: dict from road id to speed as float
    """
    road_list = {}
    wave_type = wave_info['type']
    if wave_type == 0 or wave_type == 2:
        speeds = wave_info['speed_list'].split(',')
        downstream = (
            item_cross for item_cross in cross_info
            if item_cross['cross_seq'] > wave_info['start_cross_seq']
        )
        for position, item_cross in enumerate(downstream, start=1):
            if position < wave_info['cross_num']:
                road_list[item_cross['inroadid']] = float(speeds[position - 1])
    if wave_type == 1 or wave_type == 2:
        reversed_text = wave_info['speed_list_reversed']
        speeds = reversed_text.split(',') if reversed_text != '' else []
        # Unlike the forward pass, the starting crossing itself is included.
        covered = (
            item_cross for item_cross in cross_info
            if item_cross['cross_seq'] >= wave_info['start_cross_seq']
        )
        for position, item_cross in enumerate(covered, start=1):
            if position < wave_info['cross_num']:
                speed = speeds[position - 1] if len(speeds) > 0 else 0
                road_list[item_cross['inroadid_reversed']] = float(speed)
    return road_list
# Difference of two lists of time slices.
def get_green_waves_tp_diff(green_tps_slice, exist_tp_slice):
    """Return the slices of ``exist_tp_slice`` that are not in ``green_tps_slice``.

    :param green_tps_slice: slices to remove
    :param exist_tp_slice: slices to filter
    :return: the remaining slices, sorted
    """
    return sorted(item_tp for item_tp in exist_tp_slice if item_tp not in green_tps_slice)
# Intersection of two lists of time slices.
def get_green_waves_tp_intersect(tp_slice_1, tp_slice_2):
    """Return the slices of ``tp_slice_1`` that also appear in ``tp_slice_2``.

    :param tp_slice_1: slices to filter
    :param tp_slice_2: slices that must contain a kept item
    :return: the common slices, sorted
    """
    return sorted(item_tp for item_tp in tp_slice_1 if item_tp in tp_slice_2)
# Merge adjacent time slices into continuous spans.
def get_green_waves_tp_continuous(tp_slice):
    """Merge 'start-end' slices that follow each other into longer spans.

    A slice is merged into the running span when its start equals the span's
    end; otherwise it opens a new span. Only consecutive items are compared,
    so the input is expected to be ordered.

    :param tp_slice: iterable of 'HH:MM-HH:MM' strings
    :return: list of merged 'HH:MM-HH:MM' spans
    """
    # A dict serves as an insertion-ordered set of the spans built so far.
    merged = {}
    current = ''
    for time_dur in tp_slice:
        if not merged:
            current = time_dur
            merged[current] = 1
            continue
        this_parts = time_dur.split('-')
        prev_parts = current.split('-')
        if this_parts[0] == prev_parts[1]:
            # Extend the running span: drop its old key, then store the longer one.
            del merged[current]
            current = prev_parts[0] + "-" + this_parts[1]
        else:
            current = time_dur
        merged[current] = 1
    return list(merged)
def get_used_week_days(daytype, day):
    """Return the days to use for a day type, with its default weekday set.

    :param daytype: 'hisday' selects the weekday of ``day`` itself,
        'weekendofweek' selects weekdays 6 and 7, anything else selects 1..5
    :param day: reference date as a yyyymmdd number or string
    :return: the list produced by get_days_from_week_daytype
    """
    if daytype == 'hisday':
        weeks = str(get_day_of_week(day))
    elif daytype == 'weekendofweek':
        weeks = '6,7'
    else:
        weeks = '1,2,3,4,5'
    return get_days_from_week_daytype(weeks, day, daytype)
# Build the timestamps at a fixed step over a time period on the given days.
def get_time_stamp_list_by_days_tp(days, tp, dur):
    """List timestamps at a fixed step across a time period on several days.

    For every day and every interval of the period, timestamps run from the
    interval start to its end, both included. An interval ending at '24:00'
    ends at 00:00 of the following day.

    :param days: iterable of dates as yyyymmdd numbers or strings
    :param tp: predefined time-period code
    :param dur: step between timestamps, in minutes
    :return: flat list of int timestamps in seconds
    """
    def _stamp_of(day_text, hm):
        # Build 'YYYY-MM-DD HH:MM:00' from a yyyymmdd text and an 'HH:MM' time.
        ymd = f"{day_text[:4]}-{day_text[4:6]}-{day_text[6:]}"
        return int(str2timestamp("{} {}:00".format(ymd, hm)))

    stamps = []
    tp_list = get_intervals_from_tp_in(tp)
    for item_day in days:
        day_str = str(item_day)
        for span in tp_list:
            start_stamp = _stamp_of(day_str, span[0])
            if span[1] == '24:00':
                end_stamp = _stamp_of(str(get_day_offset(item_day, 1)), '00:00')
            else:
                end_stamp = _stamp_of(day_str, span[1])
            while start_stamp <= end_stamp:
                stamps.append(start_stamp)
                start_stamp += dur * 60
    return stamps
# Generate one 'HH:MM' label per half hour.
def get_half_hour_24_str():
    """Return 48 half-hourly 'HH:MM' labels covering 24 hours.

    The list starts at the beginning of the current hour and wraps past
    midnight, so it is not necessarily sorted.

    :return: list of 48 'HH:MM' strings
    """
    first = datetime.now().replace(minute=0, second=0, microsecond=0)
    return [
        (first + timedelta(minutes=30 * step)).strftime('%H:%M')
        for step in range(48)
    ]
# Generate one 'HH:MM' label per hour.
def get_hour_24_str():
    """Return 24 hourly 'HH:MM' labels covering 24 hours.

    The list starts at the beginning of the current hour and wraps past
    midnight, so it is not necessarily sorted.

    :return: list of 24 'HH:MM' strings
    """
    first = datetime.now().replace(minute=0, second=0, microsecond=0)
    return [
        (first + timedelta(minutes=60 * step)).strftime('%H:%M')
        for step in range(24)
    ]
# Intersection of two weekday collections.
def get_green_waves_week_mixed(green_weeks, week_day):
    """Return the weekdays present in both collections.

    One entry is produced for every equal pair, so duplicates in the inputs
    lead to duplicates in the result.

    :param green_weeks: iterable of weekday values
    :param week_day: iterable of weekday values; iterated once per element
        of ``green_weeks``
    :return: list of the matching values, in ``green_weeks`` order
    """
    return [
        week
        for item_week_day in green_weeks
        for week in week_day
        if item_week_day == week
    ]
# Pick the crossings of a green wave from the artery's crossing list.
def get_green_waves_cross_list(start_seq, cross_num, artery_coss_list):
    """Pick ``cross_num`` consecutive crossings starting at a 1-based position.

    :param start_seq: 1-based position of the first crossing
    :param cross_num: number of crossings to take
    :param artery_coss_list: sequence of the artery's crossings
    :return: list of the selected crossings
    :raises IndexError: when the requested range runs past the end of the list
    """
    first_index = start_seq - 1
    return [artery_coss_list[first_index + step] for step in range(cross_num)]