"""Shared helpers: date/time conversion, traffic-direction and time-period
lookup tables, interval arithmetic, Excel sheet validation and green-wave
utilities."""
import functools
import logging
import re
import time
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd

from tool.excel import t_sheet_cols_new


# ---------------------------------------------------------------------------
# File helpers
# ---------------------------------------------------------------------------

def save_str2file(content, filename):
    """Write ``content`` to ``filename`` as UTF-8, replacing any existing file."""
    # The context manager closes the handle even if the write fails.
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(content)


def load_from_file(filename):
    """Return the whole content of the UTF-8 text file ``filename``."""
    with open(filename, 'r', encoding='utf-8') as f:
        return f.read()


# ---------------------------------------------------------------------------
# Date / time conversion
# ---------------------------------------------------------------------------

def str2timestamp(time_string):
    """
    Convert a local time string to a Unix timestamp.

    :param time_string: text formatted as "YYYY-MM-DD HH:MM:SS",
        e.g. "2024-03-09 14:30:00"
    :return: Unix timestamp in seconds (float)
    """
    return datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S").timestamp()


def _format_local(timestamp, fmt):
    """Format a Unix timestamp as local time using ``time.strftime``."""
    return time.strftime(fmt, time.localtime(timestamp))


def timestamp2str(timestamp):
    """Format a Unix timestamp as local "YYYY-MM-DD HH:MM:SS"."""
    return _format_local(timestamp, '%Y-%m-%d %H:%M:%S')


def timestamp2HMS(timestamp):
    """Format a Unix timestamp as local "HH:MM:SS"."""
    return _format_local(timestamp, '%H:%M:%S')


def timestamp2int(timestamp):
    """Return the local date of a Unix timestamp as the integer yyyymmdd."""
    return int(_format_local(timestamp, '%Y%m%d'))


def _parse_daynum(day):
    """Parse a yyyymmdd number or string into a naive datetime at midnight."""
    return datetime.strptime(str(day), "%Y%m%d")


def daynum_to_timestamp(day):
    """
    Convert a day number to the Unix timestamp of its local midnight.

    :param day: number or string formatted as yyyymmdd
    :return: Unix timestamp in seconds (float)
    """
    return _parse_daynum(day).timestamp()


def get_day_offset(day, offset):
    """
    Return the day number ``offset`` days away from ``day``.

    :param day: number or string formatted as yyyymmdd
    :param offset: number of days; positive means later, negative earlier
    :return: the resulting day as an integer yyyymmdd
    """
    # Calendar arithmetic instead of adding 86400-second blocks to a local
    # timestamp, which drifts by a day around daylight-saving transitions.
    shifted = _parse_daynum(day) + timedelta(days=offset)
    return int(shifted.strftime('%Y%m%d'))


def calc_day_diff(day0, day1):
    """Return the number of days from ``day0`` to ``day1`` (both yyyymmdd)."""
    # Naive datetime subtraction is exact in calendar days; dividing local
    # timestamps by 86400 truncates wrongly when a day is 23 or 25 hours long.
    return (_parse_daynum(day1) - _parse_daynum(day0)).days


def calc_day_diff_YMD(day1, day2):
    """Return the number of days from ``day1`` to ``day2`` ("YYYY-MM-DD")."""
    first = datetime.strptime(str(day1), "%Y-%m-%d")
    second = datetime.strptime(str(day2), "%Y-%m-%d")
    return (second - first).days


def get_day_of_week(day):
    """
    Return the weekday of a day number.

    :param day: number or string formatted as yyyymmdd
    :return: 1..7, meaning Monday..Sunday
    """
    return _parse_daynum(day).isoweekday()


def test_day_offset():
    """Print a few ``get_day_offset`` results for manual inspection."""
    print(get_day_offset(20240325, -7))
    print(get_day_offset(20240325, 7))
    print(get_day_offset(20240401, -7))
    print(get_day_offset(20240408, -7))


_WEEKDAY_NAMES = {1: '一', 2: '二', 3: '三', 4: '四', 5: '五', 6: '六', 7: '日'}


def weekday2str(day_of_week):
    """Return the Chinese numeral for weekday 1..7, or 'X' for anything else."""
    return _WEEKDAY_NAMES.get(day_of_week, 'X')


def get_today_str():
    """Return today's local date as the string yyyymmdd."""
    return datetime.now().strftime('%Y%m%d')


def get_today_YMD():
    """Return today's local date as the string YYYY-MM-DD."""
    return datetime.now().strftime('%Y-%m-%d')


def get_hhmm():
    """Return the current local time as the string HH:MM."""
    return datetime.now().strftime("%H:%M")


def get_weekday():
    """Return today's weekday as 1..7 (Monday..Sunday)."""
    return datetime.today().isoweekday()


def _parse_weekday_set(week_days_str):
    """Parse a comma separated weekday string such as '1,2,3' into a set of ints."""
    return {int(item) for item in week_days_str.split(',')}


