1010 lines
50 KiB
Python
1010 lines
50 KiB
Python
import asyncio
|
||
import datetime
|
||
import os
|
||
import platform
|
||
import random
|
||
import threading
|
||
import time
|
||
from typing import List
|
||
|
||
import requests
|
||
from flask import jsonify
|
||
from playwright.async_api import async_playwright
|
||
from playwright.sync_api import sync_playwright
|
||
from PIL import Image
|
||
|
||
domain = 'http://101.42.25.182:8082/'
|
||
|
||
|
||
def get_os_type():
    """Return a short label for the host operating system.

    Returns:
        'macOS', 'Ubuntu', 'Linux', 'Windows' or 'Unknown OS', derived from
        ``platform.system()``.  On Linux the optional ``distro`` package is
        consulted to single out Ubuntu; when it is not installed the generic
        'Linux' label is returned.
    """
    system_name = platform.system()

    if system_name == 'Linux':
        try:
            # Imported lazily: ``distro`` is optional and only needed on Linux.
            import distro
            return 'Ubuntu' if distro.name() == 'Ubuntu' else 'Linux'
        except ImportError:
            return 'Linux'

    labels = {'Darwin': 'macOS', 'Windows': 'Windows'}
    return labels.get(system_name, 'Unknown OS')
|
||
|
||
class CutMonitorImgColumn:
    """Plain attribute holder describing one wave-monitor screenshot request."""

    def __init__(self, wave_id, wave_name, week_or_day, date_name, tp_start, tp_end):
        # Identification of the wave.
        self.wave_id, self.wave_name = wave_id, wave_name
        # Granularity flag: 1 = day, 2 = week.
        self.week_or_day = week_or_day
        self.date_name = date_name
        # Start / end of the time period.
        self.tp_start, self.tp_end = tp_start, tp_end
|
||
|
||
class CutReportImgColumn:
    """Plain attribute holder describing one report screenshot request.

    ``walk``, ``delay`` and ``flow_in`` default to 0.
    """

    def __init__(self, crossid, cross_name, time_list, tp_name, walk=0, delay=0, flow_in=0):
        # Which intersection is being captured.
        self.crossid, self.cross_name = crossid, cross_name
        # Date ranges and time-period name used for the query.
        self.time_list, self.tp_name = time_list, tp_name
        # Optional extra screenshots.
        self.walk = walk
        self.delay = delay
        self.flow_in = flow_in
|
||
|
||
class ScreenError(Exception):
    """Exception that keeps its text in ``message`` and renders as 'CustomError: <message>'."""

    def __init__(self, message):
        super().__init__(message)
        self.message = message

    def __str__(self):
        return "CustomError: {}".format(self.message)
|
||
|
||
def get_request_host(nodeid):
    """Fetch the route record for ``nodeid`` from the route API.

    Args:
        nodeid: Value interpolated into the ``nodeid`` query parameter.

    Returns:
        ``(data, None)`` with the decoded JSON body when the server answers
        HTTP 200, otherwise ``(None, 'Failed to fetch data')``.  Connection
        failures, timeouts and undecodable bodies are reported through the
        same error tuple instead of raising.
    """
    failure = (None, 'Failed to fetch data')
    try:
        # A timeout is mandatory: without one a stalled server blocks forever.
        response = requests.get(
            f"http://101.42.25.182:9999/api/route?nodeid={nodeid}",
            timeout=10,
        )
    except requests.RequestException:
        return failure

    if response.status_code != 200:
        return failure

    try:
        return response.json(), None
    except ValueError:
        # Body was not valid JSON.
        return failure
|
||
|
||
def screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
    """Blocking wrapper: run ``_screenshot_wave_img_v3`` via ``asyncio.run`` and return its result."""
    capture = _screenshot_wave_img_v3(
        wave_id, wave_name, time_list, tp_name, high, odi, divide, remit,
    )
    return asyncio.run(capture)
|
||
|
||
def screenshot_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid, role):
    """Blocking wrapper: run ``_screenshot_report_img`` via ``asyncio.run`` and return its result."""
    capture = _screenshot_report_img(attrs, nodeid, area_id, userid, role)
    return asyncio.run(capture)
|
||
|
||
def screenshot_wave_monitor_img(attrs: List[CutMonitorImgColumn], userid, role):
    """Blocking wrapper: run ``_screenshot_wave_monitor_img`` via ``asyncio.run`` and return its result."""
    capture = _screenshot_wave_monitor_img(attrs, userid, role)
    return asyncio.run(capture)
