cross_doctor/app/comm.py

1196 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import logging
from collections import defaultdict
from datetime import datetime, timedelta
import pandas as pd
import re
from tool.excel import t_sheet_cols_new
def save_str2file(content, filename):
f = open(filename, 'w', encoding='utf-8')
f.write(content)
f.close()
def load_from_file(filename):
f = open(filename, 'r', encoding='utf-8')
content = f.read()
f.close()
return content
def str2timestamp(time_string):
"""
输入时间字符串,格式为 "YYYY-MM-DD HH:MM:SS"
time_string = "2024-03-09 14:30:00"
:param time_string:
:return:
"""
# 解析这个时间字符串
date_object = datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S")
# 将解析后的时间对象转换为Unix时间戳以秒为单位
timestamp = date_object.timestamp()
# # 输出Unix时间戳
# print(timestamp)
return timestamp
def timestamp2str(timestamp):
# 将Unix时间戳转换为struct_time对象本地时间
local_time_tuple = time.localtime(timestamp)
# 格式化为本地时间字符串
local_time_str = time.strftime('%Y-%m-%d %H:%M:%S', local_time_tuple)
return local_time_str
def timestamp2HMS(timestamp):
# 将Unix时间戳转换为struct_time对象本地时间
local_time_tuple = time.localtime(timestamp)
# 格式化为本地时间字符串
local_time_str = time.strftime('%H:%M:%S', local_time_tuple)
return local_time_str
def timestamp2int(timestamp):
return int(time.strftime('%Y%m%d', time.localtime(timestamp)))
def daynum_to_timestamp(day):
"""
将日期数字转化内Unix时间戳
:param day: yyyymmdd 格式的数字或字符串
:return:
"""
# 解析这个时间字符串
date_object = datetime.strptime(str(day), "%Y%m%d")
# 将解析后的时间对象转换为Unix时间戳以秒为单位
timestamp = date_object.timestamp()
return timestamp
def get_day_offset(day, offset):
"""
计算前几天或后几天的日期数字
:param day: yyyymmdd 格式的数字或字符串
:param offset: 天的偏移个数,正数表示后面的日期,负数表示前面的日期
:return:
"""
timestamp = daynum_to_timestamp(day)
other_timestamp = timestamp + 86400 * offset
# 将Unix时间戳转换为struct_time对象本地时间
other_time_tuple = time.localtime(other_timestamp)
# 格式化为本地时间字符串
other_day = time.strftime('%Y%m%d', other_time_tuple)
return int(other_day)
def calc_day_diff(day0, day1):
timestamp0 = daynum_to_timestamp(day0)
timestamp1 = daynum_to_timestamp(day1)
return int((timestamp1 - timestamp0) / 86400)
def calc_day_diff_YMD(day1, day2):
timestamp1 = datetime.strptime(str(day1), "%Y-%m-%d").timestamp()
timestamp2 = datetime.strptime(str(day2), "%Y-%m-%d").timestamp()
return int((timestamp2 - timestamp1) / 86400)
def get_day_of_week(day):
"""
计算指定日期是星期几返回1~7表示星期一到星期日
:param day:
:return:
"""
timestamp = daynum_to_timestamp(day)
# 将Unix时间戳转换为struct_time对象本地时间
other_time_tuple = time.localtime(timestamp)
return other_time_tuple.tm_wday + 1
def test_day_offset():
print(get_day_offset(20240325, -7))
print(get_day_offset(20240325, 7))
print(get_day_offset(20240401, -7))
print(get_day_offset(20240408, -7))
def weekday2str(day_of_week):
if day_of_week == 1:
return ''
elif day_of_week == 2:
return ''
elif day_of_week == 3:
return ''
elif day_of_week == 4:
return ''
elif day_of_week == 5:
return ''
elif day_of_week == 6:
return ''
elif day_of_week == 7:
return ''
else:
return 'X'
def get_today_str():
# 获取当前时间戳
current_timestamp = int(time.time())
# 将Unix时间戳转换为struct_time对象本地时间
local_time_tuple = time.localtime(current_timestamp)
# 格式化为本地时间字符串
local_date_str = time.strftime('%Y%m%d', local_time_tuple)
return local_date_str
def get_today_YMD():
# 将Unix时间戳转换为struct_time对象本地时间
local_time_tuple = time.localtime(int(time.time()))
# 格式化为本地时间字符串
local_date_str = time.strftime('%Y-%m-%d', local_time_tuple)
return local_date_str
def get_hhmm():
# 获取当前时刻的小时分钟字符串
now = datetime.now()
# 转换为字符串格式
hhmm_str = now.strftime("%H:%M")
return hhmm_str
def get_weekday():
# 获取当前日期
today = datetime.today().date()
# 获取星期几的整数表示0=星期一, 1=星期二, ..., 6=星期日)
weekday_number = today.weekday()
# 转化为1~7
weekday = weekday_number + 1
return weekday
def filter_last_week_days(filter_week_days_str):
filter_day_set = set()
ss = filter_week_days_str.split(',')
for item in ss:
filter_day_set.add(int(item))
today_num = int(get_today_str())
valid_days = []
for offset in range(-1, -8, -1):
day = get_day_offset(today_num, offset)
week_day = get_day_of_week(day)
if week_day in filter_day_set:
valid_days.append(day)
return valid_days
def filter_last_week_by_days(filter_week_days_str, day, type):
filter_day_set = set()
ss = filter_week_days_str.split(',')
for item in ss:
filter_day_set.add(int(item))
valid_days = []
for offset in range(0, -7, -1):
last_day = get_day_offset(day, offset)
week_day = get_day_of_week(last_day)
if week_day in filter_day_set:
if type == 'workday' and int(week_day) <= 5:
valid_days.append(last_day)
if type == 'weekend' and int(week_day) > 5:
valid_days.append(last_day)
return valid_days
def srcDir_toStr(srcDir):
"""
将标准八方向转化为汉字描述
:param srcDir:
:return:
"""
if srcDir == 'N':
return ''
elif srcDir == 'S':
return ''
elif srcDir == 'E':
return ''
elif srcDir == 'W':
return '西'
elif srcDir == 'NE':
return '东北'
elif srcDir == 'NW':
return '西北'
elif srcDir == 'SE':
return '东南'
elif srcDir == 'SW':
return '西南'
else:
return '未知'
def dirname_to_srcDir(dirname: str):
"""
将汉字描述转化为标准八方向
:param dirname:
:return:
"""
if dirname == '':
return 'N'
elif dirname == '':
return 'S'
elif dirname == '':
return 'E'
elif dirname == '西':
return 'W'
elif dirname == '东北':
return 'NE'
elif dirname == '西北':
return 'NW'
elif dirname == '东南':
return 'SE'
elif dirname == '西南':
return 'SW'
else:
return None
g_valid_srcDir_set = {'E', 'N', 'NE', 'NW', 'S', 'SE', 'SW', 'W'}
# 相反的车流来向pair
g_reversed_srcDir_couples = {'E-W', 'N-S', 'NE-SW', 'NW-SE'}
g_reversed_srcDir_mapping = {'E': 'W', 'W': 'E', 'N': 'S', 'S': 'N', 'NE': 'SW', 'SW': 'NE', 'NW': 'SE', 'SE': 'NW'}
def is_crossed_srcDir(src0, src1):
"""
判断两个方向是否为相交方向,也即:排除掉:相同和相反的情形
:param src0:
:param src1:
:return:
"""
if src0 == src1:
return False
global g_reversed_srcDir_couples
tmp_list = [src0, src1]
tmp_list = sorted(tmp_list)
key = tmp_list[0] + '-' + tmp_list[1]
if key in g_reversed_srcDir_couples:
return False
else:
return True
def is_reversed_srcDir(src0, src1):
"""
判断两个方向是否为相反方向
:param src0:
:param src1:
:return:
"""
if src0 == src1:
return False
global g_reversed_srcDir_couples
tmp_list = [src0, src1]
tmp_list = sorted(tmp_list)
key = tmp_list[0] + '-' + tmp_list[1]
if key in g_reversed_srcDir_couples:
return True
else:
return False
def test_srcDir_judge():
global g_valid_srcDir_set
for src0 in g_valid_srcDir_set:
for src1 in g_valid_srcDir_set:
retA = is_crossed_srcDir(src0, src1)
retB = is_reversed_srcDir(src0, src1)
print("%s %s == crossed:%s, reversed:%s" % (src0, src1, retA, retB))
def srcDir_to_flowStr(srcDir):
if srcDir == 'N':
return '北向南'
elif srcDir == 'S':
return '南向北'
elif srcDir == 'E':
return '东向西'
elif srcDir == 'W':
return '西向东'
elif srcDir == 'NE':
return '东北向西南'
elif srcDir == 'NW':
return '西北向东南'
elif srcDir == 'SE':
return '东南向西北'
elif srcDir == 'SW':
return '西南向东北'
else:
return '未知'
def srcDir_toCarDirectionStr(srcDir):
"""
将单个车流来向,转化为平行双向车流的方向描述
:param srcDir:
:return:
"""
if srcDir == 'N':
return '南-北'
elif srcDir == 'S':
return '南-北'
elif srcDir == 'E':
return '东-西'
elif srcDir == 'W':
return '东-西'
elif srcDir == 'NE':
return '东北-西南'
elif srcDir == 'NW':
return '西北-东南'
elif srcDir == 'SE':
return '西北-东南'
elif srcDir == 'SW':
return '东北-西南'
else:
return '未知'
def turn_type_toStr(turn_type):
if turn_type == 0:
return '直行'
elif turn_type == 1:
return '左转'
elif turn_type == 2:
return '右转'
elif turn_type == 3:
return '掉头'
else:
return '未知转向'
def get_turn_type_toStr(turn_type):
if turn_type == 'l':
return '左转'
elif turn_type == 's':
return '直行'
elif turn_type == 'r':
return '右转'
else:
return '未知转向'
def direction_toStr(direction):
ss = direction.split('|')
srcDir = ss[0]
turn_type = int(ss[1])
return "%s进口道-%s" % (srcDir_toStr(srcDir), turn_type_toStr(turn_type))
def get_default_ffs(rc):
if rc == 1:
return 120
elif rc == 2:
return 80
elif rc == 3:
return 70
elif rc == 4:
return 60
elif rc == 5:
return 50
elif rc == 6:
return 90
elif rc == 7:
return 70
elif rc == 8:
return 60
elif rc == 9:
return 50
else:
return 30
def cal_lane_capacity(v):
if v >= 120:
return 2000
elif v >= 100:
return 1900
elif v >= 80:
return 1800
elif v >= 60:
return 1730
elif v >= 50:
return 1690
elif v >= 40:
return 1640
elif v >= 30:
return 1550
else:
return 1380
def cal_lane_rate(lane_num):
if lane_num == 1:
return 1.0
elif lane_num == 2:
return 0.9
elif lane_num == 3:
return 0.8
elif lane_num == 4:
return 0.7
else:
return 0.6
def cal_default_capacity(rc, lane_num, ffs):
lane_capacity = cal_lane_capacity(ffs)
lane_rate = cal_lane_rate(lane_num)
return lane_capacity * lane_num * lane_rate
def calc_service_level(delay_time):
"""从2024-07-31改为国内标准"""
if delay_time <= 10:
return 'A'
elif delay_time <= 20:
return 'B'
elif delay_time <= 35:
return 'C'
elif delay_time <= 55:
return 'D'
elif delay_time <= 80:
return 'E'
else:
return 'F'
def calc_service_level_artery(delay_time, i):
if delay_time <= 5 * i:
return 'A'
elif delay_time <= 15 * i:
return 'B'
elif delay_time <= 25 * i:
return 'C'
elif delay_time <= 40 * i:
return 'D'
elif delay_time <= 60 * i:
return 'E'
else:
return 'F'
def jam_service_level_artery(jam_index):
if jam_index < 1.5:
return 'A'
elif jam_index < 2:
return 'D'
elif jam_index < 4:
return 'E'
else:
return 'F'
def tp_to_str(tp):
if tp == 100:
return '早高峰'
elif tp == 200:
return '晚高峰'
elif tp == 300:
return '早晚高峰'
elif tp == 400:
return '平峰'
elif tp == 500:
return '凌晨'
elif tp == 600:
return '夜间'
elif tp == 1000:
return '全天'
else:
return '未知'
def get_intervals_from_tp(tp):
"""
获取特殊时段对应的时间区间列表
:param tp: 特殊时段编号
:return: [ [start, end] [start, end] ... ]
"""
res = []
if tp == 100:
res.append(['07:00', '09:00'])
elif tp == 200:
res.append(['17:00', '19:00'])
elif tp == 300:
res.append(['07:00', '09:00'])
res.append(['17:00', '19:00'])
elif tp == 400:
res.append(['06:00', '07:00'])
res.append(['09:00', '17:00'])
res.append(['19:00', '22:00'])
elif tp == 500:
res.append(['00:00', '06:00'])
elif tp == 600:
res.append(['22:00', '24:00'])
elif tp == 1000:
res.append(['00:00', '24:00'])
else:
pass
return res
def get_intervals_from_tp_in(tp):
"""
获取特殊时段对应的时间区间列表
:param tp: 特殊时段编号
:return: [ [start, end] [start, end] ... ]
"""
res = []
if tp == 100:
res.append(['07:00', '09:00'])
elif tp == 200:
res.append(['17:00', '19:00'])
elif tp == 300:
res.append(['07:00', '09:00'])
res.append(['17:00', '19:00'])
elif tp == 400:
res.append(['06:00', '07:00'])
res.append(['09:00', '17:00'])
res.append(['19:00', '22:00'])
elif tp == 500:
res.append(['00:00', '06:00'])
elif tp == 600:
res.append(['22:00', '24:00'])
elif tp == 1000:
res.append(['00:00', '24:00'])
else:
pass
return res
def get_intervals_from_tp_slice(tp):
"""
获取特殊时段对应的时间区间列表
:param tp: 特殊时段编号
:return: [ [start, end] [start, end] ... ]
"""
res = []
if tp == 100:
res.append(['07:00', '09:00'])
elif tp == 200:
res.append(['17:00', '19:00'])
elif tp == 300:
res.append(['07:00', '09:00'])
res.append(['17:00', '19:00'])
elif tp == 400:
res.append(['06:00', '07:00'])
res.append(['09:00', '17:00'])
res.append(['19:00', '22:00'])
elif tp == 500:
res.append(['00:00', '06:00'])
elif tp == 600:
res.append(['22:00', '24:00'])
elif tp == 1000:
res.append(['00:00', '24:00'])
else:
pass
return res
def has_intersection(one, other):
return other[0] < one[1] and other[1] > one[0]
def has_intersection_with_list(one, other_interval_list):
for other in other_interval_list:
if has_intersection(one, other):
return True
return False
def fall_in_intervals(hm, interval_list):
for one in interval_list:
if one[0] < hm <= one[1]:
return True
return False
def if_now_fit_weektp(week_days, tp, tp_start, tp_end):
"""
判断当前时刻是否落入指定星期几集合的特定时段中
:param week_days: 星期几集合,格式:'1,2,3,4,5'
:param tp: 0表示自定义其他预定义: 100/200/300/400/500/600/1000
:param tp_start: tp==0情况下的时段开始时刻
:param tp_end: tp==0情况下的时段结束时刻
:return: bool
"""
ss = week_days.split(',')
weekday_set = set()
for item in ss:
weekday_set.add(int(item))
today_weekday = get_weekday()
if today_weekday not in weekday_set:
return False
# 计算时段对应的区间列表
now_hm = get_hhmm()
interval_list = []
if tp == 0:
interval_list = [[tp_start, tp_end]]
else:
interval_list = get_intervals_from_tp(tp)
# 判断当前时刻 是否落入的区间列表中
return fall_in_intervals(now_hm, interval_list)
def calc_intersection(one, other):
"""
计算两个区间的交集区间,无交集时返回 None
:param one:
:param other:
:return:
"""
if has_intersection(one, other):
return [max(one[0], other[0]), min(one[1], other[1])]
else:
return None
def calc_intersection_of_two_intervals(one_list, other_list):
"""
计算两个区间列表的交集。如果没有交集,则返回 None
:param one_list:
:param other_list:
:return: intersection_list
"""
intersection_list = []
for one in one_list:
for other in other_list:
inter = calc_intersection(one, other)
if inter:
intersection_list.append(inter)
if len(intersection_list) > 0:
return intersection_list
else:
return None
def interval_list_to_str(interval_list):
ss = []
for one in interval_list:
ss.append('%s - %s' % (one[0], one[1]))
return ', '.join(ss)
def timer_decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
msg = f"{func.__name__} time_spent {end_time - start_time:.6f} s"
logging.debug(msg)
print(msg)
return result
return wrapper
def make_common_res(status, msg):
res = {}
res['status'] = status
res['msg'] = msg
return res
def make_res(status, msg, desc):
res = {}
res['status'] = status
res['msg'] = msg
res['desc'] = desc
return res
def roadclass2str(rc):
rc_mapping = {1: '高速公路', 2: '国道', 3: '省道', 4: '县道', 5: '其他低等级道路', 6: '城市快速路', 7: '城市主干道',
8: '城市次干道', 9: '城市支路', 10: '郊区小路'}
rc = int(rc)
if rc < 1 or rc > 10:
return '未知道路等级'
else:
return rc_mapping[rc]
def comparable_rank(rc):
"""
转换为可数值比较的rc取值范围1~10,数值越小,等级越高
"""
if 1 <= rc <= 4:
return rc * 2
elif 6 <= rc <= 9:
return rc * 2 - 11
elif rc == 5:
return 9
else:
return int(rc)
def make_tp_list(tp_start_list):
num = len(tp_start_list)
tp_list = []
for i in range(0, num):
if i < num - 1:
tp_list.append([tp_start_list[i], tp_start_list[i + 1]])
else:
# if tp_start_list[i] < '23:59':
tp_list.append([tp_start_list[i], '24:00'])
return tp_list
def make_map_from_list(info_list, key_name):
res = {}
for info in info_list:
key = info[key_name]
res[key] = info
return res
def parse_int_list(num_list_str):
tmp_str_list = num_list_str.split(',')
num_list = []
for ss in tmp_str_list:
num_list.append(int(ss))
return num_list
def parse_float_list(num_list_str):
tmp_str_list = num_list_str.split(',')
num_list = []
for ss in tmp_str_list:
num_list.append(float(ss))
return num_list
def parse_phase_name(phase_name):
"""解析相位的文本描述"""
num = len(phase_name)
srcDir_part = phase_name[0: num - 2]
turn_part = phase_name[num - 2:]
return srcDir_part, turn_part
def list_to_str(item_list, spliter):
if not item_list or len(item_list) == 0:
return ''
"""将list转化为字符串"""
ss = []
for item in item_list:
ss.append(str(item))
return spliter.join(ss)
def time_str_to_seconds(time_str):
time_obj = datetime.strptime(time_str, '%H:%M')
seconds = time_obj.hour * 3600 + time_obj.minute * 60
return seconds
def time_str_to_minutes(time_str):
time_obj = datetime.strptime(time_str, '%H:%M')
minutes = time_obj.hour * 60 + time_obj.minute
return minutes
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in {'xlsx', 'xls'}
def allowed_file_suffix(filename, suffix):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in suffix
t_sheet_cols = {
'调度计划表': {
'head': ['序号', '调度类型', '月份', '日期', '星期', '日计划号'],
'column': ['priority', 'type', 'month', 'day', 'weekday', 'scheduleid'],
'rex': [r'^\d+$', r'^\d+|$', r'^([0-9]|1[0-2]|)$', r'^(?:[0-9]|[12][0-9]|3[01]|)$', r'^([0-7]|)$', r'^\d+$'],
},
'日计划表': {
'head': ['序号', '日计划号', '执行时段', '配时方案号', '控制模式'],
'column': ['eg', 'scheduleid', 'tp_start', 'planid', 'control_mode'],
'rex': [r'^\d+$', r'^\d+$', r'^[0-2]\d:[0-5]\d$', r'^\d+$', r'.'],
},
'配时方案表': {
'head': ['序号', '方案号', '方案名称', '周期长', '阶段序号', '阶段号', '阶段时间', '协调相位号', '协调相位差'],
'column': ['eg', 'planid', 'name', 'cycle', 'stage_seq', 'stageid', 'stage_duration', 'coord_phaseid',
'offset'],
'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$', r'^\d+$', r'^\d+$', r'^(\d+|)$', r'^(\d+|)$'],
},
'阶段表': {
'head': ['序号', '阶段号', '阶段名称', '黄灯时间', '全红时间', '关联相位'],
'column': ['eg', 'stageid', 'name', 'yellow', 'allred', 'phases'],
'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$', r'^(\d+,)*\d+$'],
},
'相位表': {
'head': ['序号', '相位号', '相位名称', '最小绿', '最大绿'],
'column': ['eg', 'phaseid', 'name', 'min_green', 'max_green'],
'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$'],
}
}
def read_and_filter_excel(file, sheet_names, is_new=False):
xl = pd.ExcelFile(file)
sheet_cols = t_sheet_cols
if is_new:
sheet_cols = t_sheet_cols_new
if not all(sheet_name in sheet_cols for sheet_name in xl.sheet_names):
return {}, '存在不可用sheet,请检查后重新上传'
if not all(sheet_name in xl.sheet_names for sheet_name in sheet_names):
return {}, 'sheet不匹配,请检查后重新上传'
list = {}
for sheet_name in sheet_names:
df = pd.read_excel(xl, sheet_name, dtype=str, na_filter=False)
list[sheet_name] = []
if len(df.columns) < len(sheet_cols[sheet_name]['head']):
return {}, '表头不匹配,请检查后重新上传'
for index, column_name in enumerate(sheet_cols[sheet_name]['head']):
if column_name != df.columns[index]:
return {}, '表头错误,请检查后重新上传'
for index, row in df.iterrows():
line = {}
cleaned_row = row.apply(lambda x: x.strip() if isinstance(x, str) else x)
for i in range(len(sheet_cols[sheet_name]['head'])):
if not re.match(sheet_cols[sheet_name]['rex'][i], str(cleaned_row.iloc[i])):
return {}, f'{sheet_name} {df.columns[i]}列 第{index + 2}行 格式错误'
if len(str(cleaned_row.iloc[i])) > 100:
return {}, f'{sheet_name} {df.columns[i]}列 第{index + 2}行 长度超过100个字符'
line[sheet_cols[sheet_name]['column'][i]] = cleaned_row.iloc[i]
if sheet_name == "调度计划表" and '日计划名称' in df.columns:
line['schedule_name'] = cleaned_row.iloc[len(sheet_cols[sheet_name]['head'])]
list[sheet_name].append(line)
if len(list[sheet_name]) == 0:
return {}, '数据不能为空'
return list, ''
def check_request(value):
try:
v = str(value).lower()
pattern = r"\b(and|like|exec|insert|select|drop|grant|alter|delete|update|count|chr|mid|master|truncate|char|delclare|or)\b|(\*|;)"
r = re.search(pattern, v)
if r:
raise Exception("request params is wrong")
return None
except Exception as error:
return error
def get_time_stamp_list_by_duration(timestamp, duration):
batch_size = 0
if duration == 1:
batch_size = 3
elif duration == 2: # 1h
batch_size = 12
elif duration == 3: # 2h
batch_size = 24
timestamp_list = []
# 15分钟
for i in range(0, batch_size):
timestamp_list.append(timestamp - 300 * i)
return timestamp_list
# 获取可用的天
def get_days_from_week_daytype(week, day, daytype):
weeks = week.split(',')
week_day = get_day_of_week(day)
if daytype == 'today' and str(week_day) in weeks:
return [day]
if daytype == 'hisday' and str(week_day) in weeks:
return [day]
if daytype != 'hisday' and daytype != 'today':
if daytype == 'workdayofweek':
return filter_last_week_by_days(week, day, 'workday')
if daytype == 'weekendofweek':
return filter_last_week_by_days(week, day, 'weekend')
return []
def get_time_str_slice_by_dur(start, end):
result = []
while start < end:
new_time_obj = datetime.strptime(start, '%H:%M').timestamp() + 3600
new_time_obj = datetime.fromtimestamp(new_time_obj)
# 将新的时间对象格式化为字符串
new_time_str = new_time_obj.strftime('%H:%M')
if new_time_str == '00:00':
new_time_str = '24:00'
result.append("{}-{}".format(start, new_time_str))
start = new_time_str
return result
# 组合绿波协调时间段列表
def get_green_waves_tp(green_waves_info):
tps = []
for item_green_wave in green_waves_info:
if item_green_wave['launch_mode'] == 'manual' or item_green_wave['week_days'] == '':
continue
# 筛选绿波的时段
if item_green_wave['tp'] > 0:
tp_slice = get_intervals_from_tp_in(item_green_wave['tp'])
for sublist in tp_slice:
str_end = get_time_str_slice_by_dur(sublist[0], sublist[1])
if len(str_end) > 0:
tps += [item_list for item_list in str_end if item_list]
if item_green_wave['tp'] <= 0:
tp_end = item_green_wave['tp_end']
if tp_end == '00:00':
tp_end = '24:00'
str_end = get_time_str_slice_by_dur(item_green_wave['tp_start'], tp_end)
tps += [item_list for item_list in str_end if item_list]
tps_range = list(set(tps))
return tps_range
#获取绿波协调路段时间段和协调速度
def get_green_waves_tp_speed(green_waves_info):
result = defaultdict(lambda: defaultdict(list))
for item_green_wave in green_waves_info:
if item_green_wave['launch_mode'] == 'manual' or item_green_wave['week_days'] == '':
continue
# 筛选绿波的时段
if item_green_wave['tp'] > 0:
tp_slice = get_intervals_from_tp_in(item_green_wave['tp'])
for sublist in tp_slice:
str_end = '-'.join(sublist)
result[item_green_wave['waveid']][str_end] = item_green_wave
if item_green_wave['tp'] <= 0:
tp_end = item_green_wave['tp_end']
if tp_end == '00:00':
tp_end = '24:00'
result[item_green_wave['waveid']][
item_green_wave['tp_start'] + '-' + tp_end] = item_green_wave
return result
#获取配时方案和时间
def get_green_waves_road(wave_info, cross_info):
road_list = {} #正向
if wave_info['type'] == 0 or wave_info['type'] == 2:
i = 0
speeds = wave_info['speed_list'].split(',')
for item_cross in cross_info:
if item_cross['cross_seq'] <= wave_info['start_cross_seq']:
continue
i += 1
if i < wave_info['cross_num']:
speed = speeds[i - 1]
road_list[item_cross['inroadid']] = float(speed)
if wave_info['type'] == 1 or wave_info['type'] == 2:
i = 0
speeds = wave_info['speed_list_reversed'].split(',') if wave_info['speed_list_reversed'] != '' else []
for item_cross in cross_info:
if item_cross['cross_seq'] < wave_info['start_cross_seq']:
continue
i += 1
if i < wave_info['cross_num']:
speed = speeds[i - 1] if len(speeds) > 0 else 0
road_list[item_cross['inroadid_reversed']] = float(speed)
return road_list
# 取两个时间段列表的差集
def get_green_waves_tp_diff(green_tps_slice, exist_tp_slice):
result_tp_slice = []
for item_tp in exist_tp_slice:
if item_tp in green_tps_slice:
continue
result_tp_slice.append(item_tp)
return sorted(result_tp_slice)
# 取两个时间段列表的交集
def get_green_waves_tp_intersect(tp_slice_1, tp_slice_2):
result_tp_slice = []
for item_tp in tp_slice_1:
if item_tp in tp_slice_2:
result_tp_slice.append(item_tp)
return sorted(result_tp_slice)
# 根据时间段组成连续的时间段
def get_green_waves_tp_continuous(tp_slice):
green_waves_time_dur = ''
green_waves_result_final_map = {}
green_waves_result_final_slice = []
for time_dur in tp_slice:
if len(green_waves_result_final_map) == 0:
green_waves_time_dur = time_dur
green_waves_result_final_map[time_dur] = 1
continue
this_time_dur = time_dur.split('-')
prev_time_dur = green_waves_time_dur.split('-')
if this_time_dur[0] == prev_time_dur[1]:
del green_waves_result_final_map[green_waves_time_dur]
green_waves_time_dur = prev_time_dur[0] + "-" + this_time_dur[1]
green_waves_result_final_map[green_waves_time_dur] = 1
if this_time_dur[0] != prev_time_dur[1]:
green_waves_time_dur = time_dur
green_waves_result_final_map[green_waves_time_dur] = 1
for final_tp, _ in green_waves_result_final_map.items():
green_waves_result_final_slice.append(final_tp)
return green_waves_result_final_slice
def get_used_week_days(daytype, day):
weeks = '1,2,3,4,5'
if daytype == 'weekendofweek':
weeks = '6,7'
if daytype == 'hisday':
weeks = str(get_day_of_week(day))
days = get_days_from_week_daytype(weeks, day, daytype)
return days
# 根据日期和tp获取指定dur间隔时间戳dur是分钟每15分钟间隔取时间戳列表
def get_time_stamp_list_by_days_tp(days, tp, dur):
default_start_end = []
tp_list = get_intervals_from_tp_in(tp)
for item_day in days:
day_str = str(item_day)
for item_tp in tp_list:
item_start_stamp = item_tp[0]
item_end_stamp = item_tp[1]
next_day_str = day_str
start_stamp = int(
str2timestamp("{} {}:00".format(f"{day_str[:4]}-{day_str[4:6]}-{day_str[6:]}", item_start_stamp)))
if item_tp[1] == '24:00':
item_end_stamp = '00:00'
next_day_str = str(get_day_offset(item_day, 1))
end_stamp = int(
str2timestamp(
"{} {}:00".format(f"{next_day_str[:4]}-{next_day_str[4:6]}-{next_day_str[6:]}", item_end_stamp)))
while start_stamp <= end_stamp:
default_start_end.append(start_stamp)
start_stamp += dur * 60
return default_start_end
#生成每半小时一个时段
def get_half_hour_24_str():
# 获取当前时间
current_time = datetime.now()
# 将分钟数、秒数和微秒数置零,保留小时数
current_time = current_time.replace(minute=0, second=0, microsecond=0)
# 初始化一个空列表来存储结果
hourly_times = []
# 循环生成一天中每个小时的时间点
for i in range(48):
hourly_times.append(current_time.strftime('%H:%M')) # 格式化为 '小时:分钟' 的字符串
current_time += timedelta(minutes=30) # 加1小时
return hourly_times
# 生成每一个时段
def get_hour_24_str():
# 获取当前时间
current_time = datetime.now()
# 将分钟数、秒数和微秒数置零,保留小时数
current_time = current_time.replace(minute=0, second=0, microsecond=0)
# 初始化一个空列表来存储结果
hourly_times = []
# 循环生成一天中每个小时的时间点
for i in range(24):
hourly_times.append(current_time.strftime('%H:%M')) # 格式化为 '小时:分钟' 的字符串
current_time += timedelta(minutes=60) # 加1小时
return hourly_times
#获取周天交集
def get_green_waves_week_mixed(green_weeks, week_day):
week_days = []
for item_week_day in green_weeks:
for week in week_day:
if item_week_day == week:
week_days.append(week)
return week_days
#根据绿波配置组装路口
def get_green_waves_cross_list(start_seq, cross_num, artery_coss_list):
green_waves_list = []
start_index = start_seq - 1
for i in range(0, cross_num):
index = start_index + i
green_waves_list.append(artery_coss_list[index])
return green_waves_list
def tplist_2_tpinterval(tp_list):
if len(tp_list) == 0:
return []
result = [
{'tp_start': tp_list[0], 'tp_end': '23:59'}
]
for i in range(1, len(tp_list)):
if tp_list[i] == '23:59':
continue
result[-1]['tp_end'] = tp_list[i]
result.append({'tp_start': tp_list[i], 'tp_end': '23:59'})
return result
def time_intersection_minutes(time1: str, time2: str) -> int:
"""计算两个时间段的交集分钟数"""
def to_minutes(t: str) -> int:
h, m = map(int, t.split(':'))
return h * 60 + m
# 解析时间段
start1, end1 = map(to_minutes, time1.split('-'))
start2, end2 = map(to_minutes, time2.split('-'))
# 计算交集
intersection_start = max(start1, start2)
intersection_end = min(end1, end2)
# 如果没有交集返回0
if intersection_start >= intersection_end:
return 0
# 返回交集分钟数
return intersection_end - intersection_start