def filter_last_week_days(filter_week_days_str):
    """
    Return the days among the 7 days before today whose weekday is selected.

    :param filter_week_days_str: comma separated weekdays, e.g. '1,2,3,4,5'
    :return: list of yyyymmdd integers, most recent first (yesterday .. 7 days ago)
    """
    wanted = _parse_weekday_set(filter_week_days_str)
    today_num = int(get_today_str())
    candidates = (get_day_offset(today_num, offset) for offset in range(-1, -8, -1))
    return [day for day in candidates if get_day_of_week(day) in wanted]


def filter_last_week_by_days(filter_week_days_str, day, type):
    """
    Return the days in the 7-day window ending at ``day`` (inclusive) whose
    weekday is selected and matches the requested kind of day.

    :param filter_week_days_str: comma separated weekdays, e.g. '1,2,3,4,5'
    :param day: last day of the window, yyyymmdd
    :param type: 'workday' keeps weekdays 1..5, 'weekend' keeps 6..7; any
        other value yields an empty list (the name shadows the builtin but
        is kept because it is part of the public signature)
    :return: list of yyyymmdd integers, most recent first
    """
    wanted = _parse_weekday_set(filter_week_days_str)
    valid_days = []
    for offset in range(0, -7, -1):
        last_day = get_day_offset(day, offset)
        week_day = get_day_of_week(last_day)
        if week_day not in wanted:
            continue
        if type == 'workday' and week_day <= 5:
            valid_days.append(last_day)
        if type == 'weekend' and week_day > 5:
            valid_days.append(last_day)
    return valid_days


# ---------------------------------------------------------------------------
# Traffic directions
# ---------------------------------------------------------------------------

_SRC_DIR_NAMES = {
    'N': '北', 'S': '南', 'E': '东', 'W': '西',
    'NE': '东北', 'NW': '西北', 'SE': '东南', 'SW': '西南',
}
_NAME_TO_SRC_DIR = {name: code for code, name in _SRC_DIR_NAMES.items()}


def srcDir_toStr(srcDir):
    """
    Convert a standard eight-way direction code to its Chinese name.

    :param srcDir: one of N/S/E/W/NE/NW/SE/SW
    :return: the Chinese name, or '未知' for an unrecognised code
    """
    return _SRC_DIR_NAMES.get(srcDir, '未知')


def dirname_to_srcDir(dirname: str):
    """
    Convert a Chinese direction name to the standard eight-way direction code.

    :param dirname: Chinese direction name
    :return: the code, or None for an unrecognised name
    """
    return _NAME_TO_SRC_DIR.get(dirname)


g_valid_srcDir_set = {'E', 'N', 'NE', 'NW', 'S', 'SE', 'SW', 'W'}
# Pairs of opposite traffic source directions, each written in sorted order.
g_reversed_srcDir_couples = {'E-W', 'N-S', 'NE-SW', 'NW-SE'}
g_reversed_srcDir_mapping = {'E': 'W', 'W': 'E', 'N': 'S', 'S': 'N',
                             'NE': 'SW', 'SW': 'NE', 'NW': 'SE', 'SE': 'NW'}


def _is_opposite_pair(src0, src1):
    """Return True when the two codes form one of the known opposite pairs."""
    return '-'.join(sorted([src0, src1])) in g_reversed_srcDir_couples


def is_crossed_srcDir(src0, src1):
    """
    Tell whether two directions cross, i.e. are neither equal nor opposite.

    :param src0: direction code
    :param src1: direction code
    :return: bool
    """
    if src0 == src1:
        return False
    return not _is_opposite_pair(src0, src1)


def is_reversed_srcDir(src0, src1):
    """
    Tell whether two directions are opposite to each other.

    :param src0: direction code
    :param src1: direction code
    :return: bool
    """
    if src0 == src1:
        return False
    return _is_opposite_pair(src0, src1)


def test_srcDir_judge():
    """Print crossed/reversed verdicts for every pair of valid directions."""
    for src0 in g_valid_srcDir_set:
        for src1 in g_valid_srcDir_set:
            retA = is_crossed_srcDir(src0, src1)
            retB = is_reversed_srcDir(src0, src1)
            print("%s %s == crossed:%s, reversed:%s" % (src0, src1, retA, retB))


_SRC_DIR_FLOWS = {
    'N': '北向南', 'S': '南向北', 'E': '东向西', 'W': '西向东',
    'NE': '东北向西南', 'NW': '西北向东南', 'SE': '东南向西北', 'SW': '西南向东北',
}


def srcDir_to_flowStr(srcDir):
    """Describe the flow (from-to) of a source direction; '未知' if unrecognised."""
    return _SRC_DIR_FLOWS.get(srcDir, '未知')


_SRC_DIR_AXES = {
    'N': '南-北', 'S': '南-北', 'E': '东-西', 'W': '东-西',
    'NE': '东北-西南', 'SW': '东北-西南', 'NW': '西北-东南', 'SE': '西北-东南',
}