|
||
|
||
async def _screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
    """Drive the wave-evaluate page in headless Chromium and save a cropped map screenshot.

    The output path is ``<this dir>/../temp/<key>.jpg`` where ``<key>``
    concatenates ``wave_id``, the first 11 characters of ``tp_name``, the four
    option flags and every start/end date.  If that file already exists it is
    returned without opening a browser.

    Args:
        wave_id: Used only in the cache file name.
        wave_name: Text typed into the ``#rc_select_0`` search box.
        time_list: Iterable of ``(start, end)`` date strings; they are compared
            against the ``title`` attribute of the picker cells.
        tp_name: ``title`` of the option to choose in ``#rc_select_1``.
        high: 1 ticks the ``high_frequency`` checkbox.
        odi: 1 ticks the ``ODI`` checkbox.
        divide: 1 / 2 selects the forward / backward ``out_ts_info`` radio.
        remit: 1 / 2 selects the forward / backward ``in_ts_info`` radio.

    Returns:
        ``(path, None)`` when the image exists or was written,
        ``(None, None)`` when the map overlay or its bounding box is missing,
        ``(None, error)`` when the date picker is missing (``error`` is a
        message string) or any exception was raised (``error`` is the
        exception instance).
    """
    image_dir = os.path.dirname(os.path.abspath(__file__))

    # Every input that changes the rendered picture takes part in the cache key.
    file_name = f"{wave_id}{tp_name[:11]}{high}{odi}{divide}{remit}"
    file_name += ''.join(f"{item_time[0]}{item_time[1]}" for item_time in time_list)
    file_dir = f"{image_dir}/../temp/{file_name}.jpg"
    if os.path.exists(file_dir):
        return file_dir, None

    async def wait_until_clickable(target):
        # Poll every 100 ms until the control is both visible and enabled.
        while not await target.is_visible() or not await target.is_enabled():
            await asyncio.sleep(0.1)

    async def click_cell_titled(cells, wanted):
        # Click the first cell whose title equals ``wanted``; report whether one was found.
        for cell in cells:
            if await cell.get_attribute('title') == wanted:
                await cell.click()
                return True
        return False

    try:
        async with async_playwright() as p:
            # Launch the browser.
            browser = await p.chromium.launch(headless=True,
                                              args=['--window-size=1900,768', '--no-sandbox',
                                                    '--disable-setuid-sandbox',
                                                    '--disable-gpu'])
            try:
                page = await browser.new_page()
                await page.set_viewport_size({'width': 1900, 'height': 768})

                await asyncio.sleep(2)

                # Open the target page.
                print('打开页面')
                await page.goto("http://101.42.25.182:8082/page/wave_evaluate/evaluate", timeout=120000)
                # Wait for the map container to exist.
                print('等待amap')
                await page.wait_for_selector('#amap')
                await page.wait_for_function('document.readyState === "complete"')
                await page.wait_for_selector('.amap-container')
                await page.wait_for_function('document.querySelector(".amap-container") !== null')
                print('等待网络')
                await page.wait_for_load_state('load')
                await page.wait_for_load_state('networkidle')

                print('等待amap-layer-overlay')
                await page.wait_for_selector('.el-loading-mask', state="hidden")
                await page.wait_for_selector(".el-loading-spinner", state="hidden")
                await page.wait_for_selector('.amap-layer-overlay', timeout=30000)

                # Collapse the right-hand panel.
                print("右缩放")
                right = await page.query_selector('.anticon.anticon-right-square')
                await wait_until_clickable(right)
                await right.click()
                await page.wait_for_selector(".metrics", state="hidden")

                # Collapse the bottom panel.
                print("下缩放")
                bottom = await page.query_selector('.anticon.anticon-down-square')
                await wait_until_clickable(bottom)
                await bottom.click()
                await page.wait_for_selector(".b-panel", state="hidden")

                # Search for the wave by name and take the offered entry.
                print("选择绿波....")
                input_element = await page.query_selector("#rc_select_0")
                await input_element.click()
                await input_element.fill(wave_name)
                print("输入绿波....")

                await page.wait_for_selector('.rc-virtual-list-holder-inner')
                dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
                await dev_select.click()
                print("点击绿波....")

                # Optional display toggles, applied in the original order.
                option_selectors = (
                    (high == 1, 'input[type="checkbox"][value="high_frequency"]'),
                    (odi == 1, 'input[type="checkbox"][value="ODI"]'),
                    (divide == 1, 'input[type="radio"][value="forward-out_ts_info"]'),
                    (divide == 2, 'input[type="radio"][value="backward-out_ts_info"]'),
                    (remit == 1, 'input[type="radio"][value="forward-in_ts_info"]'),
                    (remit == 2, 'input[type="radio"][value="backward-in_ts_info"]'),
                )
                for wanted, selector in option_selectors:
                    if wanted:
                        await page.locator(selector).click()

                for item_time in time_list:
                    await page.locator('input[placeholder="开始日期"]').click()
                    panels = await page.locator('div.ant-picker-date-panel').all()
                    if len(panels) != 2:
                        # Keep the (value, error) shape callers unpack.
                        return None, '时间选择器不存在'
                    start_panel, end_panel = panels

                    start = item_time[0]
                    end = item_time[1]

                    # First visible date of the left panel / last of the right panel.
                    start_cells = await start_panel.locator('td').all()
                    first_title = await start_cells[0].get_attribute('title')
                    prev_button = start_panel.locator('button.ant-picker-header-prev-btn')
                    end_cells = await end_panel.locator('td').all()
                    last_title = await end_cells[-1].get_attribute('title')
                    next_button = end_panel.locator('button.ant-picker-header-next-btn')

                    # Page backwards until the start date is on screen.
                    while start < first_title:
                        await prev_button.click()
                        start_cells = await start_panel.locator('td').all()
                        first_title = await start_cells[0].get_attribute('title')

                    # Page forwards until the start date is on screen.
                    while start > last_title:
                        await next_button.click()
                        end_cells = await end_panel.locator('td').all()
                        last_title = await end_cells[-1].get_attribute('title')

                    # Click the start date once: left panel first, right panel
                    # only as a fallback.  Both panels can show the same date,
                    # and a second click would be taken as the range end.
                    if not await click_cell_titled(start_cells, start):
                        await click_cell_titled(end_cells, start)

                    # Re-read the cells, then click the end date the same way.
                    start_cells = await start_panel.locator('td').all()
                    end_cells = await end_panel.locator('td').all()
                    if not await click_cell_titled(start_cells, end):
                        await click_cell_titled(end_cells, end)

                    button = page.locator('button.ant-btn.ant-btn-circle.ant-btn-primary.ant-btn-sm.ant-btn-icon-only')
                    await wait_until_clickable(button)
                    await button.click()

                # Choose the time period by its title.
                await page.locator('input#rc_select_1').click()
                tp_div_elements = await page.locator('div.ant-select-item.ant-select-item-option').all()
                for item_tp_div in tp_div_elements:
                    if await item_tp_div.get_attribute('title') == tp_name:
                        await item_tp_div.click()
                        break

                search = await page.query_selector(
                    '.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm:not(.ant-btn-circle)')
                await wait_until_clickable(search)
                await search.click()
                print('查询等待网络')
                await page.wait_for_load_state('load')
                await page.wait_for_load_state('networkidle')
                if divide > 0 or remit > 0:
                    print('等待转向比')
                    await page.wait_for_selector('div.flow-container', state='attached')
                    await page.wait_for_selector('div.flow-container >> :visible', timeout=10000)

                print('等待图片加载完成')
                # Check the load state of each <img> element.
                for img_locator in await page.locator('img').all():
                    is_loaded = await img_locator.evaluate('(img) => img.complete')
                    if not is_loaded:
                        await img_locator.wait_for(state='attached')

                print('查询等待amap-layer-overlay')
                await page.wait_for_selector('.el-loading-mask', state="hidden")
                await page.wait_for_selector(".el-loading-spinner", state="hidden")
                element = await page.query_selector('.amap-layer-overlay')

                # Read each marker's computed left/top (strings such as "123px").
                left_values = []
                top_values = []
                for marker in await page.query_selector_all('.amap-marker'):
                    left = await marker.evaluate("el => window.getComputedStyle(el).left")
                    top = await marker.evaluate("el => window.getComputedStyle(el).top")
                    left_values.append(float(left.replace('px', '')))
                    top_values.append(float(top.replace('px', '')))
                min_left = min(left_values)
                min_top = min(top_values)
                max_left = max(left_values)
                max_top = max(top_values)

                print('截图')
                if element:
                    bounding_box = await element.bounding_box()
                    if bounding_box:
                        # Crop around the markers with a margin, clamped to the overlay.
                        x = max(min_left - 200, 0)
                        y = max(min_top - 70, 0)
                        width = min(max_left + 250, bounding_box['width']) - x
                        height = min(max_top + 70, bounding_box['height']) - y
                        await page.screenshot(path=file_dir,
                                              clip={
                                                  'x': x + bounding_box['x'],
                                                  'y': y + bounding_box['y'],
                                                  'width': width,
                                                  'height': height
                                              })
                        return file_dir, None
                return None, None
            finally:
                # Close while the Playwright connection is still alive; doing
                # it from the outer ``except`` would run after teardown.
                await browser.close()
    except Exception as error:
        return None, error
