cross_doctor/tool/cross_monitor_week_report.py

298 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
路口巡检周报自动生成脚本 (适配新数据结构版)
数据结构映射:
1. 大标题 -> data['title'] (城市 + 周数)
2. 正文/日期 -> data['date'] 和 data['part1']
3. 表格 -> data['part2']
"""
import io
import logging
import re
from datetime import datetime
from docx import Document
from docx.shared import Pt, Cm
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.oxml.ns import qn
# ================= 1. 样式定义函数 =================
from tool.qcos_func import g_cos_bucket, g_cos_root
from tool.export_cross_monitor_task_file import folder_manager, cos_client
def get_style_body(doc):
"""正文样式:宋体,小四"""
style_name = '自定义正文'
try:
return doc.styles[style_name]
except KeyError:
style = doc.styles.add_style(style_name, 1)
style.font.name = '宋体'
style.font.size = Pt(12) # 小四
style.font._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')
# 设置行距为1.5倍,看起来更舒服
pf = style.paragraph_format
pf.line_spacing = 1.5
return style
def get_style_title(doc):
"""大标题样式:宋体,小二,加粗"""
style_name = '自定义大标题'
try:
return doc.styles[style_name]
except KeyError:
style = doc.styles.add_style(style_name, 1)
style.font.name = '宋体'
style.font.size = Pt(18) # 小二
style.font.bold = True
style.font._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')
pf = style.paragraph_format
pf.alignment = WD_ALIGN_PARAGRAPH.CENTER
pf.space_before = Pt(12) # 稍微加点段前距
pf.space_after = Pt(6)
return style
def get_style_table_cell(doc):
"""表格样式:宋体,五号,无缩进"""
style_name = '自定义表格内容'
try:
return doc.styles[style_name]
except KeyError:
style = doc.styles.add_style(style_name, 1)
style.font.name = '宋体'
style.font.size = Pt(10.5) # 五号
style.font._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')
pf = style.paragraph_format
pf.first_line_indent = Cm(0)
pf.left_indent = Cm(0)
pf.right_indent = Cm(0)
pf.space_before = Pt(0)
pf.space_after = Pt(0)
pf.line_spacing = 1.0
pf.alignment = WD_ALIGN_PARAGRAPH.CENTER
return style
# ================= 2. 业务逻辑函数 =================
def set_title(doc, data):
"""
处理大标题
逻辑:在文档开头插入或替换第一行作为标题
"""
title_style = get_style_title(doc)
title_text = f"{data['title']['city']}市第{data['title']['week_num']}周路口巡检周报"
# 检查文档第一行是否为空或包含旧标题,这里简单处理:直接替换第一行
if len(doc.paragraphs) > 0:
doc.paragraphs[0].text = title_text
doc.paragraphs[0].style = title_style
else:
# 如果文档为空(极少见),添加一个段落
doc.add_paragraph(title_text, style=title_style)
def replace_basic_placeholders(doc, data):
"""
替换正文占位符 (日期 + 第一部分数据)
应用样式:自定义正文 (宋体, 小四)
"""
body_style = get_style_body(doc)
# 组合所有需要替换的键值对
# 注意:日期单独处理,其他数据来自 part1
placeholders = {
'{{date}}': str(data.get('date', '')),
'{{total}}': str(data.get('part1', {}).get('total', '')),
'{{normal_cross_num}}': str(data.get('part1', {}).get('normal_cross_num', '')),
'{{focus_cross_num}}': str(data.get('part1', {}).get('focus_cross_num', '')),
'{{error_cross_num}}': str(data.get('part1', {}).get('error_cross_num', '')),
'{{usually_err_info}}': str(data.get('part1', {}).get('usually_err_info', '')),
'{{phase_err_num}}': str(data.get('part1', {}).get('phase_err_num', '')),
'{{static_org_num}}': str(data.get('part1', {}).get('static_org_num', '')),
'{{static_device_num}}': str(data.get('part1', {}).get('static_device_num', ''))
}
# 从第二个段落开始遍历(跳过标题)
for paragraph in doc.paragraphs[1:]:
contains_placeholder = any(key in paragraph.text for key in placeholders.keys())
if contains_placeholder:
for key, value in placeholders.items():
if key in paragraph.text:
paragraph.text = paragraph.text.replace(key, value)
# 强制应用正文样式
paragraph.style = body_style
def set_cell_content(cell, text, style):
"""写入单元格并应用样式"""
cell.text = str(text)
paragraph = cell.paragraphs[0]
paragraph.style = style
cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
def merge_cells_in_column(table, start_row, end_row, col_index):
if start_row >= end_row:
return
start_cell = table.cell(start_row, col_index)
end_cell = table.cell(end_row, col_index)
start_cell.merge(end_cell)
def add_merged_table_rows(table, table_data, cell_style):
"""
填充表格数据(最终适配你的 dict 结构版)
"""
merge_list = []
current_row_idx = 1 # 0是表头数据从索引1开始
for item in table_data:
item_name = item.get('name', '') # 一级类目
item_data_list = item.get('item_data', [])
item_start_row = current_row_idx
item_total_rows = 0
for a in item_data_list:
a_name = a.get('name', '') # 二级类目
a_num = a.get('num', '')
a_item_data = a.get('item_data', []) # 路口列表
# 兜底逻辑:如果没有路口数据,给一个默认行
if not a_item_data:
a_item_data = [{'cross_name': '-', 'process_func': '-'}]
a_start_row = current_row_idx
a_total_rows = len(a_item_data)
for idx, b in enumerate(a_item_data):
row = table.add_row()
cells = row.cells
# 填充最右侧两列(路口名称、处理措施)
set_cell_content(cells[3], b.get('cross_name', ''), cell_style)
set_cell_content(cells[4], b.get('process_func', ''), cell_style)
# 填充中间两列(二级类目、异常数)- 只有该组的第一行填
if idx == 0:
set_cell_content(cells[1], a_name, cell_style)
set_cell_content(cells[2], a_num, cell_style)
else:
set_cell_content(cells[1], '', cell_style)
set_cell_content(cells[2], '', cell_style)
# 填充最左侧列(一级类目)- 只有整个大类的第一行填
if item_total_rows == 0 and idx == 0:
set_cell_content(cells[0], item_name, cell_style)
else:
set_cell_content(cells[0], '', cell_style)
current_row_idx += 1
# 记录二级类目的合并范围
if a_total_rows > 1:
merge_list.append((1, a_start_row, current_row_idx - 1))
merge_list.append((2, a_start_row, current_row_idx - 1))
item_total_rows += a_total_rows
# 记录一级类目的合并范围
if item_total_rows > 1:
merge_list.append((0, item_start_row, current_row_idx - 1))
# 从下往上执行合并(防止索引错乱)
merge_list.sort(key=lambda x: x[1], reverse=True)
for col_idx, start_idx, end_idx in merge_list:
try:
top_cell = table.cell(start_idx, col_idx)
bottom_cell = table.cell(end_idx, col_idx)
top_cell.merge(bottom_cell)
except Exception as e:
print(f"合并单元格失败: {e}")
def process_merged_table(doc, data):
table_data = data.get('part2', {}).get('data', [])
table_cell_style = get_style_table_cell(doc)
for table in doc.tables:
# 锁定目标表格
if len(table.rows) > 0 and len(table.rows[0].cells) == 5:
header_texts = [cell.text.strip() for cell in table.rows[0].cells]
if '巡检类型' in header_texts and '巡检项' in header_texts:
# 1. 清空模板里的示例行(只保留表头)
while len(table.rows) > 1:
table._tbl.remove(table.rows[1]._tr)
# 2. 填充数据并合并
add_merged_table_rows(table, table_data, table_cell_style)
# 3. 恢复样式
table.style = 'Table Grid'
for row in table.rows:
for cell in row.cells:
for paragraph in cell.paragraphs:
if paragraph.text.strip() not in ['巡检类型', '巡检项', '异常路口数', '异常路口', '处理措施']:
paragraph.style = table_cell_style
cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
break
def fill_report_template(data):
doc = Document('./tool/XX市第XX周路口巡检周报模板.docx')
# 1. 设置大标题
set_title(doc, data)
# 2. 替换正文和日期
replace_basic_placeholders(doc, data)
tabel_style = get_style_table_cell(doc)
# 3. 处理表格
process_merged_table(doc, data)
for table in doc.tables:
if len(table.rows) > 0 and len(table.rows[0].cells) == 5:
for row_index, row in enumerate(table.rows):
if row_index == 0:
continue
columns = [table.columns[0], table.columns[1], table.columns[2]]
for column in columns:
for cell in column.cells:
set_cell_content(cell, cell.text.replace('\n', ''), tabel_style)
city_name = data.get('title', {}).get('city', '')
week_num = data.get('title', {}).get('week_num', '')
file_stream = io.BytesIO()
doc.save(file_stream)
file_stream.seek(0)
doc.save('./text.docx')
cos_path = f'user/cross_doctor/task_file'
folder_manager.ensure_folder(cos_path)
cos_key = f"{cos_path}/{city_name}{week_num}周路口巡检周报.docx"
cos_client.put_object(Bucket=g_cos_bucket, Key=cos_key, Body=file_stream)
download_url = f'{g_cos_root}/{cos_key}'
print(download_url)
return download_url
# ================= 3. 测试运行 =================
if __name__ == "__main__":
# 使用您提供的新数据结构
report_data = {'title': {'city': '福州市', 'week_num': 19}, 'date': '2026年05月12日', 'part1': {'total': 3, 'normal_cross_num': 3, 'focus_cross_num': 0, 'error_cross_num': 2, 'usually_err_info': '异常最多的巡检项为机动车多次排队2个、路口周期过大2个', 'phase_err_num': '需要优化路口方案的路口0个', 'static_org_num': '需要调整交通组织路口2个', 'static_device_num': '需要调整交通设施路口1个'}, 'part2': {'data': [{'name': '台账录入情况', 'item_data': [{'name': '录入渠化台账信息与现场不符', 'num': 0, 'item_data': []}, {'name': '录入配时方案与实际运行方案不符', 'num': 0, 'item_data': []}]}, {'name': '路口运行情况', 'item_data': [{'name': '机动车多次排队', 'num': 2, 'item_data': [{'cross_name': '康达路与清展街交叉口', 'process_func': ''}, {'cross_name': '清展街与福和路交叉口', 'process_func': '调整交通组织,其他处置措施'}]}, {'name': '停车次数较高', 'num': 0, 'item_data': []}]}, {'name': '配时方案情况', 'item_data': [{'name': '路口周期过大', 'num': 2, 'item_data': [{'cross_name': '康达路与清展街交叉口', 'process_func': ''}, {'cross_name': '清展街与福和路交叉口', 'process_func': '调整交通组织,其他处置措施'}]}, {'name': '行人过街时间不足', 'num': 0, 'item_data': []}]}, {'name': '设备设施情况', 'item_data': [{'name': '信号灯缺、损', 'num': 0, 'item_data': []}]}]}}
fill_report_template(
report_data
)
print("✅ 报告生成完成!已适配新数据结构。")