| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import json
- import re
- import tools.utils as utils
- from tools.utils.file_helper import encode_image
- from openai import OpenAI
class AiHelper:
    """Thin wrapper around an OpenAI-compatible chat-completions endpoint.

    Connection settings (URL, key, model) come from the constructor
    arguments or, when those are omitted, from the project configuration
    (``ai.url``, ``ai.key``, ``ai.model``). Per-call overrides passed to
    ``call_openai`` / ``analyze_image_with_ai`` are stored on the instance
    and therefore stay in effect for later calls.
    """

    # Connection settings; populated in __init__ or by per-call overrides.
    _ai_api_key = None
    _ai_api_url = None
    _api_model = None
    # Read from ``ai.max_tokens`` when configured. NOTE: currently not sent
    # with requests (the ``max_tokens`` argument was disabled in the original
    # code); kept so existing readers of the attribute keep working.
    _ai_max_tokens = 150

    # Default instruction sent with the image in ``analyze_image_with_ai``.
    _DEFAULT_IMAGE_PROMPT = "请总结图片中的表格,供RAG系统embedding使用。要求以文本的信息列出,定额编号对应的详细信息,其中表格的列名中显示了定额编号,行名中显示了电算代号。定额编号所示的列代表了这一类定额,通过项目的不同条件来区分,比如长度、地质条件等;而电算代号所示的行则代表了具体的材料、人工等的消耗量,表示在特定定额编号所示的条件下,具体的资源(人力或材料)消耗量。"

    def __init__(self, api_url: str = None, api_key: str = None, api_model: str = None):
        """Store connection settings, falling back to the project config.

        Args:
            api_url: Base URL of the API; falls back to config ``ai.url``.
            api_key: API key; falls back to config ``ai.key``.
            api_model: Model name; falls back to config ``ai.model``.
        """
        self._ai_api_url = api_url or utils.get_config_value("ai.url")
        self._ai_api_key = api_key or utils.get_config_value("ai.key")
        self._api_model = api_model or utils.get_config_value("ai.model")
        max_tokens = utils.get_config_value("ai.max_tokens")
        if max_tokens:
            self._ai_max_tokens = int(max_tokens)

    def _apply_overrides(self, api_url: str = None, api_key: str = None, api_model: str = None) -> None:
        """Apply per-call overrides and verify that all settings are present.

        Overrides are written to the instance, so they persist for later
        calls (this matches the original behavior of both public methods).

        Raises:
            Exception: If the key, URL or model is still ``None``.
        """
        if api_url:
            self._ai_api_url = api_url
        if api_key:
            self._ai_api_key = api_key
        if api_model:
            self._api_model = api_model

        if self._ai_api_key is None:
            raise Exception("AI API key 没有配置")
        if self._ai_api_url is None:
            raise Exception("AI API url 没有配置")
        if self._api_model is None:
            raise Exception("AI API model 没有配置")

    def call_openai(self, system_prompt: str, user_prompt: str, api_url: str = None,
                    api_key: str = None, api_model: str = None) -> dict:
        """Send a system + user prompt and return the parsed JSON answer.

        The request asks for a JSON object (``response_format``) and is not
        streamed.

        Args:
            system_prompt: Content of the ``system`` message.
            user_prompt: Content of the ``user`` message.
            api_url: Optional override for the API base URL (persists).
            api_key: Optional override for the API key (persists).
            api_model: Optional override for the model name (persists).

        Returns:
            A dict with ``data`` (the parsed JSON payload) and the token
            counters ``completion_tokens``, ``prompt_tokens`` and
            ``total_tokens``. An empty dict is returned when the parsed
            payload is empty/falsy.

        Raises:
            Exception: If a setting is missing, or if the response cannot
                be decoded/parsed. Errors raised by the API call itself
                propagate unchanged.
        """
        self._apply_overrides(api_url, api_key, api_model)

        utils.get_logger().info(f"调用AI API ==> Url:{self._ai_api_url},Model:{self._api_model}")
        client = OpenAI(api_key=self._ai_api_key, base_url=self._ai_api_url)
        # NOTE: max_tokens is intentionally not passed (see _ai_max_tokens).
        completion = client.chat.completions.create(
            model=self._api_model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            stream=False,
            temperature=0.7,
            response_format={"type": "json_object"},
        )

        try:
            response = completion.model_dump_json()
            response_json = json.loads(response)
            res_str = self._extract_message_content(response_json)
            result_data = self._parse_response(res_str, True)

            result = {}
            if result_data:
                # Some providers send "usage": null; treat that as no counters.
                usage = response_json.get("usage") or {}
                result["data"] = result_data
                result["completion_tokens"] = usage.get("completion_tokens", 0)
                result["prompt_tokens"] = usage.get("prompt_tokens", 0)
                result["total_tokens"] = usage.get("total_tokens", 0)
                utils.get_logger().info(f"AI Process JSON: {result}")
            else:
                utils.get_logger().info(f"AI Response: {response}")
            return result
        except Exception as e:
            raise Exception(f"解析 AI 响应错误: {e}") from e

    @staticmethod
    def _extract_message_content(response_json: dict) -> str:
        """Pull the assistant message text out of a decoded response.

        Looks at ``choices[0].message.content`` first and at a top-level
        ``message.content`` second, then normalizes the text with
        ``_clean_message_content``.

        Raises:
            Exception: If neither ``choices`` nor ``message`` is present.
        """
        utils.get_logger().info(f"AI Response JSON: {response_json}")

        choices = response_json.get("choices")
        if choices:
            message = choices[0].get("message") or {}
        elif "message" in response_json:
            message = response_json["message"] or {}
        else:
            raise Exception("AI 响应中未找到有效的 choices 或 message 数据")

        return AiHelper._clean_message_content(message.get("content", ""))

    @staticmethod
    def _clean_message_content(message_content: str) -> str:
        """Normalize raw model output into a string suitable for ``json.loads``.

        Steps:
          1. Strip surrounding whitespace and a wrapping Markdown JSON fence.
          2. If the result already is valid JSON, return it untouched so
             legitimate escapes (``\\"``, ``\\n``, ``\\uXXXX``) survive.
          3. Otherwise apply the lossy best-effort cleanup: drop ``\\NN``
             pseudo-escapes, ISO-8601 ``...Z`` timestamps and every
             remaining backslash.

        Args:
            message_content: Raw message text; ``None`` is treated as "".

        Returns:
            The cleaned text (possibly still not valid JSON).
        """
        fence_open = "```json"
        fence_close = "```"

        text = (message_content or "").strip()
        if (
            text.startswith(fence_open)
            and text.endswith(fence_close)
            and len(text) >= len(fence_open) + len(fence_close)
        ):
            # Slice by the real marker lengths: the opener is 7 characters.
            text = text[len(fence_open):-len(fence_close)].strip()

        try:
            json.loads(text)
        except ValueError:
            pass  # Not valid JSON yet; fall through to the lossy cleanup.
        else:
            return text

        # Remove invalid escapes such as \32.
        text = re.sub(r"\\[0-9]{2}", "", text)
        # Remove timestamps like 2024-01-15T10:00:00.123Z.
        text = re.sub(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z", "", text)
        text = text.strip()
        # Last resort: drop every remaining backslash.
        return text.replace("\\", "")

    def _parse_response(self, response: str, first=True):
        """Parse ``response`` as JSON, retrying once without CJK quotes.

        Args:
            response: Text to parse.
            first: ``True`` on the initial attempt; when parsing fails the
                curly quotes (“ ” ‘ ’) are removed and parsing is retried
                once with ``first=False``.

        Returns:
            Whatever ``json.loads`` yields for the text.

        Raises:
            Exception: If the text is still invalid on the second attempt.
        """
        try:
            return json.loads(response)
        except json.JSONDecodeError as e:
            if not first:
                raise Exception(f"解析 AI 响应错误: {response} {e}") from e
            utils.get_logger().error(f"JSON 解析错误,去除部分特殊字符重新解析一次: {e}")
            # Drop Chinese double and single quotation marks, then retry.
            without_cjk_quotes = re.sub(r"[“”‘’]", "", response)
            return self._parse_response(without_cjk_quotes, False)

    def analyze_image_with_ai(self, image_path, api_url: str = None, api_key: str = None,
                              api_model: str = None, prompt: str = None):
        """Ask the model to describe an image and return its text answer.

        Args:
            image_path: Passed to ``encode_image``.
            api_url: Optional override for the API base URL (persists).
            api_key: Optional override for the API key (persists).
            api_model: Optional override for the model name (persists).
            prompt: Instruction sent alongside the image. Defaults to the
                built-in table-summarization prompt.

        Returns:
            The content of the first choice, or ``''`` if anything fails
            while encoding the image or calling the API (the error is
            logged; this best-effort behavior is deliberate).

        Raises:
            Exception: If the key, URL or model is not configured.
        """
        self._apply_overrides(api_url, api_key, api_model)
        instruction = prompt if prompt else self._DEFAULT_IMAGE_PROMPT

        try:
            client = OpenAI(api_key=self._ai_api_key, base_url=self._ai_api_url)
            # NOTE(review): the value is sent as the image URL, so
            # encode_image presumably returns a full data URL
            # ("data:image/...;base64,...") -- confirm against the helper.
            base64_str = encode_image(image_path)
            response = client.chat.completions.create(
                model=self._api_model,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": instruction},
                            {"type": "image_url", "image_url": {"url": base64_str}},
                        ],
                    }
                ],
                timeout=600,
            )
            return response.choices[0].message.content
        except Exception as e:
            utils.get_logger().error(f"调用AI接口时出错: {e}")
            return ''
|