"""Headless-Chromium screenshot helpers for the signal-control web consoles.

Each public ``cut_*`` function drives a Playwright browser through one of the
console pages (green-wave evaluation, intersection diagnosis report,
green-wave monitoring), applies the requested filters and stores the
resulting screenshot(s) as ``.jpg`` files in ``../temp`` relative to this
module.  Screenshots are cached by file name: if the target file already
exists the browser is not started.

All screenshot functions return a ``(result, error)`` pair; ``error`` is
``None`` on success.
"""
import asyncio
import datetime
import os
import platform
import random
import threading
import time
from typing import List

import requests
from flask import jsonify
from PIL import Image
from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright

domain = 'http://101.42.25.182:8082/'

# Host used for every cookie injected into the browser context.
_COOKIE_DOMAIN = '101.42.25.182'

# Chromium flags shared by every job; the report/monitor jobs add the extras.
_BASE_CHROMIUM_ARGS = ['--no-sandbox', '--disable-setuid-sandbox', '--disable-gpu']
_EXTRA_CHROMIUM_ARGS = [
    '--enable-gpu-rasterization',
    '--ignore-gpu-blocklist',
    '--enable-zero-copy',
    '--disable-software-rasterizer',
    '--disable-extensions',
    '--disable-plugins',
    '--disable-hardware-media-key',
    '--no-zygote',
    '--process-per-site',
    '--disable-background-networking',
    '--disable-sync',
    '--disable-background-timer-throttling',
]

# Error text returned when the date range picker does not show two panels.
_PICKER_MISSING = '时间选择器不存在'


def get_os_type():
    """Return a coarse OS label: 'macOS', 'Ubuntu', 'Linux', 'Windows' or 'Unknown OS'."""
    system = platform.system()
    if system == 'Darwin':
        return 'macOS'
    if system == 'Windows':
        return 'Windows'
    if system != 'Linux':
        return 'Unknown OS'
    try:
        # Optional dependency; without it every Linux is reported as 'Linux'.
        import distro
    except ImportError:
        return 'Linux'
    return 'Ubuntu' if distro.name() == 'Ubuntu' else 'Linux'


class CutMonitorImgColumn:
    """Parameters of one green-wave monitoring screenshot."""

    def __init__(self, wave_id, wave_name, week_or_day, date_name, tp_start, tp_end):
        self.wave_id = wave_id
        self.wave_name = wave_name
        self.week_or_day = week_or_day  # 1 = day, 2 = week
        self.date_name = date_name  # 'YYYY-MM-DD' for a day, 'YYYY-WW' for a week
        self.tp_start = tp_start  # colon-separated time, e.g. '07:00'
        self.tp_end = tp_end


class CutReportImgColumn:
    """Parameters of one intersection report screenshot set."""

    def __init__(self, crossid, cross_name, time_list, tp_name, walk=0, delay=0, flow_in=0):
        self.crossid = crossid
        self.cross_name = cross_name
        self.time_list = time_list  # list of [start_date, end_date] pairs
        self.tp_name = tp_name
        self.walk = walk  # 1 = also capture the crosswalk-distance view
        self.delay = delay  # 1 = also capture the delay view
        self.flow_in = flow_in  # 1 = also capture the flow-split view


class ScreenError(Exception):
    """Raised when the page cannot be driven into the state needed for a screenshot."""

    def __init__(self, message):
        # Forward to Exception so ``args`` and pickling behave normally.
        super().__init__(message)
        self.message = message

    def __str__(self):
        return f"CustomError: {self.message}"


def get_request_host(nodeid):
    """Fetch the route record for ``nodeid``.

    Returns ``(json_body, None)`` on HTTP 200, otherwise
    ``(None, 'Failed to fetch data')``.  Network errors propagate as
    ``requests`` exceptions.
    """
    # A timeout keeps an unreachable route service from blocking forever.
    response = requests.get(f"http://101.42.25.182:9999/api/route?nodeid={nodeid}", timeout=10)
    if response.status_code == 200:
        return response.json(), None
    return None, 'Failed to fetch data'


def screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
    """Synchronous wrapper around :func:`_screenshot_wave_img_v3`."""
    return asyncio.run(_screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high, odi, divide, remit))


def screenshot_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid, role):
    """Synchronous wrapper around :func:`_screenshot_report_img`."""
    return asyncio.run(_screenshot_report_img(attrs, nodeid, area_id, userid, role))


def screenshot_wave_monitor_img(attrs: List[CutMonitorImgColumn], userid, role):
    """Synchronous wrapper around :func:`_screenshot_wave_monitor_img`."""
    return asyncio.run(_screenshot_wave_monitor_img(attrs, userid, role))


def _cookie(name, value):
    """Build one cookie dict for the console host; values must be strings."""
    return {'name': name, 'value': str(value), 'domain': _COOKIE_DOMAIN, 'path': '/'}


async def _close_quietly(browser):
    """Close ``browser`` without letting a close failure mask the real outcome."""
    try:
        await browser.close()
    except Exception as close_error:  # best-effort cleanup only
        print(f"关闭浏览器失败: {close_error}")


async def _wait_for_map_ready(page):
    """Block until the map container, network and loading masks have settled."""
    print('等待amap')
    await page.wait_for_selector('#amap')
    await page.wait_for_function('document.readyState === "complete"')
    await page.wait_for_selector('.amap-container')
    await page.wait_for_function('document.querySelector(".amap-container") !== null')
    print('等待网络')
    await page.wait_for_load_state('load')
    await page.wait_for_load_state('networkidle')
    print('等待amap-layer-overlay')
    await page.wait_for_selector('.el-loading-mask', state="hidden")
    await page.wait_for_selector(".el-loading-spinner", state="hidden")
    await page.wait_for_selector('.amap-layer-overlay', timeout=30000)


async def _wait_for_images(page):
    """Touch every ``<img>`` that has not finished loading yet.

    NOTE(review): ``wait_for(state='attached')`` only confirms the element is
    in the DOM; it does not block until the image has loaded.  Kept as in the
    original because a stricter wait could hang on lazy-loaded images.
    """
    for img_locator in await page.locator('img').all():
        if not await img_locator.evaluate('(img) => img.complete'):
            await img_locator.wait_for(state='attached')


async def _click_when_ready(target, timeout=30.0, poll=0.1):
    """Click ``target`` once it is visible and enabled.

    Raises ScreenError when ``target`` is ``None`` or does not become
    clickable within ``timeout`` seconds, instead of polling forever.
    """
    if target is None:
        raise ScreenError('元素不存在')
    deadline = time.monotonic() + timeout
    while not (await target.is_visible() and await target.is_enabled()):
        if time.monotonic() >= deadline:
            raise ScreenError('等待元素可点击超时')
        await asyncio.sleep(poll)
    await target.click()


async def _click_cell_with_title(cells, title):
    """Click the first cell whose ``title`` attribute equals ``title``; return whether one was found."""
    for cell in cells:
        if await cell.get_attribute('title') == title:
            await cell.click()
            return True
    return False


async def _select_date_range(page, start, end):
    """Pick ``start``..``end`` ('YYYY-MM-DD') in the two-panel range picker.

    Returns ``False`` when the picker does not expose exactly two date
    panels, ``True`` otherwise.  The calendar is paged until ``start`` is on
    screen; ``end`` is then looked up in whichever panel shows it.
    """
    await page.locator('input[placeholder="开始日期"]').click()
    panels = await page.locator('div.ant-picker-date-panel').all()
    if len(panels) != 2:
        return False
    start_panel, end_panel = panels
    prev_button = start_panel.locator('button.ant-picker-header-prev-btn')
    # The original selector lacked the 'button.' prefix and matched nothing.
    next_button = end_panel.locator('button.ant-picker-header-next-btn')

    # Dates are ISO strings, so plain string comparison orders them correctly.
    start_cells = await start_panel.locator('td').all()
    first_title = await start_cells[0].get_attribute('title')
    while start < first_title:
        await prev_button.click()
        start_cells = await start_panel.locator('td').all()
        first_title = await start_cells[0].get_attribute('title')

    end_cells = await end_panel.locator('td').all()
    last_title = await end_cells[-1].get_attribute('title')
    while start > last_title:
        await next_button.click()
        end_cells = await end_panel.locator('td').all()
        last_title = await end_cells[-1].get_attribute('title')

    # A date can be rendered in both panels; click it exactly once,
    # otherwise the second click is taken as the end of the range.
    start_cells = await start_panel.locator('td').all()
    end_cells = await end_panel.locator('td').all()
    if not await _click_cell_with_title(start_cells, start):
        await _click_cell_with_title(end_cells, start)

    start_cells = await start_panel.locator('td').all()
    end_cells = await end_panel.locator('td').all()
    if not await _click_cell_with_title(start_cells, end):
        await _click_cell_with_title(end_cells, end)
    return True


