123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import json, re
- import tools.utils as utils
- from openai import OpenAI
- from pathlib import Path
class AiHelper:
    """Helper for calling an OpenAI-compatible chat API and decoding JSON replies.

    Connection settings (URL, key, model) come from the constructor arguments
    or, when those are falsy, from the ``ai.*`` configuration values. Each call
    may override them; overrides are stored on the instance (see ``check_api``).
    """

    # Connection settings; filled in by __init__ and optionally by check_api.
    _ai_api_key = None
    _ai_api_url = None
    # Class-level default so check_api reports a missing model with its own
    # error message instead of failing with AttributeError.
    _api_model = None
    # Read from the "ai.max_tokens" config value; not sent with requests.
    _ai_max_tokens = 150

    def __init__(self, api_url: str = None, api_key: str = None, api_model: str = None):
        """Resolve the API settings, preferring explicit arguments over config.

        Args:
            api_url: Base URL of the API; falls back to config ``ai.url``.
            api_key: API key; falls back to config ``ai.key``.
            api_model: Model name; falls back to config ``ai.model``.
        """
        self._ai_api_url = api_url if api_url else utils.get_config_value("ai.url")
        self._ai_api_key = api_key if api_key else utils.get_config_value("ai.key")
        self._api_model = api_model if api_model else utils.get_config_value("ai.model")
        max_tokens = utils.get_config_value("ai.max_tokens")
        if max_tokens:
            self._ai_max_tokens = int(max_tokens)

    def call_openai(
        self,
        system_prompt: str,
        user_prompt: str,
        api_url: str = None,
        api_key: str = None,
        api_model: str = None,
    ) -> dict:
        """Send a system + user prompt and return the decoded JSON answer.

        Args:
            system_prompt: Content of the system message.
            user_prompt: Content of the user message.
            api_url: Optional override of the API base URL.
            api_key: Optional override of the API key.
            api_model: Optional override of the model name.

        Returns:
            A dict with ``data`` (the parsed JSON) and the token counters
            ``completion_tokens``, ``prompt_tokens`` and ``total_tokens``;
            an empty dict when the parsed answer is empty/falsy.

        Raises:
            Exception: If a setting is missing or the response cannot be parsed.
        """
        self.check_api(api_key, api_model, api_url)
        utils.get_logger().info(
            f"调用AI API ==> Url:{self._ai_api_url},Model:{self._api_model}"
        )
        client = OpenAI(api_key=self._ai_api_key, base_url=self._ai_api_url)
        # NOTE: max_tokens is not passed to the API, so _ai_max_tokens has no
        # effect on the request.
        completion = client.chat.completions.create(
            model=self._api_model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            stream=False,
            temperature=0.7,
            response_format={"type": "json_object"},
        )
        return self._build_result(completion)

    def check_api(self, api_key, api_model, api_url):
        """Apply per-call overrides and verify that all settings are present.

        Truthy arguments replace the values stored on the instance, so an
        override stays in effect for later calls as well.

        Raises:
            Exception: If the key, URL or model is still ``None`` afterwards.
        """
        if api_url:
            self._ai_api_url = api_url
        if api_key:
            self._ai_api_key = api_key
        if api_model:
            self._api_model = api_model
        if self._ai_api_key is None:
            raise Exception("AI API key 没有配置")
        if self._ai_api_url is None:
            raise Exception("AI API url 没有配置")
        if self._api_model is None:
            raise Exception("AI API model 没有配置")

    def _build_result(self, completion) -> dict:
        """Turn a completion object into the result dict shared by all callers.

        Args:
            completion: Object returned by ``chat.completions.create``; only
                its ``model_dump_json()`` method is used.

        Returns:
            ``{"data": ..., "completion_tokens": ..., "prompt_tokens": ...,
            "total_tokens": ...}``, or ``{}`` when the parsed answer is falsy.

        Raises:
            Exception: If any step of decoding the response fails.
        """
        try:
            response = completion.model_dump_json()
            response_json = json.loads(response)
            res_str = self._extract_message_content(response_json)
            result_data = self._parse_response(res_str, True)
            result = {}
            if result_data:
                # A missing or null usage section yields zero counters.
                usage = response_json.get("usage") or {}
                result["data"] = result_data
                result["completion_tokens"] = usage.get("completion_tokens", 0)
                result["prompt_tokens"] = usage.get("prompt_tokens", 0)
                result["total_tokens"] = usage.get("total_tokens", 0)
                utils.get_logger().info(f"AI Process JSON: {result}")
            else:
                utils.get_logger().info(f"AI Response: {response}")
            return result
        except Exception as e:
            raise Exception(f"解析 AI 响应错误: {e}") from e

    @staticmethod
    def _extract_message_content(response_json: dict) -> str:
        """Pull the assistant message text out of a decoded API response.

        Reads ``choices[0].message.content`` when ``choices`` is non-empty,
        otherwise the top-level ``message.content``. A null message or content
        is treated as an empty string.

        Raises:
            Exception: If the response has neither ``choices`` nor ``message``.
        """
        utils.get_logger().info(f"AI Response JSON: {response_json}")
        choices = response_json.get("choices")
        if choices:
            message = choices[0].get("message") or {}
            message_content = message.get("content") or ""
        elif "message" in response_json:
            message = response_json["message"] or {}
            message_content = message.get("content") or ""
        else:
            raise Exception("AI 响应中未找到有效的 choices 或 message 数据")
        return AiHelper._clean_message_content(message_content)

    @staticmethod
    def _clean_message_content(message_content: str) -> str:
        """Strip a Markdown JSON fence and repair malformed content.

        Content that already parses as JSON is returned untouched, so legal
        escapes such as ``\\"`` survive. Only content that fails to parse goes
        through the lossy clean-up (numeric escapes, timestamps, backslashes).

        Args:
            message_content: Raw message text; ``None`` is treated as ``""``.

        Returns:
            The cleaned text, ready to be handed to ``json.loads``.
        """
        fence_open, fence_close = "```json", "```"
        text = (message_content or "").strip()
        # Remove a surrounding ```json ... ``` fence, using the real fence length.
        if text.startswith(fence_open) and text.endswith(fence_close):
            text = text[len(fence_open):-len(fence_close)].strip()

        try:
            json.loads(text)
        except json.JSONDecodeError:
            pass  # malformed: fall through to the best-effort clean-up
        else:
            return text

        # Drop numeric pseudo-escapes such as \32.
        text = re.sub(r"\\[0-9]{2}", "", text)
        # Drop ISO-8601 timestamps with fractional seconds (two-digit day).
        text = re.sub(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z", "", text)
        # Drop every remaining backslash.
        text = text.replace("\\", "").strip()
        # Tolerate one stray leading 'n', e.g. what is left of a literal "\n"
        # once the backslashes are gone.
        if text.startswith("n"):
            text = text[1:].lstrip()
        return text

    def _parse_response(self, response: str, first=True) -> object:
        """Decode ``response`` as JSON, retrying once without curly quotes.

        Args:
            response: JSON text to decode.
            first: ``True`` on the initial attempt; when decoding fails, one
                retry is made with the “ ” ‘ ’ quote characters removed.

        Returns:
            Whatever ``json.loads`` produces for the text.

        Raises:
            Exception: If the text still cannot be decoded on the retry.
        """
        try:
            return json.loads(response)
        except json.JSONDecodeError as e:
            if not first:
                raise Exception(f"解析 AI 响应错误: {response} {e}") from e
            utils.get_logger().error(
                f"JSON 解析错误,去除部分特殊字符重新解析一次: {e}"
            )
            # Remove curly double quotes, then curly single quotes.
            message_content = re.sub(r"[“”]", "", response)
            message_content = re.sub(r"[‘’]", "", message_content)
            return self._parse_response(message_content, False)

    def call_openai_with_image(
        self,
        image_path,
        system_prompt: str,
        user_prompt: str,
        api_url: str = None,
        api_key: str = None,
        api_model: str = None,
    ) -> None:
        """Placeholder for image-based requests; not implemented, returns None."""
        return None

    def call_openai_with_file(
        self,
        file_path,
        system_prompt: str,
        user_prompt: str,
        api_url: str = None,
        api_key: str = None,
        api_model: str = None,
    ) -> dict:
        """Upload a file, ask a question about it and return the decoded answer.

        The file is uploaded with purpose ``file-extract`` and referenced from
        the system message as ``fileid://<id>``.

        NOTE(review): ``system_prompt`` is accepted but not sent; the system
        message carries only the file reference. Confirm whether the prompt
        should be added as an additional system message.

        Args:
            file_path: Path of the file to upload.
            system_prompt: Currently unused (see note above).
            user_prompt: Content of the user message.
            api_url: Optional override of the API base URL.
            api_key: Optional override of the API key.
            api_model: Optional override of the model name.

        Returns:
            Same structure as ``call_openai``.

        Raises:
            Exception: If a setting is missing or the response cannot be parsed.
        """
        self.check_api(api_key, api_model, api_url)
        utils.get_logger().info(
            f"调用AI API File==> Url:{self._ai_api_url},Model:{self._api_model}"
        )
        client = OpenAI(api_key=self._ai_api_key, base_url=self._ai_api_url)
        file_object = client.files.create(
            file=Path(file_path),
            purpose="file-extract",
        )
        # NOTE: max_tokens is not passed to the API, so _ai_max_tokens has no
        # effect on the request.
        completion = client.chat.completions.create(
            model=self._api_model,
            messages=[
                {"role": "system", "content": f"fileid://{file_object.id}"},
                {"role": "user", "content": user_prompt},
            ],
            stream=False,
            temperature=0.7,
            response_format={"type": "json_object"},
        )
        return self._build_result(completion)
|