def srcDir_toCarDirectionStr(srcDir):
    """
    Convert a single source direction to the description of its two-way axis.

    :param srcDir: direction code
    :return: axis description, or '未知' for an unrecognised code
    """
    return _SRC_DIR_AXES.get(srcDir, '未知')


_TURN_TYPE_NAMES = {0: '直行', 1: '左转', 2: '右转', 3: '掉头'}
_TURN_LETTER_NAMES = {'l': '左转', 's': '直行', 'r': '右转'}


def turn_type_toStr(turn_type):
    """Name a numeric turn type (0..3); '未知转向' for anything else."""
    return _TURN_TYPE_NAMES.get(turn_type, '未知转向')


def get_turn_type_toStr(turn_type):
    """Name a turn type letter ('l', 's' or 'r'); '未知转向' for anything else."""
    return _TURN_LETTER_NAMES.get(turn_type, '未知转向')


def direction_toStr(direction):
    """
    Describe a direction key.

    :param direction: text formatted as "<srcDir>|<numeric turn type>"
    :return: "<direction name>进口道-<turn name>"
    """
    parts = direction.split('|')
    return "%s进口道-%s" % (srcDir_toStr(parts[0]), turn_type_toStr(int(parts[1])))


# ---------------------------------------------------------------------------
# Road capacity and service level
# ---------------------------------------------------------------------------

_DEFAULT_FFS = {1: 120, 2: 80, 3: 70, 4: 60, 5: 50, 6: 90, 7: 70, 8: 60, 9: 50}


def get_default_ffs(rc):
    """Return the default value used as ``ffs`` for road class ``rc`` (30 if unknown)."""
    return _DEFAULT_FFS.get(rc, 30)


# (minimum speed, capacity) pairs, highest speed first.
_LANE_CAPACITY_BY_SPEED = (
    (120, 2000), (100, 1900), (80, 1800), (60, 1730),
    (50, 1690), (40, 1640), (30, 1550),
)


def cal_lane_capacity(v):
    """Return the single-lane capacity for speed ``v`` (1380 below 30)."""
    for min_speed, capacity in _LANE_CAPACITY_BY_SPEED:
        if v >= min_speed:
            return capacity
    return 1380


_LANE_RATES = {1: 1.0, 2: 0.9, 3: 0.8, 4: 0.7}


def cal_lane_rate(lane_num):
    """Return the per-lane reduction factor for ``lane_num`` lanes (0.6 otherwise)."""
    return _LANE_RATES.get(lane_num, 0.6)


def cal_default_capacity(rc, lane_num, ffs):
    """
    Return the default capacity: lane capacity * lane count * lane rate.

    :param rc: road class; not used by the computation, kept for callers
    :param lane_num: number of lanes
    :param ffs: speed passed to ``cal_lane_capacity``
    """
    return cal_lane_capacity(ffs) * lane_num * cal_lane_rate(lane_num)


def _grade_by_upper_bounds(value, bounds):
    """Return 'A'..'E' for the first bound that ``value`` does not exceed, else 'F'."""
    for bound, grade in zip(bounds, 'ABCDE'):
        if value <= bound:
            return grade
    return 'F'


def calc_service_level(delay_time):
    """Grade a delay as A..F (switched to the domestic standard on 2024-07-31)."""
    return _grade_by_upper_bounds(delay_time, (10, 20, 35, 55, 80))


def calc_service_level_artery(delay_time, i):
    """Grade an artery delay as A..F, with every threshold scaled by ``i``."""
    return _grade_by_upper_bounds(
        delay_time, (5 * i, 15 * i, 25 * i, 40 * i, 60 * i))


def jam_service_level_artery(jam_index):
    """Grade a jam index; the scale deliberately jumps from 'A' to 'D'."""
    if jam_index < 1.5:
        return 'A'
    if jam_index < 2:
        return 'D'
    if jam_index < 4:
        return 'E'
    return 'F'


# ---------------------------------------------------------------------------
# Predefined time periods (tp)
# ---------------------------------------------------------------------------

_TP_NAMES = {
    100: '早高峰', 200: '晚高峰', 300: '早晚高峰', 400: '平峰',
    500: '凌晨', 600: '夜间', 1000: '全天',
}

_TP_INTERVALS = {
    100: (('07:00', '09:00'),),
    200: (('17:00', '19:00'),),
    300: (('07:00', '09:00'), ('17:00', '19:00')),
    400: (('06:00', '07:00'), ('09:00', '17:00'), ('19:00', '22:00')),
    500: (('00:00', '06:00'),),
    600: (('22:00', '24:00'),),
    1000: (('00:00', '24:00'),),
}


def tp_to_str(tp):
    """Return the Chinese name of a predefined time period; '未知' if unknown."""
    return _TP_NAMES.get(tp, '未知')


def _predefined_intervals(tp):
    """Return fresh ``[start, end]`` lists for period ``tp`` ([] if unknown)."""
    # New lists on every call so callers may mutate the result safely.
    return [list(pair) for pair in _TP_INTERVALS.get(tp, ())]