async def _click_all(page, selector, settle_ms=500):
    """Click every element matching ``selector`` and then wait ``settle_ms``."""
    for handle in await page.query_selector_all(selector):
        await handle.click()
    await page.wait_for_timeout(settle_ms)


async def _pick_time_columns(page, clock_text):
    """Select a colon-separated time in the open time panel and confirm it."""
    parts = clock_text.split(':')
    columns = await page.locator(
        'div.ant-picker-panel.ant-picker-panel-has-range ul.ant-picker-time-panel-column').all()
    # zip() stops at the shorter side, so an extra (e.g. seconds) column is ignored.
    for column, wanted in zip(columns, parts):
        for cell in await column.locator('div.ant-picker-time-panel-cell-inner').all():
            if await cell.text_content() == wanted:
                await cell.click()
                break
    ok_button = await page.query_selector(
        'ul.ant-picker-ranges li.ant-picker-ok button.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm')
    await ok_button.click()


async def _screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
    """Capture the green-wave evaluation map for one wave.

    Args:
        wave_id: used only to build the cache file name.
        wave_name: text typed into the wave selector.
        time_list: list of ``[start_date, end_date]`` pairs.
        tp_name: title of the time-period option to select.
        high, odi: ``1`` ticks the matching checkbox.
        divide, remit: ``1``/``2`` select the forward/backward radio option.

    Returns:
        ``(path, None)`` on success, ``(None, error)`` otherwise.
    """
    image_dir = os.path.dirname(os.path.abspath(__file__))
    file_name = f"{wave_id}{tp_name[:11]}{high}{odi}{divide}{remit}"
    for item_time in time_list:
        file_name += f"{item_time[0]}{item_time[1]}"
    file_dir = f"{image_dir}/../temp/{file_name}.jpg"
    if os.path.exists(file_dir):
        return file_dir, None

    option_selectors = [
        (high == 1, 'input[type="checkbox"][value="high_frequency"]'),
        (odi == 1, 'input[type="checkbox"][value="ODI"]'),
        (divide == 1, 'input[type="radio"][value="forward-out_ts_info"]'),
        (divide == 2, 'input[type="radio"][value="backward-out_ts_info"]'),
        (remit == 1, 'input[type="radio"][value="forward-in_ts_info"]'),
        (remit == 2, 'input[type="radio"][value="backward-in_ts_info"]'),
    ]

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True, args=['--window-size=1900,768', *_BASE_CHROMIUM_ARGS])
            try:
                page = await browser.new_page()
                await page.set_viewport_size({'width': 1900, 'height': 768})
                await asyncio.sleep(2)
                print('打开页面')
                await page.goto(f"{domain}page/wave_evaluate/evaluate", timeout=120000)
                await _wait_for_map_ready(page)

                # Collapse the side and bottom panels so they do not cover the map.
                print("右缩放")
                await _click_when_ready(await page.query_selector('.anticon.anticon-right-square'))
                await page.wait_for_selector(".metrics", state="hidden")
                print("下缩放")
                await _click_when_ready(await page.query_selector('.anticon.anticon-down-square'))
                await page.wait_for_selector(".b-panel", state="hidden")

                print("选择绿波....")
                input_element = await page.query_selector("#rc_select_0")
                await input_element.click()
                await input_element.fill(wave_name)
                print("输入绿波....")
                await page.wait_for_selector('.rc-virtual-list-holder-inner')
                dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
                await dev_select.click()
                print("点击绿波....")

                for wanted, selector in option_selectors:
                    if wanted:
                        await page.locator(selector).click()

                add_range_button = page.locator(
                    'button.ant-btn.ant-btn-circle.ant-btn-primary.ant-btn-sm.ant-btn-icon-only')
                for item_time in time_list:
                    if not await _select_date_range(page, item_time[0], item_time[1]):
                        # Callers unpack (image, error); never return a bare string.
                        return None, _PICKER_MISSING
                    await _click_when_ready(add_range_button)

                await page.locator('input#rc_select_1').click()
                for item_tp_div in await page.locator('div.ant-select-item.ant-select-item-option').all():
                    if await item_tp_div.get_attribute('title') == tp_name:
                        await item_tp_div.click()

                search = await page.query_selector(
                    '.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm:not(.ant-btn-circle)')
                await _click_when_ready(search)
                print('查询等待网络')
                await page.wait_for_load_state('load')
                await page.wait_for_load_state('networkidle')
                if divide > 0 or remit > 0:
                    print('等待转向比')
                    await page.wait_for_selector('div.flow-container', state='attached')
                    await page.wait_for_selector('div.flow-container >> :visible', timeout=10000)
                print('等待图片加载完成')
                await _wait_for_images(page)
                print('查询等待amap-layer-overlay')
                await page.wait_for_selector('.el-loading-mask', state="hidden")
                await page.wait_for_selector(".el-loading-spinner", state="hidden")

                element = await page.query_selector('.amap-layer-overlay')
                markers = await page.query_selector_all('.amap-marker')
                if not markers:
                    raise ScreenError('未找到地图标记')
                left_values = []
                top_values = []
                for marker in markers:
                    left = await marker.evaluate("el => window.getComputedStyle(el).left")
                    top = await marker.evaluate("el => window.getComputedStyle(el).top")
                    left_values.append(float(left.replace('px', '')))
                    top_values.append(float(top.replace('px', '')))

                print('截图')
                bounding_box = await element.bounding_box() if element else None
                if not bounding_box:
                    # Previously reported as (None, None), which callers read as success.
                    raise ScreenError('未找到地图图层')
                # Clip to the markers plus a margin, clamped to the overlay box.
                x = max(min(left_values) - 200, 0)
                y = max(min(top_values) - 70, 0)
                width = min(max(left_values) + 250, bounding_box['width']) - x
                height = min(max(top_values) + 70, bounding_box['height']) - y
                await page.screenshot(path=file_dir, clip={
                    'x': x + bounding_box['x'],
                    'y': y + bounding_box['y'],
                    'width': width,
                    'height': height,
                })
                return file_dir, None
            finally:
                await _close_quietly(browser)
    except Exception as error:
        return None, error


