123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507 |
- import json,os,time
- import re
- from typing import List
- from PyPDF2 import PdfReader, PdfWriter
- from tools.pdf_split.model import SplitModel,PageConfig
- from PIL import Image
- import io
- import pymupdf
- import tools.utils as utils
- from tools.utils.file_helper import encode_image
- from tools.utils.ai_helper import AiHelper
- from tools.pdf_split.mysql_store import MysqlStore
class PDFProcessor:
    """PDF pipeline: split a source PDF into per-section PDFs, render each
    split PDF to page images, and convert those images to Markdown via AI
    analysis, persisting the results to MySQL.

    Workflow: split() -> batch_split_pdf() -> split_pdf() -> convert_pdf_images()
    -> extract_and_merge_images(); then process_image_to_txt() builds Markdown.
    """

    def __init__(self):
        pass

    # Prompt template (sent verbatim to the AI, hence kept in Chinese)
    # describing how a book's table of contents must be converted into
    # SplitModel JSON.
    s = """
    按照我提供的目录整理信息,页数要准确,要求:
    1. 每章的标题精确到每节的一、二..的页数。 例如 第一章 第一节 一、xxxx 二、xxxx 。
    2. 返回的结构体:```typescript
    type PageConfig = {
        start_page: number; // 起始页码
        end_page: number; // 结束页码
        output_name: string; // 输出文件名称
    };
    type SplitModel = {
        output_dir: string; // 输出目录
        page_configs: PageConfig[]; // 页面配置数组
    };
    ```
    3. 输出文件名 格式 章@节@小节.pdf 例如 第一章 第一节 一、xxxx 的格式为 01xxxx@01xxxx@01xxxx.pdf (xxxx为具体标题内容)
    4. 输出目录路径为 章节/ 例如 第一章 就是 01xxxx/
    5. 目录一定要完整,不能有遗漏,不能有多余的目录
    6. 帮我整理1,2,3,4,5,6,7,8,9,10章的目录信息,并返回SplitModel的json数组,一个章节一个SplitModel
    7. end_page不能与下一个start_page相同
    """

    # Class-level list of PDF paths produced by the most recent split() run.
    # NOTE: shared across all instances by design (split() resets it).
    _generated_pdfs: List[str] = []

    @staticmethod
    def split(filename: str) -> None:
        """Split the named source PDF according to its JSON split config.

        Args:
            filename: base name (without extension) of the PDF/JSON pair,
                e.g. "foo" -> ./tools/pdf_json/v1/foo.json and
                ./temp_files/pdf/source/v1/foo.pdf.
        """
        version = "v1"
        base_json_dir = "./tools/pdf_json/"
        base_input_dir = "./temp_files/pdf/source/"
        base_output_dir = "./temp_files/pdf/output/"
        # FIX: the original hard-coded a placeholder name here and ignored
        # the `filename` argument entirely; derive all three paths from it.
        json_file = f"{base_json_dir}/{version}/{filename}.json"
        input_file = f"{base_input_dir}/{version}/{filename}.pdf"
        output_dir = f"{base_output_dir}/{version}/{filename}/"
        # Reset the list of generated PDFs for this run.
        PDFProcessor._generated_pdfs = []
        try:
            # Read and parse the JSON split configuration.
            with open(json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
            # Convert raw JSON entries into SplitModel objects.
            split_models = []
            for item in json_data:
                page_configs = [PageConfig(**page) for page in item['page_configs']]
                split_model = SplitModel(output_dir=item['output_dir'], page_configs=page_configs)
                split_models.append(split_model)
            # Perform the actual splitting.
            PDFProcessor.batch_split_pdf(input_file, output_dir, split_models)
            # After every split PDF is written, render them to page images.
            PDFProcessor.convert_pdf_images(PDFProcessor._generated_pdfs)
            print("PDF文件拆分成功!")
        except FileNotFoundError:
            print(f"错误: 找不到JSON文件 {json_file}")
            return
        except json.JSONDecodeError as e:
            print(f"错误: JSON文件格式无效 {str(e)}")
            return
        except Exception as e:
            print(f"处理过程中发生错误: {str(e)}")
            return

    @staticmethod
    def batch_split_pdf(input_file: str, base_output_dir: str, split_models: List[SplitModel]) -> None:
        """Run split_pdf() for each configuration, continuing past failures.

        Args:
            input_file: path of the source PDF.
            base_output_dir: base output directory.
            split_models: SplitModel configurations, one per chapter.
        """
        try:
            for split_model in split_models:
                try:
                    PDFProcessor.split_pdf(input_file, base_output_dir, split_model)
                except Exception as e:
                    # One failed chapter must not abort the whole batch.
                    print(f"处理拆分任务时发生错误: {str(e)}")
                    continue
        except Exception as e:
            print(f"批量处理PDF文件时发生错误: {str(e)}")
            return

    @staticmethod
    def split_pdf(input_file: str, base_output_dir: str, split_model: SplitModel) -> None:
        """Split `input_file` into one PDF per page-range configuration.

        Successfully written files are appended to PDFProcessor._generated_pdfs
        for the later image-conversion pass.

        Args:
            input_file: path of the source PDF.
            base_output_dir: base output directory (PDFs go under <base>/pdf/).
            split_model: the SplitModel describing ranges and output names.
        """
        try:
            # Ensure the per-chapter output directory exists.
            output_dir = os.path.join(f"{base_output_dir}pdf/", split_model.output_dir)
            os.makedirs(output_dir, exist_ok=True)

            reader = PdfReader(input_file)
            total_pages = len(reader.pages)
            for page_config in split_model.page_configs:
                try:
                    # Validate the (1-based, inclusive) page range.
                    if page_config.start_page < 1 or page_config.end_page > total_pages or page_config.start_page > page_config.end_page:
                        print(f"警告: 页码范围 {page_config.start_page}-{page_config.end_page} 无效,已跳过")
                        continue

                    # Copy the selected pages into a fresh writer.
                    writer = PdfWriter()
                    for page_num in range(page_config.start_page - 1, page_config.end_page):
                        writer.add_page(reader.pages[page_num])

                    # Normalize the output name to end in .pdf.
                    output_name = page_config.output_name
                    if not page_config.output_name.endswith(".pdf"):
                        output_name = f"{output_name}.pdf"
                    output_file = os.path.join(output_dir, output_name)

                    with open(output_file, 'wb') as output:
                        writer.write(output)

                    print(f"成功创建文件: {output_file}")
                    PDFProcessor._generated_pdfs.append(output_file)

                except Exception as e:
                    # Skip the bad range, keep processing the rest.
                    print(f"处理页面配置时发生错误: {str(e)}")
                    continue
        except Exception as e:
            print(f"处理PDF文件时发生错误: {str(e)}")
            return

    @staticmethod
    def convert_pdf_images(generated_pdfs: List[str]) -> None:
        """Render every generated PDF to page images, continuing past failures.

        Args:
            generated_pdfs: paths of the PDFs produced by split_pdf().
        """
        print("开始处理图片转换...")
        for pdf_file in generated_pdfs:
            try:
                result = PDFProcessor.extract_and_merge_images(pdf_file)
                if not result:
                    print(f"图片转换失败: {pdf_file}")
            except Exception as e:
                print(f"图片转换过程中发生错误: {str(e)}")
                continue

    @staticmethod
    def extract_and_merge_images(input_file: str, output_file: str = None) -> str:
        """Render each page of `input_file` as a PNG under a parallel /img/ tree.

        Pages are saved as 01.png, 02.png, ... inside a directory derived from
        the PDF path by swapping /pdf/ for /img/ and dropping the extension.

        Args:
            input_file: path of the PDF to render.
            output_file: optional explicit output path; when None it is derived
                from `input_file`.

        Returns:
            The image output directory, or '' on failure.
        """
        pdf_document = None
        try:
            pdf_document = pymupdf.open(input_file)
            # FIX: the original unconditionally overwrote `output_file`,
            # silently ignoring a caller-supplied value.
            if output_file is None:
                # Mirror the /pdf/ tree as /img/ (assumes the path contains
                # '/pdf/' — TODO confirm for callers outside split_pdf()).
                parts = input_file.rsplit('/pdf/', 1)
                output_file = '/pdf/'.join(parts[:-1]) + '/img/' + parts[-1]
            output_dir = os.path.splitext(output_file)[0]
            os.makedirs(output_dir, exist_ok=True)
            for page_num in range(pdf_document.page_count):
                page = pdf_document[page_num]
                # Re-render at progressively smaller scale until the PNG fits
                # the size budget or the minimum scale is reached; the last
                # rendering is used even if it still exceeds the budget.
                scale = 1.0
                img_data = None
                max_size = 200 * 1024  # 200KB budget per page image
                while scale >= 0.5:  # never shrink below 50%
                    temp_pix = page.get_pixmap(matrix=pymupdf.Matrix(1.5 * scale, 1.5 * scale))
                    img_data = temp_pix.tobytes("png")
                    if len(img_data) <= max_size:
                        break
                    scale *= 0.9  # shrink 10% per iteration
                # Sequentially numbered, zero-padded file name per page.
                img_path = os.path.join(output_dir, f"{page_num + 1:02d}.png")
                with open(img_path, 'wb') as f:
                    f.write(img_data)
                print(f"成功保存图片({len(img_data) // 1024}KB): {img_path}")
            return output_dir
        except Exception as e:
            print(f"处理图片时发生错误: {str(e)}")
            return ''
        finally:
            # FIX: the original never closed the document (handle leak).
            if pdf_document is not None:
                pdf_document.close()

    @staticmethod
    def process_image_to_txt(filename: str):
        """Merge each image directory into one Markdown file via AI analysis.

        Walks <output>/img/, AI-transcribes every page image of every
        章@节@小节 directory into a single Markdown file under <output>/txt/,
        and records the result in MySQL.

        Args:
            filename: base name of the processed document (also the standard
                name in the database).
        """
        version = "v1"
        base_output_dir = "./temp_files/pdf/output/"
        # FIX: derive the working directory from `filename` (the original
        # hard-coded a placeholder and ignored the argument).
        output_dir = f"{base_output_dir}/{version}/{filename}/"
        image_dir = f"{output_dir}/img/"
        txt_dir = f"{output_dir}/txt/"
        db_store = MysqlStore()
        ai_helper = AiHelper()
        # Create the standard record if it does not exist yet.
        if not db_store.get_standard_by_name(filename):
            db_store.create_standard(code=filename, name=filename)
        try:
            for dir_path, dir_names, file_names in os.walk(image_dir):
                # Skip the root itself; only leaf chapter directories matter.
                if dir_path == image_dir:
                    continue
                # Directory names follow 章@节@小节; skip anything else.
                dir_rel_path = os.path.relpath(dir_path, image_dir)
                chapter_parts = dir_rel_path.split('@')
                if len(chapter_parts) < 3:
                    continue
                # Collect page images sorted by their numeric prefix.
                image_files = sorted(
                    [f for f in file_names if f.lower().endswith(('.png', '.jpg', '.jpeg'))],
                    key=lambda x: int(x.split('.')[0])
                )
                if not image_files:
                    continue
                md_filename = f"{dir_rel_path.replace('@', '_')}.md"
                md_path = os.path.join(txt_dir, md_filename)
                os.makedirs(os.path.dirname(md_path), exist_ok=True)
                md_content = f"# {filename}\n## {'/'.join(chapter_parts)}\n\n"
                all_images = []
                for img_file in image_files:
                    img_path = os.path.join(dir_path, img_file)
                    img_name = os.path.basename(img_path)
                    try:
                        # AI-transcribe the page image to text.
                        page_content = ai_helper.analyze_image_with_ai(img_path)
                        # Throttle API request rate.
                        time.sleep(5)
                        utils.get_logger().info(f"处理图片 {img_path} 成功")
                        md_content += f"########### {img_path} ####################\n"
                        md_content += f"--start{img_name}--\n\n"
                        md_content += f"\n\n{page_content}\n\n"
                        md_content += f"--end{img_name}--\n\n"
                        all_images.append(img_path)
                    except Exception as e:
                        print(f"处理图片 {img_file} 失败: {str(e)}")
                        continue
                # FIX: if every image failed there is nothing to record;
                # the original crashed on all_images[0] (IndexError), which
                # aborted the whole walk via the outer except.
                if not all_images:
                    continue
                with open(md_path, 'w', encoding='utf-8') as f:
                    f.write(md_content)
                db_store.add_pdf_record(
                    standard_name=filename,
                    pdf_path=os.path.abspath(all_images[0].replace("/img/", "/pdf/").rsplit('.', 1)[0] + '.pdf'),
                    image_path='\n'.join([os.path.abspath(p) for p in all_images]),
                    markdown_text=md_content,
                    chapter=chapter_parts[0],
                    section=chapter_parts[1],
                    subsection=chapter_parts[2]
                )
                print(f"成功生成合并文档: {md_path}")
        except Exception as e:
            print(f"处理过程中发生错误: {str(e)}")

    @staticmethod
    def regenerate_markdown(img_path: str):
        """Regenerate Markdown for a single image or a whole image directory.

        Args:
            img_path: path to one page image, or to a directory of page images,
                somewhere under an /img/ tree.
        """
        processor = PDFProcessor()
        db_store = MysqlStore()
        ai_helper = AiHelper()
        if os.path.isdir(img_path):
            # Whole-directory mode.
            dir_path = img_path
            # The /img/ segment anchors the parallel /txt/ tree.
            parts = dir_path.split('/img/')
            if len(parts) < 2:
                print("无效的目录路径")
                return
            txt_root = dir_path.replace('/img/', '/txt/')
            md_files = [f for f in os.listdir(txt_root) if f.endswith('.md')]
            if not md_files:
                print("找不到对应的Markdown文件")
                return
            md_path = os.path.join(txt_root, md_files[0])
            processor._process_directory(dir_path, md_path, db_store, ai_helper)
        elif os.path.isfile(img_path):
            # Single-image mode.
            img_file = os.path.basename(img_path)
            dir_path = os.path.dirname(img_path)
            txt_dir = dir_path.replace('/img/', '/txt/')
            md_files = [f for f in os.listdir(txt_dir) if f.endswith('.md')]
            if not md_files:
                print("找不到对应的Markdown文件")
                return
            md_path = os.path.join(txt_dir, md_files[0])
            processor._update_single_image(img_path, md_path, db_store, ai_helper)

    def _process_directory(self, dir_path: str, md_path: str, db_store, ai_helper):
        """Re-analyze every image in `dir_path` and rewrite `md_path` from scratch.

        Args:
            dir_path: directory containing the page images.
            md_path: Markdown file to overwrite.
            db_store: MysqlStore-like object with update_pdf_record().
            ai_helper: AiHelper-like object with analyze_image_with_ai().
        """
        # Page images sorted by numeric prefix (01.png, 02.png, ...).
        image_files = sorted(
            [f for f in os.listdir(dir_path) if f.lower().endswith(('.png', '.jpg', '.jpeg'))],
            key=lambda x: int(x.split('.')[0])
        )
        new_content = f"# {os.path.basename(os.path.dirname(md_path))}\n"
        all_images = []
        for img_file in image_files:
            img_path = os.path.join(dir_path, img_file)
            page_content = ai_helper.analyze_image_with_ai(img_path)
            # NOTE(review): uses the extension-less name for the tags, while
            # process_image_to_txt() tags with the full basename (e.g. 01.png)
            # — confirm which tag form downstream consumers expect.
            img_name = os.path.splitext(img_file)[0]
            new_content += f"########### {img_path} ####################\n"
            new_content += f"--start{img_name}--\n\n"
            new_content += f"\n\n{page_content}\n\n"
            new_content += f"--end{img_name}--\n\n"
            all_images.append(img_path)
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write(new_content)
        # FIX: guard against an empty directory — all_images[0] below would
        # raise IndexError in the original.
        if not all_images:
            return
        # NOTE(review): joins with ',' while add_pdf_record() joins with '\n'
        # — confirm which separator the database layer expects.
        db_store.update_pdf_record(
            markdown_text=new_content,
            image_paths=','.join(all_images),
            by_image_path=all_images[0]
        )

    def _update_single_image(self, img_path: str, md_path: str, db_store, ai_helper):
        """Re-analyze one image and replace its --start/--end section in `md_path`.

        Args:
            img_path: path of the page image to re-analyze.
            md_path: Markdown file containing the image's tagged section.
            db_store: database store (currently unused; update call is disabled).
            ai_helper: AiHelper-like object with analyze_image_with_ai().
        """
        img_name = os.path.splitext(os.path.basename(img_path))[0]
        with open(md_path, 'r', encoding='utf-8') as f:
            content = f.read()
        start_tag = f"--start{img_name}--"
        end_tag = f"--end{img_name}--"
        pattern = re.compile(f'{re.escape(start_tag)}(.*?){re.escape(end_tag)}', re.DOTALL)
        # Ask the AI for fresh content for this page.
        new_content = ai_helper.analyze_image_with_ai(img_path)
        updated_section = f"{start_tag}\n\n{new_content}\n\n{end_tag}"
        # FIX: the original passed `updated_section` as a re.sub replacement
        # string, so backslashes or group references (e.g. "\5") in the AI
        # output raised re.error or corrupted the file. A callable replacement
        # inserts the text literally.
        new_md_content = pattern.sub(lambda _m: updated_section, content)
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write(new_md_content)
        # Database update intentionally disabled in the original; kept as-is.
        # db_store.update_pdf_record(
        #     markdown_text=new_md_content,
        #     image_paths=img_path,
        #     by_image_path=img_path
        # )
|