def get_intervals_from_tp(tp):
    """
    Return the time intervals of a predefined period.

    :param tp: period number (100/200/300/400/500/600/1000)
    :return: [[start, end], ...] with "HH:MM" strings; [] for an unknown tp
    """
    return _predefined_intervals(tp)


def get_intervals_from_tp_in(tp):
    """
    Return the time intervals of a predefined period.

    :param tp: period number (100/200/300/400/500/600/1000)
    :return: [[start, end], ...] with "HH:MM" strings; [] for an unknown tp
    """
    return _predefined_intervals(tp)


def get_intervals_from_tp_slice(tp):
    """
    Return the time intervals of a predefined period.

    :param tp: period number (100/200/300/400/500/600/1000)
    :return: [[start, end], ...] with "HH:MM" strings; [] for an unknown tp
    """
    return _predefined_intervals(tp)


# ---------------------------------------------------------------------------
# Interval arithmetic
# ---------------------------------------------------------------------------

def has_intersection(one, other):
    """Tell whether two ``[start, end]`` intervals overlap (touching does not count)."""
    return other[0] < one[1] and other[1] > one[0]


def has_intersection_with_list(one, other_interval_list):
    """Tell whether ``one`` overlaps any interval of ``other_interval_list``."""
    return any(has_intersection(one, other) for other in other_interval_list)


def fall_in_intervals(hm, interval_list):
    """Tell whether ``hm`` lies in any interval, start exclusive and end inclusive."""
    return any(one[0] < hm <= one[1] for one in interval_list)


def if_now_fit_weektp(week_days, tp, tp_start, tp_end):
    """
    Tell whether the current moment falls into the given period on one of the
    given weekdays.

    :param week_days: weekday set formatted like '1,2,3,4,5'
    :param tp: 0 means custom; otherwise predefined 100/200/300/400/500/600/1000
    :param tp_start: period start, used when tp == 0
    :param tp_end: period end, used when tp == 0
    :return: bool
    """
    if get_weekday() not in _parse_weekday_set(week_days):
        return False
    now_hm = get_hhmm()
    if tp == 0:
        interval_list = [[tp_start, tp_end]]
    else:
        interval_list = get_intervals_from_tp(tp)
    return fall_in_intervals(now_hm, interval_list)


def calc_intersection(one, other):
    """
    Return the overlap of two intervals.

    :param one: [start, end]
    :param other: [start, end]
    :return: [start, end] of the overlap, or None when they do not overlap
    """
    if not has_intersection(one, other):
        return None
    return [max(one[0], other[0]), min(one[1], other[1])]


def calc_intersection_of_two_intervals(one_list, other_list):
    """
    Return all pairwise overlaps between two interval lists.

    :param one_list: list of [start, end]
    :param other_list: list of [start, end]
    :return: list of overlaps, or None when there is none
    """
    intersection_list = []
    for one in one_list:
        for other in other_list:
            inter = calc_intersection(one, other)
            if inter:
                intersection_list.append(inter)
    return intersection_list if intersection_list else None


def interval_list_to_str(interval_list):
    """Render intervals as 'start - end' items joined by ', '."""
    return ', '.join('%s - %s' % (one[0], one[1]) for one in interval_list)


# ---------------------------------------------------------------------------
# Miscellaneous helpers
# ---------------------------------------------------------------------------

def timer_decorator(func):
    """Decorator that logs (debug) and prints the wall-clock time of each call."""
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        msg = f"{func.__name__} time_spent {end_time - start_time:.6f} s"
        logging.debug(msg)
        print(msg)
        return result
    return wrapper


def make_common_res(status, msg):
    """Build a response dict with 'status' and 'msg'."""
    return {'status': status, 'msg': msg}


def make_res(status, msg, desc):
    """Build a response dict with 'status', 'msg' and 'desc'."""
    return {'status': status, 'msg': msg, 'desc': desc}


_ROAD_CLASS_NAMES = {
    1: '高速公路', 2: '国道', 3: '省道', 4: '县道', 5: '其他低等级道路',
    6: '城市快速路', 7: '城市主干道', 8: '城市次干道', 9: '城市支路', 10: '郊区小路',
}


def roadclass2str(rc):
    """Return the Chinese name of road class ``rc`` (converted with ``int``)."""
    return _ROAD_CLASS_NAMES.get(int(rc), '未知道路等级')


def comparable_rank(rc):
    """
    Convert a road class to a numerically comparable rank in 1..10, where a
    smaller number means a higher class.
    """
    if 1 <= rc <= 4:
        return rc * 2
    if 6 <= rc <= 9:
        return rc * 2 - 11
    if rc == 5:
        return 9
    return int(rc)


def make_tp_list(tp_start_list):
    """
    Turn ordered period start times into [start, end] pairs; each period ends
    where the next one starts and the last one ends at '24:00'.
    """
    last_index = len(tp_start_list) - 1
    return [
        [start, tp_start_list[i + 1] if i < last_index else '24:00']
        for i, start in enumerate(tp_start_list)
    ]


