123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import json
- import re
- from openai import OpenAI
- import utils
class AiHelper:
    """Thin wrapper around an OpenAI-compatible chat-completions endpoint.

    Connection settings are read through ``utils.get_config_value``. The
    model's reply is expected to be a JSON document; it is cleaned up, parsed
    and returned together with the token usage reported by the API.
    """

    _ai_api_key = None
    _ai_api_url = None
    _api_model = None
    # NOTE(review): loaded from "ai.max_tokens" but not forwarded to the API
    # call in the code visible here. Forwarding it with the default of 150
    # could truncate replies that work today, so it is left as-is - confirm.
    _ai_max_tokens = 150

    # Markdown code fence wrapping the whole reply, with or without "json".
    _FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
    # Backslash followed by two digits (e.g. \32), which is not a JSON escape.
    _BAD_ESCAPE_RE = re.compile(r"\\[0-9]{2}")
    # ISO-8601 UTC timestamp with fractional seconds.
    _TIMESTAMP_RE = re.compile(r"\d{4}-\d{2}-\d{1,2}T\d{2}:\d{2}:\d{2}\.\d+Z")

    def __init__(self):
        """Load API key, base URL, model name and token limit from config."""
        self._ai_api_key = utils.get_config_value("ai.key")
        self._ai_api_url = utils.get_config_value("ai.url")
        self._api_model = utils.get_config_value("ai.model")
        max_tokens = utils.get_config_value("ai.max_tokens")
        if max_tokens:
            self._ai_max_tokens = int(max_tokens)

    def call_openai(self, system_prompt: str, user_prompt: str) -> dict:
        """Send one system + user prompt pair and return the parsed JSON reply.

        When the parsed reply is non-empty, the keys ``completion_tokens``,
        ``prompt_tokens`` and ``total_tokens`` are added to it from the
        response's ``usage`` section (0 when the API reports no usage).

        Raises:
            Exception: if key, URL or model is not configured, or if the
                response cannot be extracted or parsed as JSON.
        """
        utils.get_logger().info("调用AI API")
        if self._ai_api_key is None:
            raise Exception("AI API key 没有配置")
        if self._ai_api_url is None:
            raise Exception("AI API url 没有配置")
        if self._api_model is None:
            raise Exception("AI API model 没有配置")

        client = OpenAI(api_key=self._ai_api_key, base_url=self._ai_api_url)
        completion = client.chat.completions.create(
            model=self._api_model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            stream=False,
            temperature=0.7,
        )

        try:
            response = completion.model_dump_json()
            response_json = json.loads(response)
            res_str = self._extract_message_content(response_json)
            result = self._parse_response(res_str, True)
            if result:
                # "usage" may be absent or null in the dumped response.
                usage = response_json.get("usage") or {}
                result["completion_tokens"] = usage.get("completion_tokens", 0)
                result["prompt_tokens"] = usage.get("prompt_tokens", 0)
                result["total_tokens"] = usage.get("total_tokens", 0)
            else:
                utils.get_logger().info(f"AI Response: {response}")
            return result
        except Exception as e:
            # Keep the original cause attached for debugging.
            raise Exception(f"解析 AI 响应错误: {e}") from e

    @staticmethod
    def _extract_message_content(response_json: dict) -> str:
        """Pull the assistant text out of a response dict and clean it up.

        The text is taken from ``choices[0].message.content`` or, failing
        that, from ``message.content``. A surrounding Markdown code fence is
        removed. If the remaining text is already valid JSON it is returned
        untouched; otherwise a lossy cleanup removes ``\\NN`` sequences,
        timestamps and all remaining backslashes.

        Raises:
            Exception: if neither ``choices`` nor ``message`` is present.
        """
        choices = response_json.get("choices")
        if choices:
            message = choices[0].get("message") or {}
        elif "message" in response_json:
            message = response_json["message"] or {}
        else:
            raise Exception("AI 响应中未找到有效的 choices 或 message 数据")

        # content can be null in the response; treat it as empty text.
        message_content = (message.get("content") or "").strip()

        # Drop the fence as a whole instead of slicing a fixed character count.
        fenced = AiHelper._FENCE_RE.match(message_content)
        if fenced:
            message_content = fenced.group(1)

        # Well-formed JSON must not go through the lossy cleanup below:
        # stripping backslashes would destroy legitimate escapes such as \".
        try:
            json.loads(message_content)
            return message_content
        except ValueError:
            pass

        message_content = AiHelper._BAD_ESCAPE_RE.sub("", message_content)
        message_content = AiHelper._TIMESTAMP_RE.sub("", message_content)
        message_content = message_content.strip()
        # Last resort: remove every remaining backslash.
        return message_content.replace("\\", "")

    def _parse_response(self, response: str, first=True) -> dict:
        """Parse *response* as JSON, retrying once without curly quotes.

        On the first failure the Chinese-style quotation marks are removed and
        parsing is attempted a second time. The return value is whatever
        ``json.loads`` yields, which callers here treat as a dict.

        Raises:
            Exception: if the text still is not valid JSON after the retry.
        """
        try:
            return json.loads(response)
        except json.JSONDecodeError as e:
            if not first:
                raise Exception(f"解析 AI 响应错误: {response} {e}") from e
            utils.get_logger().error(
                f"JSON 解析错误,去除部分特殊字符重新解析一次: {e}"
            )
            # Remove curly double and single quotation marks.
            message_content = re.sub(r"[“”‘’]", "", response)
            return self._parse_response(message_content, False)
|