|
@@ -0,0 +1,257 @@
|
|
|
+import json,os
|
|
|
+from typing import List
|
|
|
+from PyPDF2 import PdfReader, PdfWriter
|
|
|
+from model import SplitModel,PageConfig
|
|
|
+from PIL import Image
|
|
|
+import io
|
|
|
+import pymupdf
|
|
|
+
|
|
|
class PDFProcessor:
    """Split a source PDF into smaller PDFs and render each result to one PNG.

    All operations are static methods. The paths of the PDFs written by the
    most recent ``split`` run are collected in the class-level list
    ``_generated_pdfs`` so they can be converted to images afterwards.
    """

    # Tuning knobs for extract_and_merge_images.
    _RENDER_ZOOM = 2                      # page render zoom factor (x and y)
    _TARGET_BYTES_PER_PAGE = 100 * 1024   # size budget per rendered page
    _MIN_SCALE = 0.5                      # stop shrinking at/below this scale
    _SCALE_STEP = 0.9                     # multiplicative shrink per attempt

    def __init__(self):
        """Create a processor; no per-instance state is kept."""
        pass

    # NOTE(review): this looks like the prompt text used to produce the JSON
    # split configuration consumed by ``split``; it is not referenced by any
    # code in this class. It is a runtime string and is kept verbatim.
    s = """
    按照我提供的目录整理信息,页数要准确,要求:
    1. 每章的标题精确到每节的一、二..的页数。 例如 第一章 第一节 一、xxxx 二、xxxx 。
    2. 返回的结构体:```typescript
            type PageConfig = {
            start_page: number; // 起始页码
            end_page: number; // 结束页码
            output_name: string; // 输出文件名称
            };
            type SplitModel = {
            output_dir: string; // 输出目录
            page_configs: PageConfig[]; // 页面配置数组
            };
        ```
    3. 输出文件名 格式 章@节@小节.pdf 例如 第一章 第一节 一、xxxx 的格式为 01xxxx@01xxxx@01xxxx.pdf (xxxx为具体标题内容)
    4. 输出目录路径为 章节/ 例如 第一章 就是 01xxxx/
    5. 目录一定要完整,不能有遗漏,不能有多余的目录
    6. 帮我整理1,2,3,4,5,6,7,8,9,10章的目录信息,并返回SplitModel的json数组,一个章节一个SplitModel
    7. end_page不能与下一个start_page相同
    """

    # Class-level (shared) list of PDF paths written during the current run.
    _generated_pdfs: List[str] = []

    @staticmethod
    def split(filename: str) -> None:
        """Split ``<filename>.pdf`` according to ``<filename>.json``.

        Reads the JSON configuration, builds one ``SplitModel`` per entry,
        writes the split PDFs, then renders every generated PDF to an image.
        Errors are reported with ``print`` and never propagated.

        Args:
            filename: Base name (no extension) shared by the JSON config and
                the source PDF.
        """
        version = "v1"
        json_file = f"./tools/pdf_json/{version}/{filename}.json"
        input_file = f"./temp_files/pdf/source/{version}/{filename}.pdf"
        output_dir = f"./temp_files/pdf/output/{version}/{filename}/"

        # Start every run with an empty list of generated files.
        PDFProcessor._generated_pdfs = []

        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)

            # Convert the raw JSON entries into SplitModel objects.
            split_models = []
            for item in json_data:
                page_configs = [PageConfig(**page) for page in item['page_configs']]
                split_models.append(
                    SplitModel(output_dir=item['output_dir'], page_configs=page_configs)
                )

            PDFProcessor.batch_split_pdf(input_file, output_dir, split_models)

            # Image conversion runs only after all PDFs have been written.
            PDFProcessor.convert_pdf_images(PDFProcessor._generated_pdfs)

            print("PDF文件拆分成功!")
        except FileNotFoundError:
            print(f"错误: 找不到JSON文件 {json_file}")
        except json.JSONDecodeError as e:
            print(f"错误: JSON文件格式无效 {str(e)}")
        except Exception as e:
            print(f"处理过程中发生错误: {str(e)}")

    @staticmethod
    def batch_split_pdf(input_file: str, base_output_dir: str, split_models: List["SplitModel"]) -> None:
        """Run ``split_pdf`` once for every split configuration.

        A failure in one configuration is printed and the remaining
        configurations are still processed.

        Args:
            input_file: Path of the source PDF.
            base_output_dir: Base directory for all output.
            split_models: Split configurations to process, in order.
        """
        try:
            for split_model in split_models:
                try:
                    PDFProcessor.split_pdf(input_file, base_output_dir, split_model)
                except Exception as e:
                    print(f"处理拆分任务时发生错误: {str(e)}")
                    continue
        except Exception as e:
            # Reached only if iterating split_models itself fails.
            print(f"批量处理PDF文件时发生错误: {str(e)}")

    @staticmethod
    def _pdf_output_dir(base_output_dir: str, relative_dir: str) -> str:
        """Return ``<base_output_dir>/pdf/<relative_dir>`` using '/' separators.

        A separator is appended to a non-empty base that lacks one, so
        ``"out"`` and ``"out/"`` produce the same directory.
        """
        base = base_output_dir
        if base and not base.endswith(("/", os.sep)):
            base += "/"
        # The literal "/pdf/" segment is what the image step later swaps
        # for "/img/", so it is built with an explicit forward slash.
        return os.path.join(f"{base}pdf/", relative_dir)

    @staticmethod
    def split_pdf(input_file: str, base_output_dir: str, split_model: "SplitModel") -> None:
        """Write one PDF per page range described by ``split_model``.

        Page numbers in the configuration are 1-based and inclusive. Invalid
        ranges are skipped with a warning. Each written file path is appended
        to ``PDFProcessor._generated_pdfs``. Errors are printed, not raised.

        Args:
            input_file: Path of the source PDF.
            base_output_dir: Base directory; files go under its ``pdf/``
                sub-directory joined with ``split_model.output_dir``.
            split_model: Output directory plus the list of page ranges.
        """
        try:
            output_dir = PDFProcessor._pdf_output_dir(base_output_dir, split_model.output_dir)
            os.makedirs(output_dir, exist_ok=True)

            reader = PdfReader(input_file)
            total_pages = len(reader.pages)

            for page_config in split_model.page_configs:
                try:
                    first, last = page_config.start_page, page_config.end_page
                    if first < 1 or last > total_pages or first > last:
                        print(f"警告: 页码范围 {page_config.start_page}-{page_config.end_page} 无效,已跳过")
                        continue

                    writer = PdfWriter()
                    # Convert the 1-based inclusive range to 0-based indices.
                    for page_num in range(first - 1, last):
                        writer.add_page(reader.pages[page_num])

                    output_name = page_config.output_name
                    if not output_name.endswith(".pdf"):
                        output_name = f"{output_name}.pdf"
                    output_file = os.path.join(output_dir, output_name)

                    with open(output_file, 'wb') as output:
                        writer.write(output)

                    print(f"成功创建文件: {output_file}")
                    PDFProcessor._generated_pdfs.append(output_file)

                except Exception as e:
                    print(f"处理页面配置时发生错误: {str(e)}")
                    continue
        except Exception as e:
            print(f"处理PDF文件时发生错误: {str(e)}")

    @staticmethod
    def convert_pdf_images(generated_pdfs: List[str]) -> None:
        """Render every PDF in ``generated_pdfs`` to a merged image.

        Failures are printed per file and do not stop the remaining files.

        Args:
            generated_pdfs: Paths of the PDFs to convert.
        """
        print("开始处理图片转换...")
        for pdf_file in generated_pdfs:
            try:
                result = PDFProcessor.extract_and_merge_images(pdf_file)
                if not result:
                    print(f"图片转换失败: {pdf_file}")
            except Exception as e:
                print(f"图片转换过程中发生错误: {str(e)}")
                continue

    @staticmethod
    def _default_image_path(input_file: str) -> str:
        """Derive the default PNG path for ``input_file``.

        The last ``/pdf/`` segment of the path is replaced by ``/img/`` and
        the extension by ``_merged.png``. When the path has no ``/pdf/``
        segment the PNG is placed next to the input file (previously this
        case produced a path under the filesystem root, ``/img/...``).
        """
        head, sep, tail = input_file.rpartition('/pdf/')
        mirrored = f"{head}/img/{tail}" if sep else input_file
        return os.path.splitext(mirrored)[0] + "_merged.png"

    @staticmethod
    def extract_and_merge_images(input_file: str, output_file: str = None) -> str:
        """Render all pages of a PDF and stack them vertically into one PNG.

        Each page is rendered at ``_RENDER_ZOOM`` and converted to RGB; pages
        are centred horizontally on a canvas as wide as the widest page. The
        PNG is re-encoded at progressively smaller scales until it fits the
        size budget (``_TARGET_BYTES_PER_PAGE`` per page) or the scale
        reaches ``_MIN_SCALE``, whichever comes first.

        Args:
            input_file: Path of the PDF to render.
            output_file: Destination PNG path. When ``None`` the path is
                derived from ``input_file`` (see ``_default_image_path``).

        Returns:
            The path of the written PNG, or ``''`` when the PDF has no pages
            or any error occurred (the error is printed).
        """
        try:
            pdf_document = pymupdf.open(input_file)
            try:
                zoom = pymupdf.Matrix(PDFProcessor._RENDER_ZOOM, PDFProcessor._RENDER_ZOOM)
                images = []
                for page_num in range(pdf_document.page_count):
                    pix = pdf_document[page_num].get_pixmap(matrix=zoom)
                    image = Image.open(io.BytesIO(pix.tobytes("png")))
                    if image.mode != 'RGB':
                        image = image.convert('RGB')
                    images.append(image)
            finally:
                # The rendered pages are independent PNG copies, so the
                # document can be released before merging.
                pdf_document.close()

            if not images:
                print("未在PDF中找到任何图片")
                return ''

            max_width = max(img.width for img in images)
            total_height = sum(img.height for img in images)

            # Stack the pages top to bottom, centring narrower pages.
            merged_image = Image.new('RGB', (max_width, total_height))
            y_offset = 0
            for img in images:
                merged_image.paste(img, ((max_width - img.width) // 2, y_offset))
                y_offset += img.height

            if output_file is None:
                output_file = PDFProcessor._default_image_path(input_file)
            parent_dir = os.path.dirname(output_file)
            if parent_dir:  # os.makedirs('') would raise
                os.makedirs(parent_dir, exist_ok=True)

            max_size = PDFProcessor._TARGET_BYTES_PER_PAGE * len(images)
            scale = 1.0
            while True:
                if scale < 1.0:
                    new_size = (
                        max(1, int(merged_image.width * scale)),
                        max(1, int(merged_image.height * scale)),
                    )
                    candidate = merged_image.resize(new_size, Image.Resampling.LANCZOS)
                else:
                    candidate = merged_image

                temp_buffer = io.BytesIO()
                # PNG is lossless; a JPEG-style `quality` option has no effect
                # on it, so only `optimize` is passed.
                candidate.save(temp_buffer, 'PNG', optimize=True)
                size = temp_buffer.tell()

                # Accept the result when it fits, or when the minimum scale
                # has been reached (best effort).
                if size <= max_size or scale <= PDFProcessor._MIN_SCALE:
                    break
                scale *= PDFProcessor._SCALE_STEP

            with open(output_file, 'wb') as f:
                f.write(temp_buffer.getvalue())
            print(f"成功保存图片:[{(size // 1024)} KB] {output_file}")

            return output_file

        except Exception as e:
            print(f"处理图片时发生错误: {str(e)}")
            return ''
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|