|
||
|
||
async def _screenshot_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid, role):
    """Take the screenshots for an intersection comparison report.

    Target paths are ``<this dir>/../temp/<crossid><dates><tp_name[:11]>[suffix].jpg``
    with suffix ``''`` (key ``'cross'``), ``'_walk'``, ``'_delay'`` or
    ``'_flow_in'``.  If every target file already exists the mapping is
    returned without opening a browser.

    Args:
        attrs: Request descriptors.  The browser session uses ``attrs[0]``
            for the intersection name, date ranges and time period.
        nodeid: Stored (stringified) in the ``cross_manage_nodeid`` cookie.
        area_id: Stored (stringified) in the ``cross_manage_areaid`` cookie.
        userid: Stored (stringified) in the ``cross_manage_userid`` cookie.
        role: Not used by this function.

    Returns:
        ``(image_map, None)`` on success, where ``image_map`` maps
        ``'cross'`` / ``'walk'`` / ``'delay'`` / ``'flow_in'`` to file paths;
        ``({}, None)`` for empty ``attrs``; ``({}, error)`` when the date
        picker is missing (message string) or an exception was raised
        (exception instance).
    """
    image_map = {}
    if not attrs:
        return {}, None
    image_dir = os.path.dirname(os.path.abspath(__file__))

    # NOTE(review): keys are overwritten on every iteration, so with several
    # entries the paths come from the later ones while the page below is
    # driven by attrs[0] -- confirm callers only ever pass one entry.
    for item_attr in attrs:
        date_list_str = ''.join(date for sublist in item_attr.time_list for date in sublist)
        base_path = f"{image_dir}/../temp/{item_attr.crossid}{date_list_str}{item_attr.tp_name[:11]}"
        image_map['cross'] = f"{base_path}.jpg"
        if item_attr.walk == 1:
            # Pedestrian crossing.
            image_map['walk'] = f"{base_path}_walk.jpg"
        if item_attr.delay == 1:
            # Delay.
            image_map['delay'] = f"{base_path}_delay.jpg"
        if item_attr.flow_in == 1:
            # Flow split.
            image_map['flow_in'] = f"{base_path}_flow_in.jpg"

    if all(os.path.exists(image_file_path) for image_file_path in image_map.values()):
        return image_map, None

    async def wait_until_clickable(target):
        # Poll every 100 ms until the control is both visible and enabled.
        while not await target.is_visible() or not await target.is_enabled():
            await asyncio.sleep(0.1)

    async def click_cell_titled(cells, wanted):
        # Click the first cell whose title equals ``wanted``; report whether one was found.
        for cell in cells:
            if await cell.get_attribute('title') == wanted:
                await cell.click()
                return True
        return False

    try:
        async with async_playwright() as p:
            # Launch the browser.
            browser = await p.chromium.launch(headless=True,
                                              args=['--window-size=1900,800',
                                                    '--no-sandbox',
                                                    '--disable-setuid-sandbox',
                                                    '--disable-gpu',
                                                    '--enable-gpu-rasterization',
                                                    '--ignore-gpu-blocklist',
                                                    '--enable-zero-copy', '--disable-software-rasterizer',
                                                    '--disable-extensions',
                                                    '--disable-plugins', '--disable-hardware-media-key', '--no-zygote',
                                                    '--process-per-site', '--disable-background-networking',
                                                    '--disable-sync',
                                                    '--disable-background-timer-throttling'])
            try:
                print('打开页面')
                page = await browser.new_page()

                async def click_all(selector):
                    # Click every element matching ``selector``, pausing 500 ms after each click.
                    for handle in await page.query_selector_all(selector):
                        await handle.click()
                        await page.wait_for_timeout(500)

                # NOTE(review): the token below is a hard-coded credential;
                # consider loading it from configuration.
                cookie_values = (
                    ('cross_manage_nodeid', str(nodeid)),
                    ('cross_manage_areaid', str(area_id)),
                    ('cross_manage_token', "iuqwefhjdbcsajhdshcgaiudncjadhajn_15001383357"),
                    # Stringified like the other ids: cookie values must be strings.
                    ('cross_manage_userid', str(userid)),
                )
                cookies = [
                    {'name': name, 'value': value, 'domain': '101.42.25.182', 'path': '/'}
                    for name, value in cookie_values
                ]
                await page.context.add_cookies(cookies)
                await page.set_viewport_size({'width': 1200, 'height': 860})
                await asyncio.sleep(2)
                await page.goto("http://101.42.25.182:8085/page/diagnosis/detail", timeout=120000)
                print('等待amap')
                await page.wait_for_selector('#amap')
                await page.wait_for_function('document.readyState === "complete"')
                await page.wait_for_selector('.amap-container')
                await page.wait_for_function('document.querySelector(".amap-container") !== null')
                print('等待网络')
                await page.wait_for_load_state('load')
                await page.wait_for_load_state('networkidle')
                print('等待amap-layer-overlay')
                await page.wait_for_selector('.el-loading-mask', state="hidden")
                await page.wait_for_selector(".el-loading-spinner", state="hidden")
                await page.wait_for_selector('.amap-layer-overlay', timeout=30000)

                # Scroll the detail pane to its bottom.
                print("滚动到下面")
                info_detail = await page.query_selector('.info-detail')
                if info_detail:
                    await info_detail.evaluate('element => element.scrollTop = element.scrollHeight')
                print("等到图片加载完成")
                for img_locator in await page.locator('img').all():
                    is_loaded = await img_locator.evaluate('(img) => img.complete')
                    if not is_loaded:
                        await img_locator.wait_for(state='attached')

                # Search for the intersection by name and take the offered entry.
                print("选择路口....")
                input_element = await page.query_selector("#rc_select_0")
                await input_element.click()
                await input_element.fill(attrs[0].cross_name)
                print("输入路口....")
                await page.wait_for_selector('.rc-virtual-list-holder-inner')
                dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
                await dev_select.click()
                print("点击路口....")

                for item_time in attrs[0].time_list:
                    await page.locator('input[placeholder="开始日期"]').click()
                    panels = await page.locator('div.ant-picker-date-panel').all()
                    if len(panels) != 2:
                        return {}, '时间选择器不存在'
                    start_panel, end_panel = panels
                    start = item_time[0]
                    end = item_time[1]

                    # First visible date of the left panel / last of the right panel.
                    start_cells = await start_panel.locator('td').all()
                    first_title = await start_cells[0].get_attribute('title')
                    prev_button = start_panel.locator('button.ant-picker-header-prev-btn')
                    end_cells = await end_panel.locator('td').all()
                    last_title = await end_cells[-1].get_attribute('title')
                    next_button = end_panel.locator('button.ant-picker-header-next-btn')

                    # Page backwards until the start date is on screen.
                    while start < first_title:
                        await prev_button.click()
                        start_cells = await start_panel.locator('td').all()
                        first_title = await start_cells[0].get_attribute('title')

                    # Page forwards until the start date is on screen.
                    while start > last_title:
                        await next_button.click()
                        end_cells = await end_panel.locator('td').all()
                        last_title = await end_cells[-1].get_attribute('title')

                    # Start date: left panel first, right panel as fallback.
                    if not await click_cell_titled(start_cells, start):
                        await click_cell_titled(end_cells, start)

                    # Re-read the cells, then click the end date the same way.
                    start_cells = await start_panel.locator('td').all()
                    end_cells = await end_panel.locator('td').all()
                    if not await click_cell_titled(start_cells, end):
                        await click_cell_titled(end_cells, end)

                    button = page.locator('button.ant-btn.ant-btn-circle.ant-btn-primary.ant-btn-sm.ant-btn-icon-only')
                    await wait_until_clickable(button)
                    await button.click()

                # Choose the diagnosis time period from the cascader.
                await page.locator('input#rc_select_1').click()
                tp_div_elements = await page.locator('div.ant-cascader-menu-item-content').all()
                print("诊断时段....")
                for item_tp_div in tp_div_elements:
                    title = await item_tp_div.text_content()
                    if title == '小时时段':
                        # Open the hourly sub-menu and look for the period there.
                        await item_tp_div.click()
                        for item_tp_div_hour in await page.locator('div.ant-cascader-menu-item-content').all():
                            if await item_tp_div_hour.text_content() == attrs[0].tp_name:
                                await item_tp_div_hour.click()
                    elif title == attrs[0].tp_name:
                        await item_tp_div.click()

                print('点击查询')
                search = page.locator('button.ant-btn.ant-btn-primary.ant-btn-sm').filter(has_text="查 询")
                await search.click()
                print('查询等待网络')
                await page.wait_for_load_state('load')
                await page.wait_for_load_state('networkidle', timeout=30000)
                await page.wait_for_selector('.ant-spin-container.ant-spin-blur', state='visible', timeout=30000)
                await page.wait_for_selector('.ant-spin-container.ant-spin-blur', state='hidden', timeout=30000)
                await page.wait_for_selector('.css-1u7g04k.ant-spin.ant-spin-spinning', state='hidden', timeout=30000)
                # Extra pause so animations can finish.
                await page.wait_for_timeout(300)

                # Clear the layer selection, then untick the checked option.
                print("去掉服务水平")
                await click_all('button:has-text("清空")')
                server = await page.query_selector('.ant-checkbox-wrapper.ant-checkbox-wrapper-checked.css-1u7g04k')
                if server:
                    # Nothing to untick when no checkbox is checked.
                    await server.click()
                await page.wait_for_selector(".cross-level", state="hidden")
                await page.wait_for_timeout(500)

                if image_map.get('cross'):
                    element = await page.query_selector('.amap-layer-overlay')
                    cross_bounding_box = await element.bounding_box()
                    await page.screenshot(path=image_map['cross'],
                                          clip={
                                              'x': cross_bounding_box['x'],
                                              'y': cross_bounding_box['y'],
                                              'width': cross_bounding_box['width'],
                                              'height': cross_bounding_box['height'] - 45
                                          })

                print('全屏')
                all_screen = page.locator('button.css-1u7g04k.ant-btn.ant-btn-link').filter(has_text="全屏")
                await all_screen.click()
                await page.wait_for_timeout(500)
                ledger_element = await page.query_selector('.ledger-box')
                bounding_box = await ledger_element.bounding_box()
                x_offset = 170
                x_width = 850
                panel_clip = {
                    'x': bounding_box['x'] + x_offset,
                    'y': bounding_box['y'],
                    'width': x_width,
                    'height': bounding_box['height']
                }

                # Per extra image: (log line, selector of the toggle to click).
                extra_shots = {
                    'walk': ("人行横道截图", '.ant-checkbox-wrapper.css-1u7g04k:has-text("人行横道距离")'),
                    'delay': ("延误截图", '.ant-radio-wrapper.css-1u7g04k:has-text("延误")'),
                    'flow_in': ("分流截图", '.ant-radio-wrapper.css-1u7g04k:has-text("分流")'),
                }
                for image_key, item_image_path in image_map.items():
                    if image_key not in extra_shots:
                        continue
                    log_line, toggle_selector = extra_shots[image_key]
                    print(log_line)
                    await click_all('button:has-text("清空")')
                    await click_all(toggle_selector)
                    await page.screenshot(path=item_image_path, clip=panel_clip)

                return image_map, None
            finally:
                # Close while the Playwright connection is still alive; doing
                # it from the outer ``except`` would run after teardown.
                await browser.close()
    except Exception as error:
        return {}, error