async def _screenshot_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid, role):
    """Capture the intersection comparison report screenshots.

    Returns ``(image_map, None)`` on success, where ``image_map`` maps
    ``'cross'`` and optionally ``'walk'``/``'delay'``/``'flow_in'`` to file
    paths, or ``({}, error)`` on failure.  ``role`` is currently unused.
    """
    image_map = {}
    if not attrs:
        return {}, None
    image_dir = os.path.dirname(os.path.abspath(__file__))
    # NOTE(review): the paths end up being those of the LAST attr while the
    # page below is driven with attrs[0]; identical for a single attr.
    for item_attr in attrs:
        date_list_str = ''.join(date for sublist in item_attr.time_list for date in sublist)
        stem = f"{image_dir}/../temp/{item_attr.crossid}{date_list_str}{item_attr.tp_name[:11]}"
        image_map['cross'] = f"{stem}.jpg"
        if item_attr.walk == 1:
            image_map['walk'] = f"{stem}_walk.jpg"
        if item_attr.delay == 1:
            image_map['delay'] = f"{stem}_delay.jpg"
        if item_attr.flow_in == 1:
            image_map['flow_in'] = f"{stem}_flow_in.jpg"
    if all(os.path.exists(image_file_path) for image_file_path in image_map.values()):
        return image_map, None

    target = attrs[0]
    # Per extra view: (log line, selector of the toggle that enables it).
    overlay_views = {
        'walk': ("人行横道截图", '.ant-checkbox-wrapper.css-1u7g04k:has-text("人行横道距离")'),
        'delay': ("延误截图", '.ant-radio-wrapper.css-1u7g04k:has-text("延误")'),
        'flow_in': ("分流截图", '.ant-radio-wrapper.css-1u7g04k:has-text("分流")'),
    }
    clear_selector = 'button:has-text("清空")'

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                args=['--window-size=1900,800', *_BASE_CHROMIUM_ARGS, *_EXTRA_CHROMIUM_ARGS])
            try:
                print('打开页面')
                page = await browser.new_page()
                # NOTE(security): the token below is hard-coded in source;
                # consider loading it from configuration instead.
                await page.context.add_cookies([
                    _cookie('cross_manage_nodeid', nodeid),
                    _cookie('cross_manage_areaid', area_id),
                    _cookie('cross_manage_token', "iuqwefhjdbcsajhdshcgaiudncjadhajn_15001383357"),
                    _cookie('cross_manage_userid', userid),
                ])
                await page.set_viewport_size({'width': 1200, 'height': 860})
                await asyncio.sleep(2)
                await page.goto("http://101.42.25.182:8085/page/diagnosis/detail", timeout=120000)
                await _wait_for_map_ready(page)

                print("滚动到下面")
                info_detail = await page.query_selector('.info-detail')
                if info_detail:
                    await info_detail.evaluate('element => element.scrollTop = element.scrollHeight')
                print("等到图片加载完成")
                await _wait_for_images(page)

                print("选择路口....")
                input_element = await page.query_selector("#rc_select_0")
                await input_element.click()
                await input_element.fill(target.cross_name)
                print("输入路口....")
                await page.wait_for_selector('.rc-virtual-list-holder-inner')
                dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
                await dev_select.click()
                print("点击路口....")

                add_range_button = page.locator(
                    'button.ant-btn.ant-btn-circle.ant-btn-primary.ant-btn-sm.ant-btn-icon-only')
                for item_time in target.time_list:
                    if not await _select_date_range(page, item_time[0], item_time[1]):
                        return {}, _PICKER_MISSING
                    await _click_when_ready(add_range_button)

                await page.locator('input#rc_select_1').click()
                tp_div_elements = await page.locator('div.ant-cascader-menu-item-content').all()
                print("诊断时段....")
                for item_tp_div in tp_div_elements:
                    title = await item_tp_div.text_content()
                    if title == '小时时段':
                        # Hour periods live in a sub-menu that opens on click.
                        await item_tp_div.click()
                        for item_tp_div_hour in await page.locator('div.ant-cascader-menu-item-content').all():
                            if await item_tp_div_hour.text_content() == target.tp_name:
                                await item_tp_div_hour.click()
                    elif title == target.tp_name:
                        await item_tp_div.click()

                print('点击查询')
                await page.locator('button.ant-btn.ant-btn-primary.ant-btn-sm').filter(has_text="查 询").click()
                print('查询等待网络')
                await page.wait_for_load_state('load')
                await page.wait_for_load_state('networkidle', timeout=30000)
                await page.wait_for_selector('.ant-spin-container.ant-spin-blur', state='visible', timeout=30000)
                await page.wait_for_selector('.ant-spin-container.ant-spin-blur', state='hidden', timeout=30000)
                await page.wait_for_selector('.css-1u7g04k.ant-spin.ant-spin-spinning', state='hidden', timeout=30000)
                # Extra pause so the spinner fade-out animation can finish.
                await page.wait_for_timeout(300)

                print("去掉服务水平")
                await _click_all(page, clear_selector)
                server = await page.query_selector('.ant-checkbox-wrapper.ant-checkbox-wrapper-checked.css-1u7g04k')
                await server.click()
                await page.wait_for_selector(".cross-level", state="hidden")
                await page.wait_for_timeout(500)

                if image_map.get('cross'):
                    element = await page.query_selector('.amap-layer-overlay')
                    cross_bounding_box = await element.bounding_box()
                    await page.screenshot(path=image_map['cross'], clip={
                        'x': cross_bounding_box['x'],
                        'y': cross_bounding_box['y'],
                        'width': cross_bounding_box['width'],
                        'height': cross_bounding_box['height'] - 45,
                    })

                print('全屏')
                await page.locator('button.css-1u7g04k.ant-btn.ant-btn-link').filter(has_text="全屏").click()
                await page.wait_for_timeout(500)
                ledger_element = await page.query_selector('.ledger-box')
                bounding_box = await ledger_element.bounding_box()
                x_offset = 170
                x_width = 850
                for image_key, item_image_path in image_map.items():
                    if image_key not in overlay_views:
                        continue
                    log_line, toggle_selector = overlay_views[image_key]
                    print(log_line)
                    await _click_all(page, clear_selector)
                    await _click_all(page, toggle_selector)
                    await page.screenshot(path=item_image_path, clip={
                        'x': bounding_box['x'] + x_offset,
                        'y': bounding_box['y'],
                        'width': x_width,
                        'height': bounding_box['height'],
                    })
                return image_map, None
            finally:
                await _close_quietly(browser)
    except Exception as error:
        return {}, error