def make_map_from_list(info_list, key_name):
    """Index a list of dicts by ``info[key_name]``; later duplicates win."""
    return {info[key_name]: info for info in info_list}


def parse_int_list(num_list_str):
    """Parse a comma separated string into a list of ints."""
    return [int(item) for item in num_list_str.split(',')]


def parse_float_list(num_list_str):
    """Parse a comma separated string into a list of floats."""
    return [float(item) for item in num_list_str.split(',')]


def parse_phase_name(phase_name):
    """
    Split a phase description into its direction part and turn part.

    :return: (everything but the last two characters, the last two characters)
    """
    split_at = len(phase_name) - 2
    return phase_name[0:split_at], phase_name[split_at:]


def list_to_str(item_list, spliter):
    """Join the ``str()`` of every item with ``spliter``; '' for None or empty."""
    if not item_list:
        return ''
    return spliter.join(str(item) for item in item_list)


def time_str_to_seconds(time_str):
    """Convert "HH:MM" to seconds since midnight."""
    time_obj = datetime.strptime(time_str, '%H:%M')
    return time_obj.hour * 3600 + time_obj.minute * 60


def time_str_to_minutes(time_str):
    """Convert "HH:MM" to minutes since midnight."""
    time_obj = datetime.strptime(time_str, '%H:%M')
    return time_obj.hour * 60 + time_obj.minute


def allowed_file(filename):
    """Tell whether ``filename`` has an xlsx or xls extension."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in {'xlsx', 'xls'}


def allowed_file_suffix(filename, suffix):
    """Tell whether the lower-cased extension of ``filename`` is ``in suffix``."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in suffix


# ---------------------------------------------------------------------------
# Excel import
# ---------------------------------------------------------------------------

# Per sheet: expected header texts, the output key of each column and the
# regular expression (applied with re.match) that each cell must satisfy.
t_sheet_cols = {
    '调度计划表': {
        'head': ['序号', '调度类型', '月份', '日期', '星期', '日计划号'],
        'column': ['priority', 'type', 'month', 'day', 'weekday', 'scheduleid'],
        # The second pattern used to be r'^\d+|$', which parses as
        # (^\d+)|($) and therefore accepted values such as '12abc'; it is now
        # anchored like the other optional numeric columns.
        'rex': [r'^\d+$', r'^(\d+|)$', r'^([0-9]|1[0-2]|)$', r'^(?:[0-9]|[12][0-9]|3[01]|)$', r'^([0-7]|)$',
                r'^\d+$'],
    },
    '日计划表': {
        'head': ['序号', '日计划号', '执行时段', '配时方案号', '控制模式'],
        'column': ['eg', 'scheduleid', 'tp_start', 'planid', 'control_mode'],
        'rex': [r'^\d+$', r'^\d+$', r'^[0-2]\d:[0-5]\d$', r'^\d+$', r'.'],
    },
    '配时方案表': {
        'head': ['序号', '方案号', '方案名称', '周期长', '阶段序号', '阶段号', '阶段时间', '协调相位号', '协调相位差'],
        'column': ['eg', 'planid', 'name', 'cycle', 'stage_seq', 'stageid', 'stage_duration', 'coord_phaseid',
                   'offset'],
        'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$', r'^\d+$', r'^\d+$', r'^(\d+|)$', r'^(\d+|)$'],
    },
    '阶段表': {
        'head': ['序号', '阶段号', '阶段名称', '黄灯时间', '全红时间', '关联相位'],
        'column': ['eg', 'stageid', 'name', 'yellow', 'allred', 'phases'],
        'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$', r'^(\d+,)*\d+$'],
    },
    '相位表': {
        'head': ['序号', '相位号', '相位名称', '最小绿', '最大绿'],
        'column': ['eg', 'phaseid', 'name', 'min_green', 'max_green'],
        'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$'],
    }
}