|
||
|
||
async def _screenshot_wave_monitor_img(attrs: List[CutMonitorImgColumn], userid, role):
|
||
image_list = []
|
||
result = []
|
||
if len(attrs) <= 0:
|
||
return result, None
|
||
image_dir = os.path.dirname(os.path.abspath(__file__))
|
||
cut = False
|
||
for item_attr in attrs:
|
||
file_name = f"{item_attr.wave_id}{item_attr.week_or_day}{item_attr.date_name}"
|
||
file_dir_full = f"{image_dir}/../temp/{file_name}.jpg"
|
||
if not os.path.exists(file_dir_full):
|
||
file_dir_full = ''
|
||
cut = True
|
||
image_list.append({
|
||
'attr': item_attr,
|
||
'img': file_dir_full
|
||
})
|
||
|
||
if not cut:
|
||
for item_image in image_list:
|
||
result.append(item_image['img'])
|
||
return result, None
|
||
|
||
try:
|
||
async with async_playwright() as p:
|
||
# 启动浏览器
|
||
browser = await p.chromium.launch(headless=True,
|
||
args=['--window-size=1200,800', '--no-sandbox',
|
||
'--disable-setuid-sandbox',
|
||
'--disable-gpu',
|
||
'--enable-gpu-rasterization',
|
||
'--ignore-gpu-blocklist',
|
||
'--enable-zero-copy', '--disable-software-rasterizer',
|
||
'--disable-extensions',
|
||
'--disable-plugins', '--disable-hardware-media-key', '--no-zygote',
|
||
'--process-per-site', '--disable-background-networking',
|
||
'--disable-sync',
|
||
'--disable-background-timer-throttling'
|
||
])
|
||
|
||
print('打开页面')
|
||
page = await browser.new_page() # 新建页面
|
||
cookies = [
|
||
{
|
||
'name': 'manage_token',
|
||
'value': userid,
|
||
'domain': '101.42.25.182',
|
||
'path': '/',
|
||
},
|
||
{
|
||
'name': 'manage_userid',
|
||
'value': userid,
|
||
'domain': '101.42.25.182',
|
||
'path': '/',
|
||
},
|
||
]
|
||
await page.context.add_cookies(cookies)
|
||
await page.set_viewport_size({'width': 1200, 'height': 700})
|
||
await asyncio.sleep(2)
|
||
await page.goto(f"{domain}page/wave_monitor/situation", timeout=120000)
|
||
#await page.goto(f"http://82.157.173.20:8082/page/wave_monitor/situation", timeout=120000)
|
||
print('等待amap')
|
||
await page.wait_for_selector('#amap')
|
||
await page.wait_for_function('document.readyState === "complete"')
|
||
await page.wait_for_selector('.amap-container')
|
||
await page.wait_for_function('document.querySelector(".amap-container") !== null')
|
||
print('等待网络')
|
||
await page.wait_for_load_state('load')
|
||
await page.wait_for_load_state('networkidle')
|
||
print('等待amap-layer-overlay')
|
||
await page.wait_for_selector('.el-loading-mask', state="hidden")
|
||
await page.wait_for_selector(".el-loading-spinner", state="hidden")
|
||
await page.wait_for_selector('.amap-layer-overlay', timeout=30000)
|
||
|
||
for item_image_index, item_image in enumerate(image_list):
|
||
if item_image['img'] != '':
|
||
continue
|
||
item_file_name = f"{item_image['attr'].wave_id}{item_image['attr'].week_or_day}{item_image['attr'].date_name}"
|
||
item_file_dir_full = f"{image_dir}/../temp/{item_file_name}.jpg"
|
||
print("选择绿波....")
|
||
if item_image_index > 0:
|
||
print('重载')
|
||
await page.reload()
|
||
print('等待amap')
|
||
await page.wait_for_selector('#amap')
|
||
await page.wait_for_function('document.readyState === "complete"')
|
||
await page.wait_for_selector('.amap-container')
|
||
await page.wait_for_function('document.querySelector(".amap-container") !== null')
|
||
print('等待网络')
|
||
await page.wait_for_load_state('load')
|
||
await page.wait_for_load_state('networkidle')
|
||
print('等待amap-layer-overlay')
|
||
await page.wait_for_selector('.el-loading-mask', state="hidden")
|
||
await page.wait_for_selector(".el-loading-spinner", state="hidden")
|
||
await page.wait_for_selector('.amap-layer-overlay', timeout=30000)
|
||
|
||
input_element = await page.query_selector("#rc_select_0")
|
||
await input_element.click()
|
||
await input_element.fill(item_image['attr'].wave_name)
|
||
print("输入绿波....")
|
||
await page.wait_for_selector('.rc-virtual-list-holder-inner')
|
||
dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
|
||
await dev_select.click()
|
||
print("点击周日....")
|
||
|
||
week_or_day_span = await page.query_selector('.ant-input-group.css-1u7g04k.ant-input-group-compact')
|
||
week_or_day = await week_or_day_span.query_selector(
|
||
'.ant-select.ant-select-sm.css-1u7g04k.ant-select-single.ant-select-show-arrow')
|
||
await week_or_day.click()
|
||
|
||
divs = page.locator('div.ant-select-item-option-content')
|
||
divs_all = await divs.all()
|
||
for item_div in divs_all:
|
||
text = await item_div.text_content()
|
||
print(text)
|
||
if text == '日' and item_image['attr'].week_or_day == 1:
|
||
await item_div.click()
|
||
break
|
||
if text == '周' and item_image['attr'].week_or_day == 2:
|
||
await item_div.click()
|
||
break
|
||
|
||
locator = await page.query_selector('div.ant-picker.ant-picker-small.css-1u7g04k')
|
||
await locator.click()
|
||
|
||
time_selector = page.locator('.ant-picker-cell:not(.ant-picker-cell-week)')
|
||
time_selector_elements = await time_selector.all()
|
||
|
||
start_date = await time_selector_elements[0].get_attribute('title')
|
||
end_date = await time_selector_elements[-1].get_attribute('title')
|
||
|
||
search_date = item_image['attr'].date_name
|
||
if item_image['attr'].week_or_day == 2:
|
||
search_date = get_date_of_week(item_image['attr'].date_name)
|
||
|
||
while search_date < start_date:
|
||
start_button = page.locator('button.ant-picker-header-prev-btn')
|
||
await start_button.click()
|
||
time_selector = page.locator('.ant-picker-cell:not(.ant-picker-cell-week)')
|
||
time_selector_elements = await time_selector.all()
|
||
start_date = await time_selector_elements[0].get_attribute('title')
|
||
end_date = await time_selector_elements[-1].get_attribute('title')
|
||
|
||
while search_date > end_date:
|
||
end_button = page.locator('button.ant-picker-header-next-btn')
|
||
await end_button.click()
|
||
time_selector = page.locator('.ant-picker-cell:not(.ant-picker-cell-week)')
|
||
time_selector_elements = await time_selector.all()
|
||
start_date = await time_selector_elements[0].get_attribute('title')
|
||
end_date = await time_selector_elements[-1].get_attribute('title')
|
||
|
||
day_list = []
|
||
#日
|
||
if item_image['attr'].week_or_day == 1:
|
||
day_list.append(search_date)
|
||
#周
|
||
if item_image['attr'].week_or_day == 2:
|
||
day_list = get_week_dates(search_date)
|
||
print(day_list)
|
||
time_selector = page.locator(
|
||
'.ant-picker-cell:not(.ant-picker-cell-week):not(.ant-picker-cell-disabled)')
|
||
time_selector_elements = await time_selector.all()
|
||
for item_time_selector in time_selector_elements:
|
||
title = await item_time_selector.get_attribute('title')
|
||
if title in day_list:
|
||
await item_time_selector.click()
|
||
break
|
||
tp_div = await page.query_selector('.ant-picker.ant-picker-range.ant-picker-small.css-1u7g04k')
|
||
#开始时间
|
||
start_tp = await tp_div.query_selector('input[placeholder="开始时间"]')
|
||
await start_tp.click()
|
||
tp_time = item_image['attr'].tp_start.split(':')
|
||
start_ul = page.locator(
|
||
'div.ant-picker-panel.ant-picker-panel-has-range ul.ant-picker-time-panel-column')
|
||
uls = await start_ul.all()
|
||
for index, item_ul in enumerate(uls):
|
||
li_divs = item_ul.locator('div.ant-picker-time-panel-cell-inner')
|
||
tp_divs = await li_divs.all()
|
||
for item_start_div in tp_divs:
|
||
title = await item_start_div.text_content()
|
||
# print(tp_time[index],title)
|
||
if tp_time[index] == title:
|
||
await item_start_div.click()
|
||
break
|
||
|
||
li_button = await page.query_selector(
|
||
'ul.ant-picker-ranges li.ant-picker-ok button.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm')
|
||
await li_button.click()
|
||
# 结束时间
|
||
# end_tp = await tp_div.query_selector('input[placeholder="结束时间"]')
|
||
# await end_tp.click()
|
||
# time.sleep(100000)
|
||
tp_time = item_image['attr'].tp_end.split(':')
|
||
end_ul = page.locator(
|
||
'div.ant-picker-panel.ant-picker-panel-has-range ul.ant-picker-time-panel-column')
|
||
end_uls = await end_ul.all()
|
||
for index, item_ul in enumerate(end_uls):
|
||
li_divs = item_ul.locator('div.ant-picker-time-panel-cell-inner')
|
||
tp_divs = await li_divs.all()
|
||
for item_start_div in tp_divs:
|
||
title = await item_start_div.text_content()
|
||
# print(tp_time[index], title)
|
||
if tp_time[index] == title:
|
||
await item_start_div.click()
|
||
break
|
||
li_button = await page.query_selector(
|
||
'ul.ant-picker-ranges li.ant-picker-ok button.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm')
|
||
await li_button.click()
|
||
right_button = await page.query_selector(
|
||
'div.custom-right-panel span.anticon.anticon-right-square')
|
||
if right_button:
|
||
await right_button.evaluate('element => element.click()')
|
||
bottom_button = await page.query_selector(
|
||
'span.anticon.anticon-down-square')
|
||
if bottom_button:
|
||
await bottom_button.evaluate('element => element.click()')
|
||
print('查询等待网络')
|
||
search_button = await page.query_selector(
|
||
'div.ant-col.ant-col-offset-1.css-1u7g04k button.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm')
|
||
await search_button.click()
|
||
print('查询等待网络')
|
||
|
||
await page.wait_for_load_state('load')
|
||
await page.wait_for_load_state('networkidle')
|
||
print('等待图片加载完成')
|
||
image_locators = await page.locator('img').all()
|
||
# 检查每个图片元素的加载状态
|
||
for img_locator in image_locators:
|
||
is_loaded = await img_locator.evaluate('(img) => img.complete') # 检查图片是否加载完成
|
||
if not is_loaded:
|
||
await img_locator.wait_for(state='attached')
|
||
print('查询等待amap-layer-overlay')
|
||
await page.wait_for_selector('.el-loading-mask', state="hidden")
|
||
await page.wait_for_selector(".el-loading-spinner", state="hidden")
|
||
element = await page.query_selector('.amap-layer-overlay')
|
||
|
||
# 去掉方案异常路口显示
|
||
tp_divs = page.locator('div.ant-col.ant-col-8.css-1u7g04k input[type="checkbox"]')
|
||
tp_div_elements = await tp_divs.all()
|
||
for item_tp_div_elements in tp_div_elements:
|
||
if await item_tp_div_elements.is_checked():
|
||
await item_tp_div_elements.click()
|
||
#markers = await page.query_selector_all('.amap-marker')
|
||
print('截图')
|
||
if element:
|
||
bounding_box = await element.bounding_box()
|
||
if bounding_box:
|
||
bound_x = bounding_box['x']
|
||
bound_y = bounding_box['y']
|
||
await page.screenshot(path=item_file_dir_full,
|
||
clip={
|
||
'x': bound_x,
|
||
'y': bound_y,
|
||
'width': bounding_box['width'],
|
||
'height': bounding_box['height'] - 30
|
||
})
|
||
item_image['img'] = item_file_dir_full
|
||
await browser.close()
|
||
result = []
|
||
for item_image_list in image_list:
|
||
result.append(item_image_list['img'])
|
||
return result, None
|
||
except Exception as error:
|
||
await browser.close()
|
||
return [], error
|
||
|
||
|
||
def get_week_dates(date_str):
    """Return the seven dates of the Monday-to-Sunday week containing a date.

    Args:
        date_str: A date formatted as ``YYYY-MM-DD``.

    Returns:
        A list of seven ``YYYY-MM-DD`` strings, starting with the Monday of
        the week that contains ``date_str`` and ending with its Sunday.

    Raises:
        ValueError: If ``date_str`` does not match the ``%Y-%m-%d`` format.
    """
    anchor = datetime.datetime.strptime(date_str, '%Y-%m-%d')
    # weekday() is 0 on Monday, so subtracting it rewinds to the week's Monday.
    monday = anchor - datetime.timedelta(days=anchor.weekday())

    formatted_days = []
    for offset in range(7):
        current_day = monday + datetime.timedelta(days=offset)
        formatted_days.append(current_day.strftime('%Y-%m-%d'))
    return formatted_days
