1196 lines
34 KiB
Python
1196 lines
34 KiB
Python
import time
|
||
import logging
|
||
from collections import defaultdict
|
||
from datetime import datetime, timedelta
|
||
import pandas as pd
|
||
import re
|
||
|
||
from tool.excel import t_sheet_cols_new
|
||
|
||
|
||
def save_str2file(content, filename):
|
||
f = open(filename, 'w', encoding='utf-8')
|
||
f.write(content)
|
||
f.close()
|
||
|
||
|
||
def load_from_file(filename):
|
||
f = open(filename, 'r', encoding='utf-8')
|
||
content = f.read()
|
||
f.close()
|
||
return content
|
||
|
||
|
||
def str2timestamp(time_string):
|
||
"""
|
||
输入时间字符串,格式为 "YYYY-MM-DD HH:MM:SS"
|
||
time_string = "2024-03-09 14:30:00"
|
||
:param time_string:
|
||
:return:
|
||
"""
|
||
# 解析这个时间字符串
|
||
date_object = datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S")
|
||
# 将解析后的时间对象转换为Unix时间戳(以秒为单位)
|
||
timestamp = date_object.timestamp()
|
||
# # 输出Unix时间戳
|
||
# print(timestamp)
|
||
return timestamp
|
||
|
||
|
||
def timestamp2str(timestamp):
|
||
# 将Unix时间戳转换为struct_time对象(本地时间)
|
||
local_time_tuple = time.localtime(timestamp)
|
||
# 格式化为本地时间字符串
|
||
local_time_str = time.strftime('%Y-%m-%d %H:%M:%S', local_time_tuple)
|
||
return local_time_str
|
||
|
||
|
||
def timestamp2HMS(timestamp):
|
||
# 将Unix时间戳转换为struct_time对象(本地时间)
|
||
local_time_tuple = time.localtime(timestamp)
|
||
# 格式化为本地时间字符串
|
||
local_time_str = time.strftime('%H:%M:%S', local_time_tuple)
|
||
return local_time_str
|
||
|
||
|
||
def timestamp2int(timestamp):
|
||
return int(time.strftime('%Y%m%d', time.localtime(timestamp)))
|
||
|
||
|
||
def daynum_to_timestamp(day):
|
||
"""
|
||
将日期数字转化内Unix时间戳
|
||
:param day: yyyymmdd 格式的数字或字符串
|
||
:return:
|
||
"""
|
||
# 解析这个时间字符串
|
||
date_object = datetime.strptime(str(day), "%Y%m%d")
|
||
# 将解析后的时间对象转换为Unix时间戳(以秒为单位)
|
||
timestamp = date_object.timestamp()
|
||
return timestamp
|
||
|
||
|
||
def get_day_offset(day, offset):
|
||
"""
|
||
计算前几天或后几天的日期数字
|
||
:param day: yyyymmdd 格式的数字或字符串
|
||
:param offset: 天的偏移个数,正数表示后面的日期,负数表示前面的日期
|
||
:return:
|
||
"""
|
||
timestamp = daynum_to_timestamp(day)
|
||
other_timestamp = timestamp + 86400 * offset
|
||
# 将Unix时间戳转换为struct_time对象(本地时间)
|
||
other_time_tuple = time.localtime(other_timestamp)
|
||
# 格式化为本地时间字符串
|
||
other_day = time.strftime('%Y%m%d', other_time_tuple)
|
||
return int(other_day)
|
||
|
||
|
||
def calc_day_diff(day0, day1):
|
||
timestamp0 = daynum_to_timestamp(day0)
|
||
timestamp1 = daynum_to_timestamp(day1)
|
||
return int((timestamp1 - timestamp0) / 86400)
|
||
|
||
|
||
def calc_day_diff_YMD(day1, day2):
|
||
timestamp1 = datetime.strptime(str(day1), "%Y-%m-%d").timestamp()
|
||
timestamp2 = datetime.strptime(str(day2), "%Y-%m-%d").timestamp()
|
||
return int((timestamp2 - timestamp1) / 86400)
|
||
|
||
|
||
def get_day_of_week(day):
|
||
"""
|
||
计算指定日期是星期几,返回1~7,表示星期一到星期日
|
||
:param day:
|
||
:return:
|
||
"""
|
||
timestamp = daynum_to_timestamp(day)
|
||
# 将Unix时间戳转换为struct_time对象(本地时间)
|
||
other_time_tuple = time.localtime(timestamp)
|
||
return other_time_tuple.tm_wday + 1
|
||
|
||
|
||
def test_day_offset():
|
||
print(get_day_offset(20240325, -7))
|
||
print(get_day_offset(20240325, 7))
|
||
print(get_day_offset(20240401, -7))
|
||
print(get_day_offset(20240408, -7))
|
||
|
||
|
||
def weekday2str(day_of_week):
|
||
if day_of_week == 1:
|
||
return '一'
|
||
elif day_of_week == 2:
|
||
return '二'
|
||
elif day_of_week == 3:
|
||
return '三'
|
||
elif day_of_week == 4:
|
||
return '四'
|
||
elif day_of_week == 5:
|
||
return '五'
|
||
elif day_of_week == 6:
|
||
return '六'
|
||
elif day_of_week == 7:
|
||
return '日'
|
||
else:
|
||
return 'X'
|
||
|
||
|
||
def get_today_str():
|
||
# 获取当前时间戳
|
||
current_timestamp = int(time.time())
|
||
# 将Unix时间戳转换为struct_time对象(本地时间)
|
||
local_time_tuple = time.localtime(current_timestamp)
|
||
# 格式化为本地时间字符串
|
||
local_date_str = time.strftime('%Y%m%d', local_time_tuple)
|
||
return local_date_str
|
||
|
||
|
||
def get_today_YMD():
|
||
# 将Unix时间戳转换为struct_time对象(本地时间)
|
||
local_time_tuple = time.localtime(int(time.time()))
|
||
# 格式化为本地时间字符串
|
||
local_date_str = time.strftime('%Y-%m-%d', local_time_tuple)
|
||
return local_date_str
|
||
|
||
|
||
def get_hhmm():
|
||
# 获取当前时刻的小时分钟字符串
|
||
now = datetime.now()
|
||
# 转换为字符串格式
|
||
hhmm_str = now.strftime("%H:%M")
|
||
return hhmm_str
|
||
|
||
|
||
def get_weekday():
|
||
# 获取当前日期
|
||
today = datetime.today().date()
|
||
# 获取星期几的整数表示(0=星期一, 1=星期二, ..., 6=星期日)
|
||
weekday_number = today.weekday()
|
||
# 转化为1~7
|
||
weekday = weekday_number + 1
|
||
return weekday
|
||
|
||
|
||
def filter_last_week_days(filter_week_days_str):
|
||
filter_day_set = set()
|
||
ss = filter_week_days_str.split(',')
|
||
for item in ss:
|
||
filter_day_set.add(int(item))
|
||
today_num = int(get_today_str())
|
||
valid_days = []
|
||
for offset in range(-1, -8, -1):
|
||
day = get_day_offset(today_num, offset)
|
||
week_day = get_day_of_week(day)
|
||
if week_day in filter_day_set:
|
||
valid_days.append(day)
|
||
return valid_days
|
||
|
||
|
||
def filter_last_week_by_days(filter_week_days_str, day, type):
|
||
filter_day_set = set()
|
||
ss = filter_week_days_str.split(',')
|
||
for item in ss:
|
||
filter_day_set.add(int(item))
|
||
valid_days = []
|
||
for offset in range(0, -7, -1):
|
||
last_day = get_day_offset(day, offset)
|
||
week_day = get_day_of_week(last_day)
|
||
|
||
if week_day in filter_day_set:
|
||
if type == 'workday' and int(week_day) <= 5:
|
||
valid_days.append(last_day)
|
||
if type == 'weekend' and int(week_day) > 5:
|
||
valid_days.append(last_day)
|
||
return valid_days
|
||
|
||
|
||
def srcDir_toStr(srcDir):
|
||
"""
|
||
将标准八方向转化为汉字描述
|
||
:param srcDir:
|
||
:return:
|
||
"""
|
||
if srcDir == 'N':
|
||
return '北'
|
||
elif srcDir == 'S':
|
||
return '南'
|
||
elif srcDir == 'E':
|
||
return '东'
|
||
elif srcDir == 'W':
|
||
return '西'
|
||
elif srcDir == 'NE':
|
||
return '东北'
|
||
elif srcDir == 'NW':
|
||
return '西北'
|
||
elif srcDir == 'SE':
|
||
return '东南'
|
||
elif srcDir == 'SW':
|
||
return '西南'
|
||
else:
|
||
return '未知'
|
||
|
||
|
||
def dirname_to_srcDir(dirname: str):
|
||
"""
|
||
将汉字描述转化为标准八方向
|
||
:param dirname:
|
||
:return:
|
||
"""
|
||
if dirname == '北':
|
||
return 'N'
|
||
elif dirname == '南':
|
||
return 'S'
|
||
elif dirname == '东':
|
||
return 'E'
|
||
elif dirname == '西':
|
||
return 'W'
|
||
elif dirname == '东北':
|
||
return 'NE'
|
||
elif dirname == '西北':
|
||
return 'NW'
|
||
elif dirname == '东南':
|
||
return 'SE'
|
||
elif dirname == '西南':
|
||
return 'SW'
|
||
else:
|
||
return None
|
||
|
||
|
||
g_valid_srcDir_set = {'E', 'N', 'NE', 'NW', 'S', 'SE', 'SW', 'W'}
|
||
# 相反的车流来向pair
|
||
g_reversed_srcDir_couples = {'E-W', 'N-S', 'NE-SW', 'NW-SE'}
|
||
|
||
g_reversed_srcDir_mapping = {'E': 'W', 'W': 'E', 'N': 'S', 'S': 'N', 'NE': 'SW', 'SW': 'NE', 'NW': 'SE', 'SE': 'NW'}
|
||
|
||
|
||
def is_crossed_srcDir(src0, src1):
|
||
"""
|
||
判断两个方向是否为相交方向,也即:排除掉:相同和相反的情形
|
||
:param src0:
|
||
:param src1:
|
||
:return:
|
||
"""
|
||
if src0 == src1:
|
||
return False
|
||
global g_reversed_srcDir_couples
|
||
tmp_list = [src0, src1]
|
||
tmp_list = sorted(tmp_list)
|
||
key = tmp_list[0] + '-' + tmp_list[1]
|
||
if key in g_reversed_srcDir_couples:
|
||
return False
|
||
else:
|
||
return True
|
||
|
||
|
||
def is_reversed_srcDir(src0, src1):
|
||
"""
|
||
判断两个方向是否为相反方向
|
||
:param src0:
|
||
:param src1:
|
||
:return:
|
||
"""
|
||
if src0 == src1:
|
||
return False
|
||
global g_reversed_srcDir_couples
|
||
tmp_list = [src0, src1]
|
||
tmp_list = sorted(tmp_list)
|
||
key = tmp_list[0] + '-' + tmp_list[1]
|
||
if key in g_reversed_srcDir_couples:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
|
||
def test_srcDir_judge():
|
||
global g_valid_srcDir_set
|
||
for src0 in g_valid_srcDir_set:
|
||
for src1 in g_valid_srcDir_set:
|
||
retA = is_crossed_srcDir(src0, src1)
|
||
retB = is_reversed_srcDir(src0, src1)
|
||
print("%s %s == crossed:%s, reversed:%s" % (src0, src1, retA, retB))
|
||
|
||
|
||
def srcDir_to_flowStr(srcDir):
|
||
if srcDir == 'N':
|
||
return '北向南'
|
||
elif srcDir == 'S':
|
||
return '南向北'
|
||
elif srcDir == 'E':
|
||
return '东向西'
|
||
elif srcDir == 'W':
|
||
return '西向东'
|
||
elif srcDir == 'NE':
|
||
return '东北向西南'
|
||
elif srcDir == 'NW':
|
||
return '西北向东南'
|
||
elif srcDir == 'SE':
|
||
return '东南向西北'
|
||
elif srcDir == 'SW':
|
||
return '西南向东北'
|
||
else:
|
||
return '未知'
|
||
|
||
|
||
def srcDir_toCarDirectionStr(srcDir):
|
||
"""
|
||
将单个车流来向,转化为平行双向车流的方向描述
|
||
:param srcDir:
|
||
:return:
|
||
"""
|
||
if srcDir == 'N':
|
||
return '南-北'
|
||
elif srcDir == 'S':
|
||
return '南-北'
|
||
elif srcDir == 'E':
|
||
return '东-西'
|
||
elif srcDir == 'W':
|
||
return '东-西'
|
||
elif srcDir == 'NE':
|
||
return '东北-西南'
|
||
elif srcDir == 'NW':
|
||
return '西北-东南'
|
||
elif srcDir == 'SE':
|
||
return '西北-东南'
|
||
elif srcDir == 'SW':
|
||
return '东北-西南'
|
||
else:
|
||
return '未知'
|
||
|
||
|
||
def turn_type_toStr(turn_type):
|
||
if turn_type == 0:
|
||
return '直行'
|
||
elif turn_type == 1:
|
||
return '左转'
|
||
elif turn_type == 2:
|
||
return '右转'
|
||
elif turn_type == 3:
|
||
return '掉头'
|
||
else:
|
||
return '未知转向'
|
||
|
||
|
||
def get_turn_type_toStr(turn_type):
|
||
if turn_type == 'l':
|
||
return '左转'
|
||
elif turn_type == 's':
|
||
return '直行'
|
||
elif turn_type == 'r':
|
||
return '右转'
|
||
else:
|
||
return '未知转向'
|
||
|
||
|
||
def direction_toStr(direction):
|
||
ss = direction.split('|')
|
||
srcDir = ss[0]
|
||
turn_type = int(ss[1])
|
||
return "%s进口道-%s" % (srcDir_toStr(srcDir), turn_type_toStr(turn_type))
|
||
|
||
|
||
def get_default_ffs(rc):
|
||
if rc == 1:
|
||
return 120
|
||
elif rc == 2:
|
||
return 80
|
||
elif rc == 3:
|
||
return 70
|
||
elif rc == 4:
|
||
return 60
|
||
elif rc == 5:
|
||
return 50
|
||
elif rc == 6:
|
||
return 90
|
||
elif rc == 7:
|
||
return 70
|
||
elif rc == 8:
|
||
return 60
|
||
elif rc == 9:
|
||
return 50
|
||
else:
|
||
return 30
|
||
|
||
|
||
def cal_lane_capacity(v):
|
||
if v >= 120:
|
||
return 2000
|
||
elif v >= 100:
|
||
return 1900
|
||
elif v >= 80:
|
||
return 1800
|
||
elif v >= 60:
|
||
return 1730
|
||
elif v >= 50:
|
||
return 1690
|
||
elif v >= 40:
|
||
return 1640
|
||
elif v >= 30:
|
||
return 1550
|
||
else:
|
||
return 1380
|
||
|
||
|
||
def cal_lane_rate(lane_num):
|
||
if lane_num == 1:
|
||
return 1.0
|
||
elif lane_num == 2:
|
||
return 0.9
|
||
elif lane_num == 3:
|
||
return 0.8
|
||
elif lane_num == 4:
|
||
return 0.7
|
||
else:
|
||
return 0.6
|
||
|
||
|
||
def cal_default_capacity(rc, lane_num, ffs):
|
||
lane_capacity = cal_lane_capacity(ffs)
|
||
lane_rate = cal_lane_rate(lane_num)
|
||
return lane_capacity * lane_num * lane_rate
|
||
|
||
|
||
def calc_service_level(delay_time):
|
||
"""从2024-07-31改为国内标准"""
|
||
if delay_time <= 10:
|
||
return 'A'
|
||
elif delay_time <= 20:
|
||
return 'B'
|
||
elif delay_time <= 35:
|
||
return 'C'
|
||
elif delay_time <= 55:
|
||
return 'D'
|
||
elif delay_time <= 80:
|
||
return 'E'
|
||
else:
|
||
return 'F'
|
||
|
||
|
||
def calc_service_level_artery(delay_time, i):
|
||
if delay_time <= 5 * i:
|
||
return 'A'
|
||
elif delay_time <= 15 * i:
|
||
return 'B'
|
||
elif delay_time <= 25 * i:
|
||
return 'C'
|
||
elif delay_time <= 40 * i:
|
||
return 'D'
|
||
elif delay_time <= 60 * i:
|
||
return 'E'
|
||
else:
|
||
return 'F'
|
||
|
||
|
||
def jam_service_level_artery(jam_index):
|
||
if jam_index < 1.5:
|
||
return 'A'
|
||
elif jam_index < 2:
|
||
return 'D'
|
||
elif jam_index < 4:
|
||
return 'E'
|
||
else:
|
||
return 'F'
|
||
|
||
|
||
def tp_to_str(tp):
|
||
if tp == 100:
|
||
return '早高峰'
|
||
elif tp == 200:
|
||
return '晚高峰'
|
||
elif tp == 300:
|
||
return '早晚高峰'
|
||
elif tp == 400:
|
||
return '平峰'
|
||
elif tp == 500:
|
||
return '凌晨'
|
||
elif tp == 600:
|
||
return '夜间'
|
||
elif tp == 1000:
|
||
return '全天'
|
||
else:
|
||
return '未知'
|
||
|
||
|
||
def get_intervals_from_tp(tp):
|
||
"""
|
||
获取特殊时段对应的时间区间列表
|
||
:param tp: 特殊时段编号
|
||
:return: [ [start, end] [start, end] ... ]
|
||
"""
|
||
res = []
|
||
if tp == 100:
|
||
res.append(['07:00', '09:00'])
|
||
elif tp == 200:
|
||
res.append(['17:00', '19:00'])
|
||
elif tp == 300:
|
||
res.append(['07:00', '09:00'])
|
||
res.append(['17:00', '19:00'])
|
||
elif tp == 400:
|
||
res.append(['06:00', '07:00'])
|
||
res.append(['09:00', '17:00'])
|
||
res.append(['19:00', '22:00'])
|
||
elif tp == 500:
|
||
res.append(['00:00', '06:00'])
|
||
elif tp == 600:
|
||
res.append(['22:00', '24:00'])
|
||
elif tp == 1000:
|
||
res.append(['00:00', '24:00'])
|
||
else:
|
||
pass
|
||
return res
|
||
|
||
|
||
def get_intervals_from_tp_in(tp):
|
||
"""
|
||
获取特殊时段对应的时间区间列表
|
||
:param tp: 特殊时段编号
|
||
:return: [ [start, end] [start, end] ... ]
|
||
"""
|
||
res = []
|
||
if tp == 100:
|
||
res.append(['07:00', '09:00'])
|
||
elif tp == 200:
|
||
res.append(['17:00', '19:00'])
|
||
elif tp == 300:
|
||
res.append(['07:00', '09:00'])
|
||
res.append(['17:00', '19:00'])
|
||
elif tp == 400:
|
||
res.append(['06:00', '07:00'])
|
||
res.append(['09:00', '17:00'])
|
||
res.append(['19:00', '22:00'])
|
||
elif tp == 500:
|
||
res.append(['00:00', '06:00'])
|
||
elif tp == 600:
|
||
res.append(['22:00', '24:00'])
|
||
elif tp == 1000:
|
||
res.append(['00:00', '24:00'])
|
||
else:
|
||
pass
|
||
return res
|
||
|
||
|
||
def get_intervals_from_tp_slice(tp):
|
||
"""
|
||
获取特殊时段对应的时间区间列表
|
||
:param tp: 特殊时段编号
|
||
:return: [ [start, end] [start, end] ... ]
|
||
"""
|
||
res = []
|
||
if tp == 100:
|
||
res.append(['07:00', '09:00'])
|
||
elif tp == 200:
|
||
res.append(['17:00', '19:00'])
|
||
elif tp == 300:
|
||
res.append(['07:00', '09:00'])
|
||
res.append(['17:00', '19:00'])
|
||
elif tp == 400:
|
||
res.append(['06:00', '07:00'])
|
||
res.append(['09:00', '17:00'])
|
||
res.append(['19:00', '22:00'])
|
||
elif tp == 500:
|
||
res.append(['00:00', '06:00'])
|
||
elif tp == 600:
|
||
res.append(['22:00', '24:00'])
|
||
elif tp == 1000:
|
||
res.append(['00:00', '24:00'])
|
||
else:
|
||
pass
|
||
return res
|
||
|
||
|
||
def has_intersection(one, other):
|
||
return other[0] < one[1] and other[1] > one[0]
|
||
|
||
|
||
def has_intersection_with_list(one, other_interval_list):
|
||
for other in other_interval_list:
|
||
if has_intersection(one, other):
|
||
return True
|
||
return False
|
||
|
||
|
||
def fall_in_intervals(hm, interval_list):
|
||
for one in interval_list:
|
||
if one[0] < hm <= one[1]:
|
||
return True
|
||
return False
|
||
|
||
|
||
def if_now_fit_weektp(week_days, tp, tp_start, tp_end):
|
||
"""
|
||
判断当前时刻是否落入指定星期几集合的特定时段中
|
||
:param week_days: 星期几集合,格式:'1,2,3,4,5'
|
||
:param tp: 0表示自定义,其他预定义: 100/200/300/400/500/600/1000
|
||
:param tp_start: tp==0情况下的时段开始时刻
|
||
:param tp_end: tp==0情况下的时段结束时刻
|
||
:return: bool
|
||
"""
|
||
ss = week_days.split(',')
|
||
weekday_set = set()
|
||
for item in ss:
|
||
weekday_set.add(int(item))
|
||
|
||
today_weekday = get_weekday()
|
||
if today_weekday not in weekday_set:
|
||
return False
|
||
|
||
# 计算时段对应的区间列表
|
||
now_hm = get_hhmm()
|
||
interval_list = []
|
||
if tp == 0:
|
||
interval_list = [[tp_start, tp_end]]
|
||
else:
|
||
interval_list = get_intervals_from_tp(tp)
|
||
# 判断当前时刻 是否落入的区间列表中
|
||
return fall_in_intervals(now_hm, interval_list)
|
||
|
||
|
||
def calc_intersection(one, other):
|
||
"""
|
||
计算两个区间的交集区间,无交集时返回 None
|
||
:param one:
|
||
:param other:
|
||
:return:
|
||
"""
|
||
if has_intersection(one, other):
|
||
return [max(one[0], other[0]), min(one[1], other[1])]
|
||
else:
|
||
return None
|
||
|
||
|
||
def calc_intersection_of_two_intervals(one_list, other_list):
|
||
"""
|
||
计算两个区间列表的交集。如果没有交集,则返回 None
|
||
:param one_list:
|
||
:param other_list:
|
||
:return: intersection_list
|
||
"""
|
||
intersection_list = []
|
||
for one in one_list:
|
||
for other in other_list:
|
||
inter = calc_intersection(one, other)
|
||
if inter:
|
||
intersection_list.append(inter)
|
||
if len(intersection_list) > 0:
|
||
return intersection_list
|
||
else:
|
||
return None
|
||
|
||
|
||
def interval_list_to_str(interval_list):
|
||
ss = []
|
||
for one in interval_list:
|
||
ss.append('%s - %s' % (one[0], one[1]))
|
||
return ', '.join(ss)
|
||
|
||
|
||
def timer_decorator(func):
|
||
def wrapper(*args, **kwargs):
|
||
start_time = time.time()
|
||
result = func(*args, **kwargs)
|
||
end_time = time.time()
|
||
msg = f"{func.__name__} time_spent {end_time - start_time:.6f} s"
|
||
logging.debug(msg)
|
||
print(msg)
|
||
return result
|
||
|
||
return wrapper
|
||
|
||
|
||
def make_common_res(status, msg):
|
||
res = {}
|
||
res['status'] = status
|
||
res['msg'] = msg
|
||
return res
|
||
|
||
|
||
def make_res(status, msg, desc):
|
||
res = {}
|
||
res['status'] = status
|
||
res['msg'] = msg
|
||
res['desc'] = desc
|
||
return res
|
||
|
||
|
||
def roadclass2str(rc):
|
||
rc_mapping = {1: '高速公路', 2: '国道', 3: '省道', 4: '县道', 5: '其他低等级道路', 6: '城市快速路', 7: '城市主干道',
|
||
8: '城市次干道', 9: '城市支路', 10: '郊区小路'}
|
||
rc = int(rc)
|
||
if rc < 1 or rc > 10:
|
||
return '未知道路等级'
|
||
else:
|
||
return rc_mapping[rc]
|
||
|
||
|
||
def comparable_rank(rc):
|
||
"""
|
||
转换为可数值比较的rc,取值范围1~10,数值越小,等级越高
|
||
"""
|
||
if 1 <= rc <= 4:
|
||
return rc * 2
|
||
elif 6 <= rc <= 9:
|
||
return rc * 2 - 11
|
||
elif rc == 5:
|
||
return 9
|
||
else:
|
||
return int(rc)
|
||
|
||
|
||
def make_tp_list(tp_start_list):
|
||
num = len(tp_start_list)
|
||
tp_list = []
|
||
for i in range(0, num):
|
||
if i < num - 1:
|
||
tp_list.append([tp_start_list[i], tp_start_list[i + 1]])
|
||
else:
|
||
# if tp_start_list[i] < '23:59':
|
||
tp_list.append([tp_start_list[i], '24:00'])
|
||
return tp_list
|
||
|
||
|
||
def make_map_from_list(info_list, key_name):
|
||
res = {}
|
||
for info in info_list:
|
||
key = info[key_name]
|
||
res[key] = info
|
||
return res
|
||
|
||
|
||
def parse_int_list(num_list_str):
|
||
tmp_str_list = num_list_str.split(',')
|
||
num_list = []
|
||
for ss in tmp_str_list:
|
||
num_list.append(int(ss))
|
||
return num_list
|
||
|
||
|
||
def parse_float_list(num_list_str):
|
||
tmp_str_list = num_list_str.split(',')
|
||
num_list = []
|
||
for ss in tmp_str_list:
|
||
num_list.append(float(ss))
|
||
return num_list
|
||
|
||
|
||
def parse_phase_name(phase_name):
|
||
"""解析相位的文本描述"""
|
||
num = len(phase_name)
|
||
srcDir_part = phase_name[0: num - 2]
|
||
turn_part = phase_name[num - 2:]
|
||
return srcDir_part, turn_part
|
||
|
||
|
||
def list_to_str(item_list, spliter):
|
||
if not item_list or len(item_list) == 0:
|
||
return ''
|
||
"""将list转化为字符串"""
|
||
ss = []
|
||
for item in item_list:
|
||
ss.append(str(item))
|
||
return spliter.join(ss)
|
||
|
||
|
||
def time_str_to_seconds(time_str):
|
||
time_obj = datetime.strptime(time_str, '%H:%M')
|
||
seconds = time_obj.hour * 3600 + time_obj.minute * 60
|
||
return seconds
|
||
|
||
|
||
def time_str_to_minutes(time_str):
|
||
time_obj = datetime.strptime(time_str, '%H:%M')
|
||
minutes = time_obj.hour * 60 + time_obj.minute
|
||
return minutes
|
||
|
||
|
||
def allowed_file(filename):
|
||
return '.' in filename and filename.rsplit('.', 1)[1].lower() in {'xlsx', 'xls'}
|
||
|
||
|
||
def allowed_file_suffix(filename, suffix):
|
||
return '.' in filename and filename.rsplit('.', 1)[1].lower() in suffix
|
||
|
||
|
||
t_sheet_cols = {
|
||
'调度计划表': {
|
||
'head': ['序号', '调度类型', '月份', '日期', '星期', '日计划号'],
|
||
'column': ['priority', 'type', 'month', 'day', 'weekday', 'scheduleid'],
|
||
'rex': [r'^\d+$', r'^\d+|$', r'^([0-9]|1[0-2]|)$', r'^(?:[0-9]|[12][0-9]|3[01]|)$', r'^([0-7]|)$', r'^\d+$'],
|
||
},
|
||
'日计划表': {
|
||
'head': ['序号', '日计划号', '执行时段', '配时方案号', '控制模式'],
|
||
'column': ['eg', 'scheduleid', 'tp_start', 'planid', 'control_mode'],
|
||
'rex': [r'^\d+$', r'^\d+$', r'^[0-2]\d:[0-5]\d$', r'^\d+$', r'.'],
|
||
},
|
||
'配时方案表': {
|
||
'head': ['序号', '方案号', '方案名称', '周期长', '阶段序号', '阶段号', '阶段时间', '协调相位号', '协调相位差'],
|
||
'column': ['eg', 'planid', 'name', 'cycle', 'stage_seq', 'stageid', 'stage_duration', 'coord_phaseid',
|
||
'offset'],
|
||
'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$', r'^\d+$', r'^\d+$', r'^(\d+|)$', r'^(\d+|)$'],
|
||
},
|
||
'阶段表': {
|
||
'head': ['序号', '阶段号', '阶段名称', '黄灯时间', '全红时间', '关联相位'],
|
||
'column': ['eg', 'stageid', 'name', 'yellow', 'allred', 'phases'],
|
||
'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$', r'^(\d+,)*\d+$'],
|
||
},
|
||
'相位表': {
|
||
'head': ['序号', '相位号', '相位名称', '最小绿', '最大绿'],
|
||
'column': ['eg', 'phaseid', 'name', 'min_green', 'max_green'],
|
||
'rex': [r'^\d+$', r'^\d+$', r'.', r'^\d+$', r'^\d+$'],
|
||
}
|
||
}
|
||
|
||
|
||
def read_and_filter_excel(file, sheet_names, is_new=False):
|
||
xl = pd.ExcelFile(file)
|
||
sheet_cols = t_sheet_cols
|
||
if is_new:
|
||
sheet_cols = t_sheet_cols_new
|
||
if not all(sheet_name in sheet_cols for sheet_name in xl.sheet_names):
|
||
return {}, '存在不可用sheet,请检查后重新上传'
|
||
|
||
if not all(sheet_name in xl.sheet_names for sheet_name in sheet_names):
|
||
return {}, 'sheet不匹配,请检查后重新上传'
|
||
list = {}
|
||
for sheet_name in sheet_names:
|
||
df = pd.read_excel(xl, sheet_name, dtype=str, na_filter=False)
|
||
list[sheet_name] = []
|
||
|
||
if len(df.columns) < len(sheet_cols[sheet_name]['head']):
|
||
return {}, '表头不匹配,请检查后重新上传'
|
||
|
||
for index, column_name in enumerate(sheet_cols[sheet_name]['head']):
|
||
if column_name != df.columns[index]:
|
||
return {}, '表头错误,请检查后重新上传'
|
||
|
||
for index, row in df.iterrows():
|
||
line = {}
|
||
cleaned_row = row.apply(lambda x: x.strip() if isinstance(x, str) else x)
|
||
for i in range(len(sheet_cols[sheet_name]['head'])):
|
||
if not re.match(sheet_cols[sheet_name]['rex'][i], str(cleaned_row.iloc[i])):
|
||
return {}, f'{sheet_name} {df.columns[i]}列 第{index + 2}行 格式错误'
|
||
if len(str(cleaned_row.iloc[i])) > 100:
|
||
return {}, f'{sheet_name} {df.columns[i]}列 第{index + 2}行 长度超过100个字符'
|
||
line[sheet_cols[sheet_name]['column'][i]] = cleaned_row.iloc[i]
|
||
if sheet_name == "调度计划表" and '日计划名称' in df.columns:
|
||
line['schedule_name'] = cleaned_row.iloc[len(sheet_cols[sheet_name]['head'])]
|
||
list[sheet_name].append(line)
|
||
if len(list[sheet_name]) == 0:
|
||
return {}, '数据不能为空'
|
||
|
||
return list, ''
|
||
|
||
|
||
def check_request(value):
|
||
try:
|
||
v = str(value).lower()
|
||
pattern = r"\b(and|like|exec|insert|select|drop|grant|alter|delete|update|count|chr|mid|master|truncate|char|delclare|or)\b|(\*|;)"
|
||
r = re.search(pattern, v)
|
||
if r:
|
||
raise Exception("request params is wrong")
|
||
return None
|
||
except Exception as error:
|
||
return error
|
||
|
||
|
||
def get_time_stamp_list_by_duration(timestamp, duration):
|
||
batch_size = 0
|
||
if duration == 1:
|
||
batch_size = 3
|
||
elif duration == 2: # 1h
|
||
batch_size = 12
|
||
elif duration == 3: # 2h
|
||
batch_size = 24
|
||
timestamp_list = []
|
||
# 15分钟
|
||
for i in range(0, batch_size):
|
||
timestamp_list.append(timestamp - 300 * i)
|
||
return timestamp_list
|
||
|
||
|
||
# 获取可用的天
|
||
def get_days_from_week_daytype(week, day, daytype):
|
||
weeks = week.split(',')
|
||
week_day = get_day_of_week(day)
|
||
if daytype == 'today' and str(week_day) in weeks:
|
||
return [day]
|
||
if daytype == 'hisday' and str(week_day) in weeks:
|
||
return [day]
|
||
if daytype != 'hisday' and daytype != 'today':
|
||
if daytype == 'workdayofweek':
|
||
return filter_last_week_by_days(week, day, 'workday')
|
||
if daytype == 'weekendofweek':
|
||
return filter_last_week_by_days(week, day, 'weekend')
|
||
return []
|
||
|
||
|
||
def get_time_str_slice_by_dur(start, end):
|
||
result = []
|
||
while start < end:
|
||
new_time_obj = datetime.strptime(start, '%H:%M').timestamp() + 3600
|
||
new_time_obj = datetime.fromtimestamp(new_time_obj)
|
||
# 将新的时间对象格式化为字符串
|
||
new_time_str = new_time_obj.strftime('%H:%M')
|
||
if new_time_str == '00:00':
|
||
new_time_str = '24:00'
|
||
result.append("{}-{}".format(start, new_time_str))
|
||
start = new_time_str
|
||
return result
|
||
|
||
|
||
# 组合绿波协调时间段列表
|
||
def get_green_waves_tp(green_waves_info):
|
||
tps = []
|
||
for item_green_wave in green_waves_info:
|
||
if item_green_wave['launch_mode'] == 'manual' or item_green_wave['week_days'] == '':
|
||
continue
|
||
# 筛选绿波的时段
|
||
if item_green_wave['tp'] > 0:
|
||
tp_slice = get_intervals_from_tp_in(item_green_wave['tp'])
|
||
for sublist in tp_slice:
|
||
str_end = get_time_str_slice_by_dur(sublist[0], sublist[1])
|
||
if len(str_end) > 0:
|
||
tps += [item_list for item_list in str_end if item_list]
|
||
|
||
if item_green_wave['tp'] <= 0:
|
||
tp_end = item_green_wave['tp_end']
|
||
if tp_end == '00:00':
|
||
tp_end = '24:00'
|
||
str_end = get_time_str_slice_by_dur(item_green_wave['tp_start'], tp_end)
|
||
tps += [item_list for item_list in str_end if item_list]
|
||
|
||
tps_range = list(set(tps))
|
||
return tps_range
|
||
|
||
|
||
#获取绿波协调路段时间段和协调速度
|
||
def get_green_waves_tp_speed(green_waves_info):
|
||
result = defaultdict(lambda: defaultdict(list))
|
||
for item_green_wave in green_waves_info:
|
||
if item_green_wave['launch_mode'] == 'manual' or item_green_wave['week_days'] == '':
|
||
continue
|
||
|
||
# 筛选绿波的时段
|
||
if item_green_wave['tp'] > 0:
|
||
tp_slice = get_intervals_from_tp_in(item_green_wave['tp'])
|
||
for sublist in tp_slice:
|
||
str_end = '-'.join(sublist)
|
||
result[item_green_wave['waveid']][str_end] = item_green_wave
|
||
|
||
if item_green_wave['tp'] <= 0:
|
||
tp_end = item_green_wave['tp_end']
|
||
if tp_end == '00:00':
|
||
tp_end = '24:00'
|
||
result[item_green_wave['waveid']][
|
||
item_green_wave['tp_start'] + '-' + tp_end] = item_green_wave
|
||
return result
|
||
|
||
|
||
#获取配时方案和时间
|
||
def get_green_waves_road(wave_info, cross_info):
|
||
road_list = {} #正向
|
||
if wave_info['type'] == 0 or wave_info['type'] == 2:
|
||
i = 0
|
||
speeds = wave_info['speed_list'].split(',')
|
||
for item_cross in cross_info:
|
||
if item_cross['cross_seq'] <= wave_info['start_cross_seq']:
|
||
continue
|
||
i += 1
|
||
if i < wave_info['cross_num']:
|
||
speed = speeds[i - 1]
|
||
road_list[item_cross['inroadid']] = float(speed)
|
||
|
||
if wave_info['type'] == 1 or wave_info['type'] == 2:
|
||
i = 0
|
||
speeds = wave_info['speed_list_reversed'].split(',') if wave_info['speed_list_reversed'] != '' else []
|
||
for item_cross in cross_info:
|
||
if item_cross['cross_seq'] < wave_info['start_cross_seq']:
|
||
continue
|
||
i += 1
|
||
if i < wave_info['cross_num']:
|
||
speed = speeds[i - 1] if len(speeds) > 0 else 0
|
||
road_list[item_cross['inroadid_reversed']] = float(speed)
|
||
|
||
return road_list
|
||
|
||
|
||
# 取两个时间段列表的差集
|
||
def get_green_waves_tp_diff(green_tps_slice, exist_tp_slice):
|
||
result_tp_slice = []
|
||
for item_tp in exist_tp_slice:
|
||
if item_tp in green_tps_slice:
|
||
continue
|
||
result_tp_slice.append(item_tp)
|
||
|
||
return sorted(result_tp_slice)
|
||
|
||
|
||
# 取两个时间段列表的交集
|
||
def get_green_waves_tp_intersect(tp_slice_1, tp_slice_2):
|
||
result_tp_slice = []
|
||
for item_tp in tp_slice_1:
|
||
if item_tp in tp_slice_2:
|
||
result_tp_slice.append(item_tp)
|
||
|
||
return sorted(result_tp_slice)
|
||
|
||
|
||
# 根据时间段组成连续的时间段
|
||
def get_green_waves_tp_continuous(tp_slice):
|
||
green_waves_time_dur = ''
|
||
green_waves_result_final_map = {}
|
||
green_waves_result_final_slice = []
|
||
for time_dur in tp_slice:
|
||
if len(green_waves_result_final_map) == 0:
|
||
green_waves_time_dur = time_dur
|
||
green_waves_result_final_map[time_dur] = 1
|
||
continue
|
||
this_time_dur = time_dur.split('-')
|
||
prev_time_dur = green_waves_time_dur.split('-')
|
||
if this_time_dur[0] == prev_time_dur[1]:
|
||
del green_waves_result_final_map[green_waves_time_dur]
|
||
green_waves_time_dur = prev_time_dur[0] + "-" + this_time_dur[1]
|
||
green_waves_result_final_map[green_waves_time_dur] = 1
|
||
|
||
if this_time_dur[0] != prev_time_dur[1]:
|
||
green_waves_time_dur = time_dur
|
||
green_waves_result_final_map[green_waves_time_dur] = 1
|
||
|
||
for final_tp, _ in green_waves_result_final_map.items():
|
||
green_waves_result_final_slice.append(final_tp)
|
||
|
||
return green_waves_result_final_slice
|
||
|
||
|
||
def get_used_week_days(daytype, day):
|
||
weeks = '1,2,3,4,5'
|
||
if daytype == 'weekendofweek':
|
||
weeks = '6,7'
|
||
if daytype == 'hisday':
|
||
weeks = str(get_day_of_week(day))
|
||
days = get_days_from_week_daytype(weeks, day, daytype)
|
||
return days
|
||
|
||
|
||
# 根据日期和tp获取指定dur间隔时间戳:dur是分钟每15分钟间隔取时间戳列表
|
||
def get_time_stamp_list_by_days_tp(days, tp, dur):
|
||
default_start_end = []
|
||
tp_list = get_intervals_from_tp_in(tp)
|
||
for item_day in days:
|
||
day_str = str(item_day)
|
||
for item_tp in tp_list:
|
||
item_start_stamp = item_tp[0]
|
||
item_end_stamp = item_tp[1]
|
||
next_day_str = day_str
|
||
start_stamp = int(
|
||
str2timestamp("{} {}:00".format(f"{day_str[:4]}-{day_str[4:6]}-{day_str[6:]}", item_start_stamp)))
|
||
if item_tp[1] == '24:00':
|
||
item_end_stamp = '00:00'
|
||
next_day_str = str(get_day_offset(item_day, 1))
|
||
end_stamp = int(
|
||
str2timestamp(
|
||
"{} {}:00".format(f"{next_day_str[:4]}-{next_day_str[4:6]}-{next_day_str[6:]}", item_end_stamp)))
|
||
|
||
while start_stamp <= end_stamp:
|
||
default_start_end.append(start_stamp)
|
||
start_stamp += dur * 60
|
||
return default_start_end
|
||
|
||
|
||
#生成每半小时一个时段
|
||
def get_half_hour_24_str():
|
||
# 获取当前时间
|
||
current_time = datetime.now()
|
||
|
||
# 将分钟数、秒数和微秒数置零,保留小时数
|
||
current_time = current_time.replace(minute=0, second=0, microsecond=0)
|
||
|
||
# 初始化一个空列表来存储结果
|
||
hourly_times = []
|
||
|
||
# 循环生成一天中每个小时的时间点
|
||
for i in range(48):
|
||
hourly_times.append(current_time.strftime('%H:%M')) # 格式化为 '小时:分钟' 的字符串
|
||
current_time += timedelta(minutes=30) # 加1小时
|
||
|
||
return hourly_times
|
||
|
||
|
||
# 生成每一个时段
|
||
def get_hour_24_str():
|
||
# 获取当前时间
|
||
current_time = datetime.now()
|
||
|
||
# 将分钟数、秒数和微秒数置零,保留小时数
|
||
current_time = current_time.replace(minute=0, second=0, microsecond=0)
|
||
|
||
# 初始化一个空列表来存储结果
|
||
hourly_times = []
|
||
|
||
# 循环生成一天中每个小时的时间点
|
||
for i in range(24):
|
||
hourly_times.append(current_time.strftime('%H:%M')) # 格式化为 '小时:分钟' 的字符串
|
||
current_time += timedelta(minutes=60) # 加1小时
|
||
|
||
return hourly_times
|
||
|
||
|
||
#获取周天交集
|
||
def get_green_waves_week_mixed(green_weeks, week_day):
|
||
week_days = []
|
||
for item_week_day in green_weeks:
|
||
for week in week_day:
|
||
if item_week_day == week:
|
||
week_days.append(week)
|
||
return week_days
|
||
|
||
|
||
#根据绿波配置组装路口
|
||
def get_green_waves_cross_list(start_seq, cross_num, artery_coss_list):
|
||
green_waves_list = []
|
||
start_index = start_seq - 1
|
||
for i in range(0, cross_num):
|
||
index = start_index + i
|
||
green_waves_list.append(artery_coss_list[index])
|
||
|
||
return green_waves_list
|
||
|
||
|
||
def tplist_2_tpinterval(tp_list):
|
||
if len(tp_list) == 0:
|
||
return []
|
||
|
||
result = [
|
||
{'tp_start': tp_list[0], 'tp_end': '23:59'}
|
||
]
|
||
|
||
for i in range(1, len(tp_list)):
|
||
if tp_list[i] == '23:59':
|
||
continue
|
||
result[-1]['tp_end'] = tp_list[i]
|
||
result.append({'tp_start': tp_list[i], 'tp_end': '23:59'})
|
||
|
||
return result
|
||
|
||
|
||
def time_intersection_minutes(time1: str, time2: str) -> int:
|
||
"""计算两个时间段的交集分钟数"""
|
||
|
||
def to_minutes(t: str) -> int:
|
||
h, m = map(int, t.split(':'))
|
||
return h * 60 + m
|
||
|
||
# 解析时间段
|
||
start1, end1 = map(to_minutes, time1.split('-'))
|
||
start2, end2 = map(to_minutes, time2.split('-'))
|
||
|
||
# 计算交集
|
||
intersection_start = max(start1, start2)
|
||
intersection_end = min(end1, end2)
|
||
|
||
# 如果没有交集,返回0
|
||
if intersection_start >= intersection_end:
|
||
return 0
|
||
|
||
# 返回交集分钟数
|
||
return intersection_end - intersection_start
|