def read_and_filter_excel(file, sheet_names, is_new=False):
    """
    Read and validate the requested sheets of an Excel workbook.

    :param file: anything accepted by ``pandas.ExcelFile``
    :param sheet_names: names of the sheets that must be present and are read
    :param is_new: validate against ``t_sheet_cols_new`` instead of
        ``t_sheet_cols``
    :return: ``(data, error)``. On success ``data`` maps each sheet name to a
        list of row dicts keyed by the sheet's 'column' names and ``error`` is
        ''. On the first validation failure ``data`` is ``{}`` and ``error``
        is a user-facing message.
    """
    sheet_cols = t_sheet_cols_new if is_new else t_sheet_cols
    # The context manager releases the workbook handle on every return path.
    with pd.ExcelFile(file) as xl:
        if not all(name in sheet_cols for name in xl.sheet_names):
            return {}, '存在不可用sheet,请检查后重新上传'
        if not all(name in xl.sheet_names for name in sheet_names):
            return {}, 'sheet不匹配,请检查后重新上传'

        result = {}
        for sheet_name in sheet_names:
            spec = sheet_cols[sheet_name]
            heads = spec['head']
            columns = spec['column']
            patterns = [re.compile(pattern) for pattern in spec['rex']]

            # Everything is read as text; empty cells stay '' instead of NaN.
            df = pd.read_excel(xl, sheet_name, dtype=str, na_filter=False)
            rows = result[sheet_name] = []

            if len(df.columns) < len(heads):
                return {}, '表头不匹配,请检查后重新上传'
            for col_index, column_name in enumerate(heads):
                if column_name != df.columns[col_index]:
                    return {}, '表头错误,请检查后重新上传'

            has_schedule_name = sheet_name == "调度计划表" and '日计划名称' in df.columns
            for index, row in df.iterrows():
                line = {}
                cleaned_row = row.apply(lambda x: x.strip() if isinstance(x, str) else x)
                for i in range(len(heads)):
                    cell_text = str(cleaned_row.iloc[i])
                    # "index + 2": one for the header row, one for 1-based rows.
                    if not patterns[i].match(cell_text):
                        return {}, f'{sheet_name} {df.columns[i]}列 第{index + 2}行 格式错误'
                    if len(cell_text) > 100:
                        return {}, f'{sheet_name} {df.columns[i]}列 第{index + 2}行 长度超过100个字符'
                    line[columns[i]] = cleaned_row.iloc[i]
                if has_schedule_name:
                    # NOTE(review): read by position — assumes the optional
                    # name column directly follows the mandatory ones; confirm.
                    line['schedule_name'] = cleaned_row.iloc[len(heads)]
                rows.append(line)
            if len(rows) == 0:
                return {}, '数据不能为空'
        return result, ''


# Keyword blacklist for request parameters. 'delclare' is the historical
# spelling and is kept for compatibility; 'declare' is the word it was
# evidently meant to block.
_REQUEST_BLACKLIST = re.compile(
    r"\b(and|like|exec|insert|select|drop|grant|alter|delete|update|count|chr|mid|master|truncate|char|delclare"
    r"|declare|or)\b|(\*|;)")


def check_request(value):
    """
    Check a request parameter against a blacklist of SQL keywords and symbols.

    NOTE: a keyword blacklist is only a coarse filter and does not replace
    parameterized queries.

    :param value: any value; it is converted with ``str`` and lower-cased
    :return: None when the value is acceptable, otherwise an ``Exception``
        instance describing the problem (returned, never raised)
    """
    try:
        text = str(value).lower()
        if _REQUEST_BLACKLIST.search(text):
            return Exception("request params is wrong")
        return None
    except Exception as error:  # contract: report every failure as a value
        return error


def get_time_stamp_list_by_duration(timestamp, duration):
    """
    Return timestamps stepping back from ``timestamp`` in 5-minute steps.

    :param timestamp: the most recent Unix timestamp
    :param duration: 1 -> 3 samples (15 min), 2 -> 12 samples (1 h),
        3 -> 24 samples (2 h); anything else -> no samples
    :return: list of timestamps, most recent first
    """
    batch_size = {1: 3, 2: 12, 3: 24}.get(duration, 0)
    return [timestamp - 300 * i for i in range(batch_size)]


def get_days_from_week_daytype(week, day, daytype):
    """
    Return the usable days for a day type.

    :param week: comma separated weekdays, e.g. '1,2,3,4,5'
    :param day: reference day, yyyymmdd
    :param daytype: 'today' / 'hisday' return ``[day]`` when its weekday is
        selected; 'workdayofweek' / 'weekendofweek' return the matching days
        of the 7-day window ending at ``day``; anything else returns []
    :return: list of days
    """
    if daytype in ('today', 'hisday'):
        weeks = week.split(',')
        return [day] if str(get_day_of_week(day)) in weeks else []
    if daytype == 'workdayofweek':
        return filter_last_week_by_days(week, day, 'workday')
    if daytype == 'weekendofweek':
        return filter_last_week_by_days(week, day, 'weekend')
    return []


def get_time_str_slice_by_dur(start, end):
    """
    Cut the range ``start``..``end`` into consecutive one-hour slices.

    A slice never runs past midnight: its end is clamped to '24:00'. The last
    slice is not clamped to ``end`` otherwise.

    :param start: "HH:MM"
    :param end: "HH:MM" ('24:00' is allowed)
    :return: list of "HH:MM-HH:MM" strings
    """
    result = []
    while start < end:
        parsed = datetime.strptime(start, '%H:%M')
        next_minutes = parsed.hour * 60 + parsed.minute + 60
        if next_minutes >= 24 * 60:
            # Clamp at midnight. Wrapping to e.g. '00:30' would compare below
            # `end` again and the loop would never terminate.
            new_time_str = '24:00'
        else:
            new_time_str = '%02d:%02d' % divmod(next_minutes, 60)
        result.append("{}-{}".format(start, new_time_str))
        start = new_time_str
    return result