async def _capture_monitor_item(page, attr, target_path):
    """Apply one monitor query on ``page`` and screenshot the map overlay.

    Returns ``True`` when a screenshot was written to ``target_path``.
    """
    input_element = await page.query_selector("#rc_select_0")
    await input_element.click()
    await input_element.fill(attr.wave_name)
    print("输入绿波....")
    await page.wait_for_selector('.rc-virtual-list-holder-inner')
    dev_select = await page.query_selector(".rc-virtual-list-holder-inner")
    await dev_select.click()

    print("点击周日....")
    week_or_day_span = await page.query_selector('.ant-input-group.css-1u7g04k.ant-input-group-compact')
    week_or_day = await week_or_day_span.query_selector(
        '.ant-select.ant-select-sm.css-1u7g04k.ant-select-single.ant-select-show-arrow')
    await week_or_day.click()
    for item_div in await page.locator('div.ant-select-item-option-content').all():
        text = await item_div.text_content()
        print(text)
        if (text == '日' and attr.week_or_day == 1) or (text == '周' and attr.week_or_day == 2):
            await item_div.click()
            break

    picker = await page.query_selector('div.ant-picker.ant-picker-small.css-1u7g04k')
    await picker.click()
    cell_selector = '.ant-picker-cell:not(.ant-picker-cell-week)'
    cells = await page.locator(cell_selector).all()
    start_date = await cells[0].get_attribute('title')
    end_date = await cells[-1].get_attribute('title')
    search_date = attr.date_name
    if attr.week_or_day == 2:
        search_date = get_date_of_week(attr.date_name)
    # Page the calendar until the wanted date is inside the visible grid.
    while search_date < start_date:
        await page.locator('button.ant-picker-header-prev-btn').click()
        cells = await page.locator(cell_selector).all()
        start_date = await cells[0].get_attribute('title')
        end_date = await cells[-1].get_attribute('title')
    while search_date > end_date:
        await page.locator('button.ant-picker-header-next-btn').click()
        cells = await page.locator(cell_selector).all()
        start_date = await cells[0].get_attribute('title')
        end_date = await cells[-1].get_attribute('title')

    day_list = []
    if attr.week_or_day == 1:
        day_list.append(search_date)
    if attr.week_or_day == 2:
        day_list = get_week_dates(search_date)
    print(day_list)
    enabled_cells = await page.locator(
        '.ant-picker-cell:not(.ant-picker-cell-week):not(.ant-picker-cell-disabled)').all()
    for item_time_selector in enabled_cells:
        if await item_time_selector.get_attribute('title') in day_list:
            await item_time_selector.click()
            break

    tp_div = await page.query_selector('.ant-picker.ant-picker-range.ant-picker-small.css-1u7g04k')
    start_tp = await tp_div.query_selector('input[placeholder="开始时间"]')
    await start_tp.click()
    # Confirming the start time moves the picker on to the end time.
    await _pick_time_columns(page, attr.tp_start)
    await _pick_time_columns(page, attr.tp_end)

    # Collapse the side/bottom panels via JS click when they are present.
    right_button = await page.query_selector('div.custom-right-panel span.anticon.anticon-right-square')
    if right_button:
        await right_button.evaluate('element => element.click()')
    bottom_button = await page.query_selector('span.anticon.anticon-down-square')
    if bottom_button:
        await bottom_button.evaluate('element => element.click()')

    print('查询等待网络')
    search_button = await page.query_selector(
        'div.ant-col.ant-col-offset-1.css-1u7g04k button.css-1u7g04k.ant-btn.ant-btn-primary.ant-btn-sm')
    await search_button.click()
    print('查询等待网络')
    await page.wait_for_load_state('load')
    await page.wait_for_load_state('networkidle')
    print('等待图片加载完成')
    await _wait_for_images(page)
    print('查询等待amap-layer-overlay')
    await page.wait_for_selector('.el-loading-mask', state="hidden")
    await page.wait_for_selector(".el-loading-spinner", state="hidden")
    element = await page.query_selector('.amap-layer-overlay')

    # Untick every checked box in this column (hides abnormal-plan intersections).
    for checkbox in await page.locator('div.ant-col.ant-col-8.css-1u7g04k input[type="checkbox"]').all():
        if await checkbox.is_checked():
            await checkbox.click()

    print('截图')
    bounding_box = await element.bounding_box() if element else None
    if not bounding_box:
        return False
    await page.screenshot(path=target_path, clip={
        'x': bounding_box['x'],
        'y': bounding_box['y'],
        'width': bounding_box['width'],
        'height': bounding_box['height'] - 30,
    })
    return True