|
||
|
||
|
||
def get_date_of_week(year_week_str):
    """Return the Monday of an ISO week as a ``YYYY-MM-DD`` string.

    Args:
        year_week_str: A ``"<year>-<week>"`` string such as ``"2025-27"``,
            where ``<week>`` is the ISO 8601 week number within ``<year>``.

    Returns:
        The date of the Monday that starts the requested week, formatted as
        ``YYYY-MM-DD``.

    Raises:
        ValueError: If ``year_week_str`` is not exactly two ``-``-separated
            integers.
    """
    year_str, week_str = year_week_str.split('-')
    year = int(year_str)
    week = int(week_str)

    # ISO 8601 defines week 1 as the week that contains 4 January.  Anchoring
    # on 1 January instead is wrong whenever that day still belongs to week
    # 52/53 of the previous year (e.g. 2021, 2022, 2023): the week delta then
    # becomes hugely negative and the result lands in the previous year.
    jan_fourth = datetime.date(year, 1, 4)
    week_one_monday = jan_fourth - datetime.timedelta(days=jan_fourth.weekday())

    target_monday = week_one_monday + datetime.timedelta(weeks=week - 1)
    return target_monday.strftime('%Y-%m-%d')
|
||
|
||
|
||
def cut_screenshot_img(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
    """Call ``screenshot_wave_img_v3`` with up to three attempts.

    All arguments are forwarded unchanged to ``screenshot_wave_img_v3``.
    After every failed attempt (including the last one) the function sleeps
    for one second; retries are announced on stdout.

    Returns:
        ``(image, None)`` from the first attempt that reports no error, or
        ``(None, error)`` with the last error once all three attempts failed.
    """
    last_error = None
    attempt = 0
    while attempt < 3:
        if attempt:
            print(f"重试{attempt}次")
        shot, last_error = screenshot_wave_img_v3(
            wave_id, wave_name, time_list, tp_name, high, odi, divide, remit)
        if last_error:
            time.sleep(1)
            attempt += 1
            continue
        return shot, None
    return None, last_error
|
||
|
||
|
||
def cut_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid='15836903493', role='manager',
                   filter_date: List[str] = None):
    """Capture intersection comparison report screenshots, with retries.

    Calls ``screenshot_report_img`` up to three times, sleeping one second
    after each failed attempt, and prints the final error if every attempt
    failed.

    Args:
        attrs: Report column descriptors forwarded to ``screenshot_report_img``.
        nodeid: Forwarded to ``screenshot_report_img``.
        area_id: Forwarded to ``screenshot_report_img``.
        userid: Forwarded to ``screenshot_report_img``.
        role: Forwarded to ``screenshot_report_img``.
        filter_date: Accepted for interface compatibility; this function does
            not read it or pass it on.

    Returns:
        ``(image, None)`` from the first attempt that reports no error, or
        ``(image, error)`` from the last attempt once all three failed.
    """
    error = None
    image = []
    # Debug trace of the first descriptor.  Guarded so that an empty ``attrs``
    # no longer raises IndexError from a logging line before any work is done.
    first_attr = vars(attrs[0]) if attrs else None
    print('参数', first_attr, nodeid, area_id)
    for i in range(0, 3):
        if i > 0:
            print(f"重试{i}次")
        image, error = screenshot_report_img(attrs, nodeid, area_id, userid, role)
        if not error:
            return image, None
        time.sleep(1)
    print(error)
    return image, error