# ---------------------------------------------------------------------------
# Green-wave helpers
# ---------------------------------------------------------------------------

def _is_auto_scheduled(green_wave):
    """Tell whether a green wave is not manual and has weekdays configured."""
    return green_wave['launch_mode'] != 'manual' and green_wave['week_days'] != ''


def _custom_tp_end(green_wave):
    """Return the custom period end, spelling midnight as '24:00'."""
    tp_end = green_wave['tp_end']
    return '24:00' if tp_end == '00:00' else tp_end


def get_green_waves_tp(green_waves_info):
    """
    Collect the distinct one-hour slices covered by the scheduled green waves.

    :param green_waves_info: list of green-wave dicts (reads 'launch_mode',
        'week_days', 'tp', 'tp_start', 'tp_end')
    :return: list of distinct "HH:MM-HH:MM" strings, in no particular order
    """
    tps = []
    for item_green_wave in green_waves_info:
        if not _is_auto_scheduled(item_green_wave):
            continue
        if item_green_wave['tp'] > 0:
            # Predefined period: slice each of its intervals.
            for sublist in get_intervals_from_tp_in(item_green_wave['tp']):
                tps += [item for item in get_time_str_slice_by_dur(sublist[0], sublist[1]) if item]
        else:
            # Custom period.
            slices = get_time_str_slice_by_dur(item_green_wave['tp_start'], _custom_tp_end(item_green_wave))
            tps += [item for item in slices if item]
    return list(set(tps))


def get_green_waves_tp_speed(green_waves_info):
    """
    Map each scheduled green wave to its periods.

    :param green_waves_info: list of green-wave dicts (reads 'launch_mode',
        'week_days', 'tp', 'tp_start', 'tp_end', 'waveid')
    :return: nested defaultdict ``result[waveid]["start-end"] = green wave``
    """
    result = defaultdict(lambda: defaultdict(list))
    for item_green_wave in green_waves_info:
        if not _is_auto_scheduled(item_green_wave):
            continue
        if item_green_wave['tp'] > 0:
            for sublist in get_intervals_from_tp_in(item_green_wave['tp']):
                result[item_green_wave['waveid']]['-'.join(sublist)] = item_green_wave
        else:
            key = item_green_wave['tp_start'] + '-' + _custom_tp_end(item_green_wave)
            result[item_green_wave['waveid']][key] = item_green_wave
    return result


def get_green_waves_road(wave_info, cross_info):
    """
    Map the in-road ids of a green wave to their coordination speeds.

    :param wave_info: green-wave dict (reads 'type', 'speed_list',
        'speed_list_reversed', 'start_cross_seq', 'cross_num'); type 0 and 2
        use the forward list, type 1 and 2 the reversed list
    :param cross_info: list of crossing dicts (reads 'cross_seq', 'inroadid',
        'inroadid_reversed')
    :return: dict of road id -> speed (float)
    """
    road_list = {}
    # Forward direction: crossings strictly after the start crossing.
    if wave_info['type'] == 0 or wave_info['type'] == 2:
        i = 0
        speeds = wave_info['speed_list'].split(',')
        for item_cross in cross_info:
            if item_cross['cross_seq'] <= wave_info['start_cross_seq']:
                continue
            i += 1
            if i < wave_info['cross_num']:
                road_list[item_cross['inroadid']] = float(speeds[i - 1])
    # Reversed direction: the start crossing itself is included, and a missing
    # speed list means speed 0.
    if wave_info['type'] == 1 or wave_info['type'] == 2:
        i = 0
        speeds = wave_info['speed_list_reversed'].split(',') if wave_info['speed_list_reversed'] != '' else []
        for item_cross in cross_info:
            if item_cross['cross_seq'] < wave_info['start_cross_seq']:
                continue
            i += 1
            if i < wave_info['cross_num']:
                speed = speeds[i - 1] if len(speeds) > 0 else 0
                road_list[item_cross['inroadid_reversed']] = float(speed)
    return road_list


def get_green_waves_tp_diff(green_tps_slice, exist_tp_slice):
    """Return the sorted items of ``exist_tp_slice`` absent from ``green_tps_slice``."""
    return sorted(item for item in exist_tp_slice if item not in green_tps_slice)


def get_green_waves_tp_intersect(tp_slice_1, tp_slice_2):
    """Return the sorted items of ``tp_slice_1`` that also occur in ``tp_slice_2``."""
    return sorted(item for item in tp_slice_1 if item in tp_slice_2)


def get_green_waves_tp_continuous(tp_slice):
    """
    Merge adjacent "start-end" slices into continuous periods.

    A slice whose start equals the end of the period being built extends that
    period; any other slice starts a new one.

    :param tp_slice: ordered list of "HH:MM-HH:MM" strings
    :return: list of merged "HH:MM-HH:MM" strings
    """
    merged = {}  # used as an insertion-ordered set of periods
    current = ''
    for time_dur in tp_slice:
        if not merged:
            current = time_dur
            merged[time_dur] = 1
            continue
        this_parts = time_dur.split('-')
        prev_parts = current.split('-')
        if this_parts[0] == prev_parts[1]:
            del merged[current]
            current = prev_parts[0] + "-" + this_parts[1]
        else:
            current = time_dur
        merged[current] = 1
    return list(merged)