async def _screenshot_wave_monitor_img(attrs: List[CutMonitorImgColumn], userid, role):
    """Capture one green-wave monitoring screenshot per entry of ``attrs``.

    Returns ``(paths, None)`` with one path per attr (``''`` where no image
    could be taken), or ``([], error)`` on failure.  ``role`` is currently
    unused.
    """
    if not attrs:
        return [], None
    image_dir = os.path.dirname(os.path.abspath(__file__))
    image_list = []
    for item_attr in attrs:
        file_dir_full = (
            f"{image_dir}/../temp/{item_attr.wave_id}{item_attr.week_or_day}{item_attr.date_name}.jpg")
        # '' marks an image that still has to be captured.
        image_list.append({
            'attr': item_attr,
            'img': file_dir_full if os.path.exists(file_dir_full) else '',
        })
    if all(item_image['img'] != '' for item_image in image_list):
        return [item_image['img'] for item_image in image_list], None

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                args=['--window-size=1200,800', *_BASE_CHROMIUM_ARGS, *_EXTRA_CHROMIUM_ARGS])
            try:
                print('打开页面')
                page = await browser.new_page()
                await page.context.add_cookies([
                    _cookie('manage_token', userid),
                    _cookie('manage_userid', userid),
                ])
                await page.set_viewport_size({'width': 1200, 'height': 700})
                await asyncio.sleep(2)
                await page.goto(f"{domain}page/wave_monitor/situation", timeout=120000)
                await _wait_for_map_ready(page)

                for item_image_index, item_image in enumerate(image_list):
                    if item_image['img'] != '':
                        continue
                    attr = item_image['attr']
                    item_file_dir_full = (
                        f"{image_dir}/../temp/{attr.wave_id}{attr.week_or_day}{attr.date_name}.jpg")
                    print("选择绿波....")
                    if item_image_index > 0:
                        # Start every later query from a freshly loaded page.
                        print('重载')
                        await page.reload()
                        await _wait_for_map_ready(page)
                    if await _capture_monitor_item(page, attr, item_file_dir_full):
                        item_image['img'] = item_file_dir_full
                return [item_image['img'] for item_image in image_list], None
            finally:
                await _close_quietly(browser)
    except Exception as error:
        return [], error


