cross_doctor/tool/screen.py

1010 lines
50 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import datetime
import os
import platform
import random
import threading
import time
from typing import List
import requests
from flask import jsonify
from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright
from PIL import Image
domain = 'http://101.42.25.182:8082/'
def get_os_type():
system = platform.system()
if system == 'Darwin':
return 'macOS'
elif system == 'Linux':
try:
import distro
if distro.name() == 'Ubuntu':
return 'Ubuntu'
return 'Linux'
except ImportError:
return 'Linux'
elif system == 'Windows':
return 'Windows'
else:
return 'Unknown OS'
class CutMonitorImgColumn:
def __init__(self, wave_id, wave_name, week_or_day, date_name, tp_start, tp_end):
self.wave_id = wave_id
self.wave_name = wave_name
self.week_or_day = week_or_day #1日2周
self.date_name = date_name
self.tp_start = tp_start
self.tp_end = tp_end
class CutReportImgColumn:
def __init__(self, crossid, cross_name, time_list, tp_name, walk=0, delay=0, flow_in=0):
self.crossid = crossid
self.cross_name = cross_name
self.time_list = time_list
self.tp_name = tp_name
self.walk = walk
self.delay = delay
self.flow_in = flow_in
class ScreenError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return f"CustomError: {self.message}"
def get_request_host(nodeid):
response = requests.get(f"http://101.42.25.182:9999/api/route?nodeid={nodeid}")
if response.status_code == 200:
return response.json(), None
return None, 'Failed to fetch data'
def screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
return asyncio.run(_screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high, odi, divide, remit))
def screenshot_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid, role):
return asyncio.run(_screenshot_report_img(attrs, nodeid, area_id, userid, role))
def screenshot_wave_monitor_img(attrs: List[CutMonitorImgColumn], userid, role):
return asyncio.run(_screenshot_wave_monitor_img(attrs, userid, role))
async def _screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
image_dir = os.path.dirname(os.path.abspath(__file__))
file_name = f"{wave_id}{tp_name[:11]}{high}{odi}{divide}{remit}"
for item_time in time_list:
file_name += f"{item_time[0]}{item_time[1]}"
file_dir = f"{image_dir}/../temp/{file_name}.jpg"
if os.path.exists(file_dir):
return file_dir, None
try:
async with async_playwright() as p:
# 启动浏览器
browser = await p.chromium.launch(headless=True,
args=['--window-size=1900,768', '--no-sandbox',
'--disable-setuid-sandbox',
'--disable-gpu'])
page = await browser.new_page() # 新建页面
# 这里模拟获取 host 响应的方式
#response, error = get_request_host(nodeid) # 确保这个函数能返回正确的响应
# 设置浏览器的视口大小
await page.set_viewport_size({'width': 1900, 'height': 768})
await asyncio.sleep(2)
# 打开目标网页
print('打开页面')
await page.goto(f"http://101.42.25.182:8082/page/wave_evaluate/evaluate", timeout=120000)
# 等待元素加载
print('等待amap')
await page.wait_for_selector('#amap')
await page.wait_for_function('document.readyState === "complete"')
await page.wait_for_selector('.amap-container')
await page.wait_for_function('document.querySelector(".amap-container") !== null')
#await page.wait_for_function('window.AMap && window.AMap.Map')
# await page.wait_for_function('window.AMap && window.AMap.Map')
print('等待网络')
await page.wait_for_load_state('load')
await page.wait_for_load_state('networkidle')
print('等待amap-layer-overlay')
await page.wait_for_selector('.el-loading-mask', state="hidden")
await page.wait_for_selector(".el-loading-spinner", state="hidden")
await page.wait_for_selector('.amap-layer-overlay', timeout=30000)
print("右缩放")
right = await page.query_selector('.anticon.anticon-right-square')
while not await right.is_visible() or not await right.is_enabled():
await asyncio.sleep(0.1) # 等待 100 毫秒再检查
await right.click()
await page.wait_for_selector(".metrics", state="hidden")
#await asyncio.sleep(1)
print("下缩放")
bottom = await page.query_selector('.anticon.anticon-down-square')
while not await bottom.is_visible() or not await bottom.is_enabled():
await asyncio.sleep(0.1) # 等待 100 毫秒再检查
await bottom.click()
await page.wait_for_selector(".b-panel", state="hidden")
#await asyncio.sleep(1)
print("选择绿波....")
input_element = await page.query_selector("#rc_select_0")
await input_element.click()
await input_element.fill(wave_name)
print("输入绿波....")
await page.wait_for_selector('.rc-virtual-list-holder-inner')
dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
await dev_select.click()
print("点击绿波....")
if high == 1:
checkbox_locator = page.locator('input[type="checkbox"][value="high_frequency"]')
await checkbox_locator.click()
if odi == 1:
checkbox_locator = page.locator('input[type="checkbox"][value="ODI"]')
await checkbox_locator.click()
if divide == 1:
checkbox_locator = page.locator('input[type="radio"][value="forward-out_ts_info"]')
await checkbox_locator.click()
if divide == 2:
checkbox_locator = page.locator('input[type="radio"][value="backward-out_ts_info"]')
await checkbox_locator.click()
if remit == 1:
checkbox_locator = page.locator('input[type="radio"][value="forward-in_ts_info"]')
await checkbox_locator.click()
if remit == 2:
checkbox_locator = page.locator('input[type="radio"][value="backward-in_ts_info"]')
await checkbox_locator.click()
for index, item_time in enumerate(time_list):
time_select = page.locator('input[placeholder="开始日期"]')
#time_select = await page.query_selector(".ant-picker-input.ant-picker-input-active")
await time_select.click()
locator = page.locator('div.ant-picker-date-panel')
locator_elements = await locator.all()
if len(locator_elements) != 2:
await browser.close()
return '时间选择器不存在'
start_elements = locator_elements[0]
end_elements = locator_elements[1]
start = item_time[0]
end = item_time[1]
#找开始时间
start_td = start_elements.locator('td')
start_td_elements = await start_td.all()
start_td_title = await start_td_elements[0].get_attribute('title')
start_button = start_elements.locator('button.ant-picker-header-prev-btn')
#结束日期
end_td = end_elements.locator('td')
end_td_elements = await end_td.all()
end_td_title = await end_td_elements[-1].get_attribute('title')
end_button = end_elements.locator('ant-picker-header-next-btn')
while start < start_td_title:
await start_button.click()
start_td = start_elements.locator('td')
start_td_elements = await start_td.all()
start_td_title = await start_td_elements[0].get_attribute('title')
while start > end_td_title:
await end_button.click()
end_td = end_elements.locator('td')
end_td_elements = await end_td.all()
end_td_title = await end_td_elements[-1].get_attribute('title')
#开始时间所在区域
for td_element in start_td_elements:
title = await td_element.get_attribute('title') # 获取 title 属性
if title == start:
await td_element.click()
break
# 开始时间所在区域
for td_element in end_td_elements:
title = await td_element.get_attribute('title') # 获取 title 属性
if title == start:
await td_element.click()
break
# 找结束时间
start_td = start_elements.locator('td')
start_td_elements = await start_td.all()
# 结束日期
end_td = end_elements.locator('td')
end_td_elements = await end_td.all()
at_start = False
# 结束日期在开始时间所在区域
for td_element in start_td_elements:
title = await td_element.get_attribute('title') # 获取 title 属性
if title == end:
at_start = True
await td_element.click()
break
if not at_start:
# 结束日期在结束时间所在区域
for td_element in end_td_elements:
title = await td_element.get_attribute('title') # 获取 title 属性
if title == end:
await td_element.click()
break
button = page.locator('button.ant-btn.ant-btn-circle.ant-btn-primary.ant-btn-sm.ant-btn-icon-only')
while not await button.is_visible() or not await button.is_enabled():
await asyncio.sleep(0.1) # 等待 100 毫秒再检查
await button.click()
input_select_1 = page.locator('input#rc_select_1')
await input_select_1.click()
tp_divs = page.locator('div.ant-select-item.ant-select-item-option')
tp_div_elements = await tp_divs.all()
for item_tp_div in tp_div_elements:
title = await item_tp_div.get_attribute('title')
if title == tp_name:
await item_tp_div.click()
search = await page.query_selector(
'.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm:not(.ant-btn-circle)')
while not await search.is_visible() or not await search.is_enabled():
await asyncio.sleep(0.1) # 等待 100 毫秒再检查
await search.click()
print('查询等待网络')
await page.wait_for_load_state('load')
await page.wait_for_load_state('networkidle')
if divide > 0 or remit > 0:
print('等待转向比')
await page.wait_for_selector('div.flow-container', state='attached')
await page.wait_for_selector('div.flow-container >> :visible', timeout=10000)
print('等待图片加载完成')
image_locators = await page.locator('img').all()
# 检查每个图片元素的加载状态
for img_locator in image_locators:
is_loaded = await img_locator.evaluate('(img) => img.complete') # 检查图片是否加载完成
if not is_loaded:
await img_locator.wait_for(state='attached')
print('查询等待amap-layer-overlay')
await page.wait_for_selector('.el-loading-mask', state="hidden")
await page.wait_for_selector(".el-loading-spinner", state="hidden")
element = await page.query_selector('.amap-layer-overlay')
markers = await page.query_selector_all('.amap-marker')
marker_positions = []
for marker in markers:
# 获取 left 和 top 样式属性
left = await marker.evaluate("el => window.getComputedStyle(el).left")
top = await marker.evaluate("el => window.getComputedStyle(el).top")
marker_positions.append({"left": left.replace('px', ''), "top": top.replace('px', '')})
left_values = [float(item['left']) for item in marker_positions]
top_values = [float(item['top']) for item in marker_positions]
min_left = min(left_values)
min_top = min(top_values)
max_left = max(left_values)
max_top = max(top_values)
print('截图')
if element:
bounding_box = await element.bounding_box()
if bounding_box:
bound_x = bounding_box['x']
bound_y = bounding_box['y']
x = min_left - 200 if min_left - 200 > 0 else 0
y = min_top - 70 if min_top - 70 > 0 else 0
width = max_left + 250 - x if max_left + 250 < bounding_box['width'] else bounding_box[
'width'] - x
height = max_top + 70 - y if max_top + 70 < bounding_box['height'] else bounding_box[
'height'] - y
await page.screenshot(path=file_dir,
clip={
'x': x + bound_x,
'y': y + bound_y,
'width': width,
'height': height
})
return file_dir, None
await browser.close()
return None, None
except Exception as error:
await browser.close()
return None, error
async def _screenshot_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid, role):
"""
路口对比报告截图
"""
image_map = {}
if len(attrs) <= 0:
return {}, None
image_dir = os.path.dirname(os.path.abspath(__file__))
for item_attr in attrs:
date_list_str = ''.join([date for sublist in item_attr.time_list for date in sublist])
file_name = f"{item_attr.crossid}{date_list_str}{item_attr.tp_name[:11]}"
file_dir_full = f"{image_dir}/../temp/{file_name}.jpg"
image_map['cross'] = file_dir_full
if item_attr.walk == 1:
#人行横道
file_name = f"{item_attr.crossid}{date_list_str}{item_attr.tp_name[:11]}_walk"
file_dir_full = f"{image_dir}/../temp/{file_name}.jpg"
image_map['walk'] = file_dir_full
if item_attr.delay == 1:
#延误
file_name = f"{item_attr.crossid}{date_list_str}{item_attr.tp_name[:11]}_delay"
file_dir_full = f"{image_dir}/../temp/{file_name}.jpg"
image_map['delay'] = file_dir_full
if item_attr.flow_in == 1:
#分流
file_name = f"{item_attr.crossid}{date_list_str}{item_attr.tp_name[:11]}_flow_in"
file_dir_full = f"{image_dir}/../temp/{file_name}.jpg"
image_map['flow_in'] = file_dir_full
exists = True
for index, image_file_path in image_map.items():
if not os.path.exists(image_file_path):
exists = False
if exists:
return image_map, None
try:
async with async_playwright() as p:
# 启动浏览器
browser = await p.chromium.launch(headless=True,
args=['--window-size=1900,800',
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-gpu',
'--enable-gpu-rasterization',
'--ignore-gpu-blocklist',
'--enable-zero-copy', '--disable-software-rasterizer',
'--disable-extensions',
'--disable-plugins', '--disable-hardware-media-key', '--no-zygote',
'--process-per-site', '--disable-background-networking',
'--disable-sync',
'--disable-background-timer-throttling'])
print('打开页面')
page = await browser.new_page() # 新建页面
cookies = [
{
'name': 'cross_manage_nodeid',
'value': str(nodeid),
'domain': '101.42.25.182',
'path': '/',
},
{
'name': 'cross_manage_areaid',
'value': str(area_id),
'domain': '101.42.25.182',
'path': '/',
},
{
'name': 'cross_manage_token',
'value': "iuqwefhjdbcsajhdshcgaiudncjadhajn_15001383357",
'domain': '101.42.25.182',
'path': '/',
},
{
'name': 'cross_manage_userid',
'value': userid,
'domain': '101.42.25.182',
'path': '/',
},
]
await page.context.add_cookies(cookies)
await page.set_viewport_size({'width': 1200, 'height': 860})
await asyncio.sleep(2)
await page.goto(f"http://101.42.25.182:8085/page/diagnosis/detail", timeout=120000)
print('等待amap')
await page.wait_for_selector('#amap')
await page.wait_for_function('document.readyState === "complete"')
await page.wait_for_selector('.amap-container')
await page.wait_for_function('document.querySelector(".amap-container") !== null')
print('等待网络')
await page.wait_for_load_state('load')
await page.wait_for_load_state('networkidle')
print('等待amap-layer-overlay')
await page.wait_for_selector('.el-loading-mask', state="hidden")
await page.wait_for_selector(".el-loading-spinner", state="hidden")
await page.wait_for_selector('.amap-layer-overlay', timeout=30000)
print("滚动到下面")
info_detail = await page.query_selector('.info-detail')
if info_detail:
await info_detail.evaluate('element => element.scrollTop = element.scrollHeight')
print("等到图片加载完成")
image_locators = await page.locator('img').all()
for img_locator in image_locators:
is_loaded = await img_locator.evaluate('(img) => img.complete') # 检查图片是否加载完成
if not is_loaded:
await img_locator.wait_for(state='attached')
print("选择路口....")
input_element = await page.query_selector("#rc_select_0")
await input_element.click()
await input_element.fill(attrs[0].cross_name)
print("输入路口....")
await page.wait_for_selector('.rc-virtual-list-holder-inner')
dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
await dev_select.click()
print("点击路口....")
for index, item_time in enumerate(attrs[0].time_list):
time_select = page.locator('input[placeholder="开始日期"]')
#time_select = await page.query_selector(".ant-picker-input.ant-picker-input-active")
await time_select.click()
locator = page.locator('div.ant-picker-date-panel')
locator_elements = await locator.all()
if len(locator_elements) != 2:
await browser.close()
return {}, '时间选择器不存在'
start_elements = locator_elements[0]
end_elements = locator_elements[1]
start = item_time[0]
end = item_time[1]
#找开始时间
start_td = start_elements.locator('td')
start_td_elements = await start_td.all()
start_td_title = await start_td_elements[0].get_attribute('title')
start_button = start_elements.locator('button.ant-picker-header-prev-btn')
#结束日期
end_td = end_elements.locator('td')
end_td_elements = await end_td.all()
end_td_title = await end_td_elements[-1].get_attribute('title')
end_button = end_elements.locator('ant-picker-header-next-btn')
while start < start_td_title:
await start_button.click()
start_td = start_elements.locator('td')
start_td_elements = await start_td.all()
start_td_title = await start_td_elements[0].get_attribute('title')
while start > end_td_title:
await end_button.click()
end_td = end_elements.locator('td')
end_td_elements = await end_td.all()
end_td_title = await end_td_elements[-1].get_attribute('title')
start_click = False
#开始时间所在区域
for td_element in start_td_elements:
title = await td_element.get_attribute('title') # 获取 title 属性
if title == start:
start_click = True
await td_element.click()
break
# 开始时间所在区域
if not start_click:
for td_element in end_td_elements:
title = await td_element.get_attribute('title') # 获取 title 属性
if title == start:
start_click = True
await td_element.click()
break
# 找结束时间
start_td = start_elements.locator('td')
start_td_elements = await start_td.all()
# 结束日期
end_td = end_elements.locator('td')
end_td_elements = await end_td.all()
at_start = False
# 结束日期在开始时间所在区域
for td_element in start_td_elements:
title = await td_element.get_attribute('title') # 获取 title 属性
if title == end:
at_start = True
await td_element.click()
break
if not at_start:
# 结束日期在结束时间所在区域
for td_element in end_td_elements:
title = await td_element.get_attribute('title') # 获取 title 属性
if title == end:
await td_element.click()
break
button = page.locator('button.ant-btn.ant-btn-circle.ant-btn-primary.ant-btn-sm.ant-btn-icon-only')
while not await button.is_visible() or not await button.is_enabled():
await asyncio.sleep(0.1) # 等待 100 毫秒再检查
await button.click()
input_select_1 = page.locator('input#rc_select_1')
await input_select_1.click()
tp_divs = page.locator('div.ant-cascader-menu-item-content')
tp_div_elements = await tp_divs.all()
print("诊断时段....")
for item_tp_div in tp_div_elements:
#title = await item_tp_div.get_attribute('title')
title = await item_tp_div.text_content()
if title == attrs[0].tp_name or title == '小时时段':
if title == '小时时段':
await item_tp_div.click()
tp_divs_hour = page.locator('div.ant-cascader-menu-item-content')
tp_div_elements_hour = await tp_divs_hour.all()
for item_tp_div_hour in tp_div_elements_hour:
title2 = await item_tp_div_hour.text_content()
if title2 == attrs[0].tp_name:
await item_tp_div_hour.click()
else:
await item_tp_div.click()
print('点击查询')
search = page.locator('button.ant-btn.ant-btn-primary.ant-btn-sm').filter(has_text="查 询")
await search.click()
print('查询等待网络')
await page.wait_for_load_state('load')
await page.wait_for_load_state('networkidle', timeout=30000)
await page.wait_for_selector('.ant-spin-container.ant-spin-blur', state='visible', timeout=30000)
await page.wait_for_selector('.ant-spin-container.ant-spin-blur', state='hidden', timeout=30000)
await page.wait_for_selector('.css-1u7g04k.ant-spin.ant-spin-spinning', state='hidden',timeout=30000)
# 额外等待动画完成
await page.wait_for_timeout(300)
#await page.locator('div.clear').click()
print("去掉服务水平")
clear_buttons = await page.query_selector_all('button:has-text("清空")')
for item_clear_button in clear_buttons:
await item_clear_button.click()
await page.wait_for_timeout(500)
server = await page.query_selector('.ant-checkbox-wrapper.ant-checkbox-wrapper-checked.css-1u7g04k')
await server.click()
await page.wait_for_selector(".cross-level", state="hidden")
await page.wait_for_timeout(500)
if image_map.get('cross'):
element = await page.query_selector('.amap-layer-overlay')
cross_bounding_box = await element.bounding_box()
await page.screenshot(path=image_map['cross'],
clip={
'x': cross_bounding_box['x'],
'y': cross_bounding_box['y'],
'width': cross_bounding_box['width'],
'height': cross_bounding_box['height'] - 45
})
print('全屏')
all_screen = page.locator('button.css-1u7g04k.ant-btn.ant-btn-link').filter(has_text="全屏")
await all_screen.click()
await page.wait_for_timeout(500)
ledger_element = await page.query_selector('.ledger-box')
bounding_box = await ledger_element.bounding_box()
x_offset = 170
x_width = 850
#去掉方案异常路口显示
for image_key , item_image_path in image_map.items():
#人行横道
if image_key == 'walk':
print("人行横道截图")
clear_buttons = await page.query_selector_all('button:has-text("清空")')
for item_clear_button in clear_buttons:
await item_clear_button.click()
await page.wait_for_timeout(500)
flow_buttons = await page.query_selector_all('.ant-checkbox-wrapper.css-1u7g04k:has-text("人行横道距离")')
for item_clear_button in flow_buttons:
await item_clear_button.click()
await page.wait_for_timeout(500)
await page.screenshot(path=item_image_path,
clip={
'x': bounding_box['x'] + x_offset,
'y': bounding_box['y'],
'width': x_width,
'height': bounding_box['height']
})
# 延误
if image_key == 'delay':
print("延误截图")
clear_buttons = await page.query_selector_all('button:has-text("清空")')
for item_clear_button in clear_buttons:
await item_clear_button.click()
await page.wait_for_timeout(500)
clear_buttons = await page.query_selector_all('.ant-radio-wrapper.css-1u7g04k:has-text("延误")')
for item_clear_button in clear_buttons:
await item_clear_button.click()
await page.wait_for_timeout(500)
await page.screenshot(path=item_image_path,
clip={
'x': bounding_box['x'] + x_offset,
'y': bounding_box['y'],
'width': x_width,
'height': bounding_box['height']
})
# 汇入截图
if image_key == 'flow_in':
print("分流截图")
clear_buttons = await page.query_selector_all('button:has-text("清空")')
for item_clear_button in clear_buttons:
await item_clear_button.click()
await page.wait_for_timeout(500)
flow_in_buttons = await page.query_selector_all('.ant-radio-wrapper.css-1u7g04k:has-text("分流")')
for item_clear_button in flow_in_buttons:
await item_clear_button.click()
await page.wait_for_timeout(500)
await page.screenshot(path=item_image_path,
clip={
'x': bounding_box['x'] + x_offset,
'y': bounding_box['y'],
'width': x_width,
'height': bounding_box['height']
})
await browser.close()
return image_map, None
except Exception as error:
await browser.close()
return {}, error
async def _screenshot_wave_monitor_img(attrs: List[CutMonitorImgColumn], userid, role):
image_list = []
result = []
if len(attrs) <= 0:
return result, None
image_dir = os.path.dirname(os.path.abspath(__file__))
cut = False
for item_attr in attrs:
file_name = f"{item_attr.wave_id}{item_attr.week_or_day}{item_attr.date_name}"
file_dir_full = f"{image_dir}/../temp/{file_name}.jpg"
if not os.path.exists(file_dir_full):
file_dir_full = ''
cut = True
image_list.append({
'attr': item_attr,
'img': file_dir_full
})
if not cut:
for item_image in image_list:
result.append(item_image['img'])
return result, None
try:
async with async_playwright() as p:
# 启动浏览器
browser = await p.chromium.launch(headless=True,
args=['--window-size=1200,800', '--no-sandbox',
'--disable-setuid-sandbox',
'--disable-gpu',
'--enable-gpu-rasterization',
'--ignore-gpu-blocklist',
'--enable-zero-copy', '--disable-software-rasterizer',
'--disable-extensions',
'--disable-plugins', '--disable-hardware-media-key', '--no-zygote',
'--process-per-site', '--disable-background-networking',
'--disable-sync',
'--disable-background-timer-throttling'
])
print('打开页面')
page = await browser.new_page() # 新建页面
cookies = [
{
'name': 'manage_token',
'value': userid,
'domain': '101.42.25.182',
'path': '/',
},
{
'name': 'manage_userid',
'value': userid,
'domain': '101.42.25.182',
'path': '/',
},
]
await page.context.add_cookies(cookies)
await page.set_viewport_size({'width': 1200, 'height': 700})
await asyncio.sleep(2)
await page.goto(f"{domain}page/wave_monitor/situation", timeout=120000)
#await page.goto(f"http://82.157.173.20:8082/page/wave_monitor/situation", timeout=120000)
print('等待amap')
await page.wait_for_selector('#amap')
await page.wait_for_function('document.readyState === "complete"')
await page.wait_for_selector('.amap-container')
await page.wait_for_function('document.querySelector(".amap-container") !== null')
print('等待网络')
await page.wait_for_load_state('load')
await page.wait_for_load_state('networkidle')
print('等待amap-layer-overlay')
await page.wait_for_selector('.el-loading-mask', state="hidden")
await page.wait_for_selector(".el-loading-spinner", state="hidden")
await page.wait_for_selector('.amap-layer-overlay', timeout=30000)
for item_image_index, item_image in enumerate(image_list):
if item_image['img'] != '':
continue
item_file_name = f"{item_image['attr'].wave_id}{item_image['attr'].week_or_day}{item_image['attr'].date_name}"
item_file_dir_full = f"{image_dir}/../temp/{item_file_name}.jpg"
print("选择绿波....")
if item_image_index > 0:
print('重载')
await page.reload()
print('等待amap')
await page.wait_for_selector('#amap')
await page.wait_for_function('document.readyState === "complete"')
await page.wait_for_selector('.amap-container')
await page.wait_for_function('document.querySelector(".amap-container") !== null')
print('等待网络')
await page.wait_for_load_state('load')
await page.wait_for_load_state('networkidle')
print('等待amap-layer-overlay')
await page.wait_for_selector('.el-loading-mask', state="hidden")
await page.wait_for_selector(".el-loading-spinner", state="hidden")
await page.wait_for_selector('.amap-layer-overlay', timeout=30000)
input_element = await page.query_selector("#rc_select_0")
await input_element.click()
await input_element.fill(item_image['attr'].wave_name)
print("输入绿波....")
await page.wait_for_selector('.rc-virtual-list-holder-inner')
dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
await dev_select.click()
print("点击周日....")
week_or_day_span = await page.query_selector('.ant-input-group.css-1u7g04k.ant-input-group-compact')
week_or_day = await week_or_day_span.query_selector(
'.ant-select.ant-select-sm.css-1u7g04k.ant-select-single.ant-select-show-arrow')
await week_or_day.click()
divs = page.locator('div.ant-select-item-option-content')
divs_all = await divs.all()
for item_div in divs_all:
text = await item_div.text_content()
print(text)
if text == '' and item_image['attr'].week_or_day == 1:
await item_div.click()
break
if text == '' and item_image['attr'].week_or_day == 2:
await item_div.click()
break
locator = await page.query_selector('div.ant-picker.ant-picker-small.css-1u7g04k')
await locator.click()
time_selector = page.locator('.ant-picker-cell:not(.ant-picker-cell-week)')
time_selector_elements = await time_selector.all()
start_date = await time_selector_elements[0].get_attribute('title')
end_date = await time_selector_elements[-1].get_attribute('title')
search_date = item_image['attr'].date_name
if item_image['attr'].week_or_day == 2:
search_date = get_date_of_week(item_image['attr'].date_name)
while search_date < start_date:
start_button = page.locator('button.ant-picker-header-prev-btn')
await start_button.click()
time_selector = page.locator('.ant-picker-cell:not(.ant-picker-cell-week)')
time_selector_elements = await time_selector.all()
start_date = await time_selector_elements[0].get_attribute('title')
end_date = await time_selector_elements[-1].get_attribute('title')
while search_date > end_date:
end_button = page.locator('button.ant-picker-header-next-btn')
await end_button.click()
time_selector = page.locator('.ant-picker-cell:not(.ant-picker-cell-week)')
time_selector_elements = await time_selector.all()
start_date = await time_selector_elements[0].get_attribute('title')
end_date = await time_selector_elements[-1].get_attribute('title')
day_list = []
#日
if item_image['attr'].week_or_day == 1:
day_list.append(search_date)
#周
if item_image['attr'].week_or_day == 2:
day_list = get_week_dates(search_date)
print(day_list)
time_selector = page.locator(
'.ant-picker-cell:not(.ant-picker-cell-week):not(.ant-picker-cell-disabled)')
time_selector_elements = await time_selector.all()
for item_time_selector in time_selector_elements:
title = await item_time_selector.get_attribute('title')
if title in day_list:
await item_time_selector.click()
break
tp_div = await page.query_selector('.ant-picker.ant-picker-range.ant-picker-small.css-1u7g04k')
#开始时间
start_tp = await tp_div.query_selector('input[placeholder="开始时间"]')
await start_tp.click()
tp_time = item_image['attr'].tp_start.split(':')
start_ul = page.locator(
'div.ant-picker-panel.ant-picker-panel-has-range ul.ant-picker-time-panel-column')
uls = await start_ul.all()
for index, item_ul in enumerate(uls):
li_divs = item_ul.locator('div.ant-picker-time-panel-cell-inner')
tp_divs = await li_divs.all()
for item_start_div in tp_divs:
title = await item_start_div.text_content()
# print(tp_time[index],title)
if tp_time[index] == title:
await item_start_div.click()
break
li_button = await page.query_selector(
'ul.ant-picker-ranges li.ant-picker-ok button.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm')
await li_button.click()
# 结束时间
# end_tp = await tp_div.query_selector('input[placeholder="结束时间"]')
# await end_tp.click()
# time.sleep(100000)
tp_time = item_image['attr'].tp_end.split(':')
end_ul = page.locator(
'div.ant-picker-panel.ant-picker-panel-has-range ul.ant-picker-time-panel-column')
end_uls = await end_ul.all()
for index, item_ul in enumerate(end_uls):
li_divs = item_ul.locator('div.ant-picker-time-panel-cell-inner')
tp_divs = await li_divs.all()
for item_start_div in tp_divs:
title = await item_start_div.text_content()
# print(tp_time[index], title)
if tp_time[index] == title:
await item_start_div.click()
break
li_button = await page.query_selector(
'ul.ant-picker-ranges li.ant-picker-ok button.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm')
await li_button.click()
right_button = await page.query_selector(
'div.custom-right-panel span.anticon.anticon-right-square')
if right_button:
await right_button.evaluate('element => element.click()')
bottom_button = await page.query_selector(
'span.anticon.anticon-down-square')
if bottom_button:
await bottom_button.evaluate('element => element.click()')
print('查询等待网络')
search_button = await page.query_selector(
'div.ant-col.ant-col-offset-1.css-1u7g04k button.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm')
await search_button.click()
print('查询等待网络')
await page.wait_for_load_state('load')
await page.wait_for_load_state('networkidle')
print('等待图片加载完成')
image_locators = await page.locator('img').all()
# 检查每个图片元素的加载状态
for img_locator in image_locators:
is_loaded = await img_locator.evaluate('(img) => img.complete') # 检查图片是否加载完成
if not is_loaded:
await img_locator.wait_for(state='attached')
print('查询等待amap-layer-overlay')
await page.wait_for_selector('.el-loading-mask', state="hidden")
await page.wait_for_selector(".el-loading-spinner", state="hidden")
element = await page.query_selector('.amap-layer-overlay')
# 去掉方案异常路口显示
tp_divs = page.locator('div.ant-col.ant-col-8.css-1u7g04k input[type="checkbox"]')
tp_div_elements = await tp_divs.all()
for item_tp_div_elements in tp_div_elements:
if await item_tp_div_elements.is_checked():
await item_tp_div_elements.click()
#markers = await page.query_selector_all('.amap-marker')
print('截图')
if element:
bounding_box = await element.bounding_box()
if bounding_box:
bound_x = bounding_box['x']
bound_y = bounding_box['y']
await page.screenshot(path=item_file_dir_full,
clip={
'x': bound_x,
'y': bound_y,
'width': bounding_box['width'],
'height': bounding_box['height'] - 30
})
item_image['img'] = item_file_dir_full
await browser.close()
result = []
for item_image_list in image_list:
result.append(item_image_list['img'])
return result, None
except Exception as error:
await browser.close()
return [], error
def get_week_dates(date_str):
# 将给定的日期字符串转换为日期对象
given_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
# 计算本周的周一日期
start_of_week = given_date - datetime.timedelta(days=given_date.weekday())
# 获取本周周一到周日的日期列表
week_dates = [start_of_week + datetime.timedelta(days=i) for i in range(7)]
# 将日期格式化为字符串列表('YYYY-MM-DD'
week_dates_str = [date.strftime('%Y-%m-%d') for date in week_dates]
return week_dates_str
def get_date_of_week(year_week_str):
year_str, week_str = year_week_str.split('-')
year = int(year_str)
week = int(week_str)
first_day_of_year = datetime.date(year, 1, 1)
first_day_iso = first_day_of_year.isocalendar()
delta_weeks = week - first_day_iso[1]
first_monday = first_day_of_year + datetime.timedelta(weeks=delta_weeks, days=-first_day_of_year.weekday())
return first_monday.strftime('%Y-%m-%d')
def cut_screenshot_img(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
error = None
for i in range(0, 3):
if i > 0:
print(f"重试{i}")
image, error = screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high, odi, divide, remit)
if not error:
return image, None
time.sleep(1)
return None, error
def cut_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid='15836903493', role='manager',
filter_date: List[str] = None):
"""路口对比报告截图"""
error = None
image = []
print('参数', vars(attrs[0]),nodeid, area_id)
for i in range(0, 3):
if i > 0:
print(f"重试{i}")
image, error = screenshot_report_img(attrs, nodeid, area_id, userid, role)
if not error:
return image, None
time.sleep(1)
print(error)
return image, error
def cut_monitor_img(attrs: List[CutMonitorImgColumn], userid='15836903493', role='manager'):
error = None
image = []
for i in range(0, 3):
if i > 0:
print(f"重试{i}")
image, error = screenshot_wave_monitor_img(attrs, userid, role)
if not error:
return image, None
time.sleep(1)
return image, error
if __name__ == '__main__':
#路口对比报告截图
attrs = []
attrs.append(CutReportImgColumn(
crossid='CR_10264777_2509810',
cross_name='104国道与178县道交叉口',
time_list=[['2026-05-01', '2026-05-04'],['2026-05-06', '2026-05-09']],
tp_name='02:00-03:00',
walk=1,
delay=1,
flow_in=1
))
image_path, err = cut_report_img(attrs, 530100, 530101)
print(image_path)
exit(0)
#绿波巡检截图
attrs = []
attrs.append(CutMonitorImgColumn(
wave_id='CR_10265182_2496051@3@1770800675',
wave_name='大同33条',
week_or_day=2, #1日2周
date_name='2026-12', #2025-07-062025-27
tp_start='07:00',
tp_end='09:00'))
image_path, err = cut_monitor_img(attrs)
print(image_path, err)