def get_used_week_days(daytype, day):
    """
    Return the days to use for a day type, with the weekday filter derived
    from it: Monday-Friday by default, Saturday-Sunday for 'weekendofweek' and
    the weekday of ``day`` itself for 'hisday'.
    """
    weeks = '1,2,3,4,5'
    if daytype == 'weekendofweek':
        weeks = '6,7'
    if daytype == 'hisday':
        weeks = str(get_day_of_week(day))
    return get_days_from_week_daytype(weeks, day, daytype)


def _day_time_to_stamp(day_str, hhmm):
    """Return the integer Unix timestamp of ``hhmm`` on day ``yyyymmdd``."""
    ymd = f"{day_str[:4]}-{day_str[4:6]}-{day_str[6:]}"
    return int(str2timestamp("{} {}:00".format(ymd, hhmm)))


def get_time_stamp_list_by_days_tp(days, tp, dur):
    """
    List timestamps every ``dur`` minutes over a predefined period on each day.

    :param days: iterable of yyyymmdd days
    :param tp: predefined period number
    :param dur: step in minutes, must be positive
    :return: list of integer timestamps; both ends of every interval included
    :raises ValueError: if ``dur`` is not positive while there is something
        to enumerate (the loop could never finish)
    """
    default_start_end = []
    tp_list = get_intervals_from_tp_in(tp)
    for item_day in days:
        day_str = str(item_day)
        for item_tp in tp_list:
            start_stamp = _day_time_to_stamp(day_str, item_tp[0])
            if item_tp[1] == '24:00':
                # '24:00' is midnight of the following day.
                end_stamp = _day_time_to_stamp(str(get_day_offset(item_day, 1)), '00:00')
            else:
                end_stamp = _day_time_to_stamp(day_str, item_tp[1])
            if dur <= 0:
                raise ValueError("dur must be a positive number of minutes")
            while start_stamp <= end_stamp:
                default_start_end.append(start_stamp)
                start_stamp += dur * 60
    return default_start_end


def _slots_from_current_hour(count, step_minutes):
    """Return ``count`` "HH:MM" strings from the current hour, ``step_minutes`` apart."""
    moment = datetime.now().replace(minute=0, second=0, microsecond=0)
    slots = []
    for _ in range(count):
        slots.append(moment.strftime('%H:%M'))
        moment += timedelta(minutes=step_minutes)
    return slots


def get_half_hour_24_str():
    """Return 48 half-hourly "HH:MM" strings starting at the current hour."""
    return _slots_from_current_hour(48, 30)


def get_hour_24_str():
    """Return 24 hourly "HH:MM" strings starting at the current hour."""
    return _slots_from_current_hour(24, 60)


def get_green_waves_week_mixed(green_weeks, week_day):
    """Return the items of ``week_day`` equal to an item of ``green_weeks``,
    ordered by ``green_weeks`` and repeated once per matching pair."""
    return [week for item_week_day in green_weeks for week in week_day if item_week_day == week]


def get_green_waves_cross_list(start_seq, cross_num, artery_coss_list):
    """
    Pick the crossings of a green wave from the artery's crossing list.

    :param start_seq: 1-based sequence number of the first crossing
    :param cross_num: number of crossings to take
    :param artery_coss_list: crossings of the artery, in order
    :return: list of ``cross_num`` crossings
    :raises IndexError: if the range runs past the end of the list
    """
    start_index = start_seq - 1
    return [artery_coss_list[start_index + i] for i in range(cross_num)]


def tplist_2_tpinterval(tp_list):
    """
    Turn period start times into ``{'tp_start', 'tp_end'}`` dicts; each period
    ends where the next one starts and the last one ends at '23:59'. A start
    time of '23:59' after the first entry is skipped.
    """
    if len(tp_list) == 0:
        return []
    result = [{'tp_start': tp_list[0], 'tp_end': '23:59'}]
    for tp_start in tp_list[1:]:
        if tp_start == '23:59':
            continue
        result[-1]['tp_end'] = tp_start
        result.append({'tp_start': tp_start, 'tp_end': '23:59'})
    return result


def time_intersection_minutes(time1: str, time2: str) -> int:
    """Return the number of minutes shared by two "HH:MM-HH:MM" periods."""

    def to_minutes(t: str) -> int:
        h, m = map(int, t.split(':'))
        return h * 60 + m

    start1, end1 = map(to_minutes, time1.split('-'))
    start2, end2 = map(to_minutes, time2.split('-'))

    intersection_start = max(start1, start2)
    intersection_end = min(end1, end2)
    # No overlap (or merely touching) counts as zero minutes.
    if intersection_start >= intersection_end:
        return 0
    return intersection_end - intersection_start