def get_week_dates(date_str):
    """Return the seven 'YYYY-MM-DD' dates (Mon..Sun) of the week containing ``date_str``."""
    given_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
    monday = given_date - datetime.timedelta(days=given_date.weekday())
    return [(monday + datetime.timedelta(days=offset)).strftime('%Y-%m-%d') for offset in range(7)]


def get_date_of_week(year_week_str):
    """Return the Monday ('YYYY-MM-DD') of ISO week ``'YYYY-WW'``.

    4 January always falls in ISO week 1, so the Monday of that week is a
    safe anchor even in years where 1 January still belongs to week 52/53
    of the previous ISO year.
    """
    year_str, week_str = year_week_str.split('-')
    year = int(year_str)
    week = int(week_str)
    jan_fourth = datetime.date(year, 1, 4)
    week_one_monday = jan_fourth - datetime.timedelta(days=jan_fourth.weekday())
    return (week_one_monday + datetime.timedelta(weeks=week - 1)).strftime('%Y-%m-%d')


def _run_with_retries(job, attempts=3, pause=1):
    """Call ``job()`` (returning ``(image, error)``) until it succeeds or attempts run out."""
    image, error = None, None
    for attempt in range(attempts):
        if attempt > 0:
            print(f"重试{attempt}次")
        image, error = job()
        if not error:
            return image, None
        if attempt < attempts - 1:
            # No point in pausing after the final attempt.
            time.sleep(pause)
    return image, error