|
||
|
||
|
||
def cut_monitor_img(attrs: List[CutMonitorImgColumn], userid='15836903493', role='manager'):
    """Call ``screenshot_wave_monitor_img`` with up to three attempts.

    ``attrs``, ``userid`` and ``role`` are forwarded unchanged.  After every
    failed attempt (including the last one) the function sleeps for one
    second; retries are announced on stdout.

    Returns:
        ``(image, None)`` from the first attempt that reports no error, or
        ``(image, error)`` as returned by the last attempt once all three
        failed.
    """
    image, error = [], None
    for attempt in range(3):
        if attempt >= 1:
            print(f"重试{attempt}次")
        image, error = screenshot_wave_monitor_img(attrs, userid, role)
        if error:
            time.sleep(1)
        else:
            return image, None
    return image, error
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke run: intersection comparison report screenshot.
    attrs = [
        CutReportImgColumn(
            crossid='CR_10264777_2509810',
            cross_name='104国道与178县道交叉口',
            time_list=[['2026-05-01', '2026-05-04'], ['2026-05-06', '2026-05-09']],
            tp_name='02:00-03:00',
            walk=1,
            delay=1,
            flow_in=1,
        ),
    ]

    image_path, err = cut_report_img(attrs, 530100, 530101)
    print(image_path)
    # The script stops here, so the green-wave demo below never runs unless
    # this exit is removed.
    exit(0)

    # Manual smoke run: green-wave inspection screenshot.
    attrs = [
        CutMonitorImgColumn(
            wave_id='CR_10265182_2496051@3@1770800675',
            wave_name='大同33条(新)',
            week_or_day=2,  # 1 = day, 2 = week
            date_name='2026-12',  # day form e.g. 2025-07-06, week form e.g. 2025-27
            tp_start='07:00',
            tp_end='09:00',
        ),
    ]
    image_path, err = cut_monitor_img(attrs)
    print(image_path, err)
|