def cut_screenshot_img(wave_id, wave_name, time_list, tp_name, high=0, odi=0, divide=0, remit=0):
    """Green-wave evaluation screenshot with up to three attempts.

    Returns ``(path, None)`` on success or ``(None, error)``.
    """
    image, error = _run_with_retries(
        lambda: screenshot_wave_img_v3(wave_id, wave_name, time_list, tp_name, high, odi, divide, remit))
    if error:
        return None, error
    return image, None


def cut_report_img(attrs: List[CutReportImgColumn], nodeid, area_id, userid='15836903493', role='manager',
                   filter_date: List[str] = None):
    """Intersection comparison report screenshots with up to three attempts.

    Returns ``(image_map, None)`` on success or ``(last_result, error)``.
    ``filter_date`` is accepted for compatibility and currently unused.
    """
    # Guard the debug print: an empty list is handled further down.
    print('参数', vars(attrs[0]) if attrs else None, nodeid, area_id)
    image, error = _run_with_retries(
        lambda: screenshot_report_img(attrs, nodeid, area_id, userid, role))
    if error:
        print(error)
    return image, error


def cut_monitor_img(attrs: List[CutMonitorImgColumn], userid='15836903493', role='manager'):
    """Green-wave monitoring screenshots with up to three attempts.

    Returns ``(paths, None)`` on success or ``(last_result, error)``.
    """
    return _run_with_retries(lambda: screenshot_wave_monitor_img(attrs, userid, role))


def _demo_report():
    """Manual check: intersection comparison report screenshots."""
    attrs = [CutReportImgColumn(
        crossid='CR_10264777_2509810',
        cross_name='104国道与178县道交叉口',
        time_list=[['2026-05-01', '2026-05-04'], ['2026-05-06', '2026-05-09']],
        tp_name='02:00-03:00',
        walk=1,
        delay=1,
        flow_in=1,
    )]
    image_path, err = cut_report_img(attrs, 530100, 530101)
    print(image_path)


def _demo_monitor():
    """Manual check: green-wave monitoring screenshot."""
    attrs = [CutMonitorImgColumn(
        wave_id='CR_10265182_2496051@3@1770800675',
        wave_name='大同33条(新)',
        week_or_day=2,  # 1 = day, 2 = week
        date_name='2026-12',  # e.g. '2025-07-06' for a day, '2025-27' for a week
        tp_start='07:00',
        tp_end='09:00',
    )]
    image_path, err = cut_monitor_img(attrs)
    print(image_path, err)


if __name__ == '__main__':
    # The original script exited right after the report demo; call
    # _demo_monitor() instead to exercise the monitoring path.
    _demo_report()