huoyan-enterprise/backend/services/moderation_service.py

811 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
阿里云内容审核服务
提供文本和图片内容审核功能,集成阿里云内容安全增强版服务 API。
使用官方 Python SDK。
"""
from typing import Optional
import uuid
import json
import asyncio
from alibabacloud_green20220302.client import Client as GreenClient
from alibabacloud_green20220302 import models as green_models
from alibabacloud_tea_openapi.models import Config as OpenApiConfig
from logger.logging import get_logger
from models.moderation import ModerationResult, ModerationDecision, ModerationLabel
from core.exceptions import ModerationError
# Module-level logger used by both ModerationService and NoOpModerationService below.
logger = get_logger(__name__)
class ModerationService:
    """
    Alibaba Cloud content moderation service (enhanced-edition SDK).

    Wraps the official ``alibabacloud_green20220302`` SDK client for text
    moderation (``text_moderation_plus``) and image moderation
    (``image_moderation_with_options``). The SDK calls are synchronous, so both
    are executed in a worker thread via ``asyncio.to_thread`` to avoid blocking
    the event loop.

    Failure policy ("degradation"): many infrastructure failures produce a PASS
    result labelled ``degraded`` (see ``_create_degraded_result``) instead of an
    exception. Text moderation degrades on every error; image moderation raises
    ``ModerationError`` for the cases documented on ``moderate_image``.
    """

    def __init__(
        self,
        access_key_id: str,
        access_key_secret: str,
        region: str = "cn-shanghai",
        timeout: float = 10.0,
        service_type: str = "comment_detection_pro",
        image_service_type: str = "baselineCheck"
    ):
        """
        Create the SDK client.

        Args:
            access_key_id: Alibaba Cloud AccessKey ID.
            access_key_secret: Alibaba Cloud AccessKey Secret.
            region: Service region; the endpoint is built as
                ``green-cip.{region}.aliyuncs.com`` (default: cn-shanghai).
            timeout: Request timeout in seconds. It is converted to
                milliseconds and used for both the connect and read timeouts.
            service_type: Text moderation service type
                (default: comment_detection_pro).
            image_service_type: Image moderation service type
                (default: baselineCheck).
        """
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.region = region
        self.timeout = timeout
        self.service_type = service_type
        self.image_service_type = image_service_type

        endpoint = f"green-cip.{region}.aliyuncs.com"
        config = OpenApiConfig(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
            region_id=region,
            endpoint=endpoint,
            # The SDK expects both timeouts in milliseconds.
            connect_timeout=int(timeout * 1000),
            read_timeout=int(timeout * 1000)
        )
        self.client = GreenClient(config)
        logger.info(
            f"审核服务初始化成功(增强版 SDK- 区域: {region}, 端点: {endpoint}, "
            f"文本服务类型: {service_type}, 图片服务类型: {image_service_type}, 超时: {timeout}"
        )

    async def close(self):
        """Close the service. The SDK client needs no explicit shutdown; this only logs."""
        logger.info("审核服务客户端已关闭")

    async def __aenter__(self):
        """Enter the async context manager and return this service."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context manager by calling ``close()``."""
        await self.close()

    async def moderate_text(
        self,
        text: str,
        request_id: Optional[str] = None
    ) -> ModerationResult:
        """
        Moderate a piece of text (public interface).

        The blocking SDK call runs in a worker thread (``asyncio.to_thread``)
        so it does not stall the event loop.

        Args:
            text: Text to moderate.
            request_id: Optional request identifier. A random UUID4 string is
                generated when it is omitted or empty. It is also sent to the
                API as ``dataId``.

        Returns:
            ModerationResult: The parsed moderation result. On any failure
            (non-200 HTTP status, API error code, malformed response, or any
            SDK/network exception) a degraded PASS result is returned instead.

        Note:
            Unlike ``moderate_image``, this method does not raise
            ``ModerationError``; every error is degraded.
        """
        if not request_id:
            request_id = str(uuid.uuid4())
        logger.info(
            f"开始审核文本 - request_id: {request_id}, "
            f"文本长度: {len(text)} 字符"
        )
        try:
            service_parameters = {
                'content': text,
                'dataId': request_id
            }
            request = green_models.TextModerationPlusRequest(
                service=self.service_type,
                service_parameters=json.dumps(service_parameters)
            )
            # The SDK call is synchronous; run it off the event loop, the same
            # way moderate_image does.
            response = await asyncio.to_thread(
                self.client.text_moderation_plus,
                request
            )
            if response.status_code != 200:
                logger.error(
                    f"审核请求失败 - HTTP {response.status_code}, "
                    f"request_id: {request_id}"
                )
                return self._create_degraded_result(request_id, f"http_{response.status_code}")
            result = self._parse_response(response.body, request_id)
            logger.info(
                f"审核完成 - request_id: {request_id}, "
                f"decision: {result.decision.value}, "
                f"labels: {[label.label for label in result.labels]}"
            )
            return result
        except Exception as e:
            # Deliberate policy: every text-moderation failure is degraded.
            logger.error(
                f"审核服务错误(降级模式)- request_id: {request_id}, "
                f"错误类型: {type(e).__name__}, "
                f"错误: {str(e)}"
            )
            return self._create_degraded_result(request_id, "sdk_error")

    def _build_image_service_parameters(
        self,
        image_source: str,
        source_type: str,
        request_id: str
    ) -> dict:
        """
        Build the ``service_parameters`` dict for an image moderation request.

        Args:
            image_source: Image URL (``"url"``) or ``bucket_name/object_name``
                (``"oss"``).
            source_type: ``"url"``, ``"oss"`` or ``"local"``.
            request_id: Sent to the API as ``dataId``.

        Returns:
            dict: Parameters to be JSON-encoded into the request.

        Raises:
            ModerationError: For an OSS name without a non-empty bucket and
                object part, for ``"local"`` (not supported yet), and for any
                other unknown source type.
        """
        service_parameters = {'dataId': request_id}
        if source_type == "url":
            service_parameters['imageUrl'] = image_source
        elif source_type == "oss":
            # Split "bucket_name/object_name" at the first '/'. Both halves
            # must be non-empty ("bucket/" or "/obj" are rejected).
            bucket_name, separator, object_name = image_source.partition('/')
            if not separator or not bucket_name or not object_name:
                raise ModerationError(
                    f"无效的 OSS 对象名称格式: {image_source}"
                    f"应为 'bucket_name/object_name'"
                )
            service_parameters['ossBucketName'] = bucket_name
            service_parameters['ossObjectName'] = object_name
        elif source_type == "local":
            # Local files would first have to be uploaded to OSS; not implemented.
            raise ModerationError(
                "暂不支持本地文件审核,请先上传到 OSS 或使用公网 URL"
            )
        else:
            raise ModerationError(f"不支持的来源类型: {source_type}")
        return service_parameters

    async def moderate_image(
        self,
        image_source: str,
        source_type: str = "url",
        request_id: Optional[str] = None
    ) -> ModerationResult:
        """
        Moderate an image.

        Args:
            image_source: Image location, interpreted according to ``source_type``:
                - ``"url"``: a publicly reachable image URL.
                - ``"oss"``: an OSS object given as ``bucket_name/object_name``.
                - ``"local"``: not supported yet; raises ``ModerationError``.
            source_type: One of ``"url"``, ``"oss"``, ``"local"``.
            request_id: Optional request identifier. A random UUID4 string is
                generated when it is omitted or empty. It is also sent to the
                API as ``dataId``.

        Returns:
            ModerationResult: The parsed moderation result. A degraded PASS
            result is returned in these cases:
                - timeouts and network errors;
                - 5xx statuses;
                - a missing response or status code;
                - any other unexpected exception.

        Raises:
            ModerationError: In these cases:
                - invalid input (malformed OSS name, unsupported source type);
                - non-200 statuses that ``_should_degrade`` does not degrade
                  (e.g. 401/403 and other 4xx);
                - API error codes or malformed bodies reported by
                  ``_parse_image_response``.
        """
        if not request_id:
            request_id = str(uuid.uuid4())
        logger.info(
            f"开始审核图片 - request_id: {request_id}, "
            f"来源类型: {source_type}, 来源: {image_source[:100]}"
        )
        # Defensive check: __init__ always assigns self.client.
        if self.client is None:
            logger.error(f"图片审核客户端未初始化 - request_id: {request_id}")
            return self._create_degraded_result(request_id, "client_not_initialized")
        logger.debug(
            f"图片审核客户端状态 - request_id: {request_id}, "
            f"client 类型: {type(self.client).__name__}, "
            f"image_service_type: {self.image_service_type}"
        )
        try:
            service_parameters = self._build_image_service_parameters(
                image_source, source_type, request_id
            )
            logger.debug(
                f"图片审核参数 - request_id: {request_id}, "
                f"service_parameters: {service_parameters}"
            )
            try:
                request = green_models.ImageModerationRequest(
                    service=self.image_service_type,
                    service_parameters=json.dumps(service_parameters)
                )
                logger.debug(
                    f"图片审核请求对象创建成功 - request_id: {request_id}, "
                    f"service: {self.image_service_type}"
                )
            except Exception as e:
                logger.error(
                    f"创建图片审核请求对象失败 - request_id: {request_id}, "
                    f"错误类型: {type(e).__name__}, 错误: {str(e)}"
                )
                raise
            # Imported locally; an ImportError here is handled by the generic
            # handler below (degraded as "unknown_error").
            from alibabacloud_tea_util import models as util_models
            # The SDK requires a RuntimeOptions instance here; None must not be passed.
            runtime = util_models.RuntimeOptions()
            # Blocking SDK call: run it in a worker thread.
            response = await asyncio.to_thread(
                self.client.image_moderation_with_options,
                request,
                runtime
            )
            logger.debug(
                f"图片审核 SDK 响应 - request_id: {request_id}, "
                f"response 类型: {type(response).__name__}, "
                f"status_code: {getattr(response, 'status_code', 'N/A')}"
            )
            if response is None:
                logger.error(f"图片审核 SDK 返回 None - request_id: {request_id}")
                return self._create_degraded_result(request_id, "sdk_response_none")
            status_code = getattr(response, 'status_code', None)
            if status_code is None:
                logger.error(f"图片审核响应缺少 status_code - request_id: {request_id}")
                return self._create_degraded_result(request_id, "missing_status_code")
            if status_code != 200:
                if self._should_degrade(None, status_code):
                    logger.warning(
                        f"图片审核 HTTP 错误(降级)- HTTP {status_code}, "
                        f"request_id: {request_id}"
                    )
                    return self._create_degraded_result(
                        request_id,
                        f"http_{status_code}"
                    )
                # Authentication and other non-degradable errors propagate.
                raise ModerationError(
                    f"图片审核请求失败 - HTTP {status_code}"
                )
            result = self._parse_image_response(response.body, request_id)
            logger.info(
                f"图片审核完成 - request_id: {request_id}, "
                f"decision: {result.decision.value}, "
                f"labels: {[label.label for label in result.labels]}"
            )
            return result
        except ModerationError:
            # Serious errors (invalid input, auth failures, API error codes) are not degraded.
            raise
        except (asyncio.TimeoutError, TimeoutError) as e:
            logger.warning(
                f"图片审核超时(降级)- request_id: {request_id}, 错误: {str(e)}"
            )
            return self._create_degraded_result(request_id, "timeout")
        except (ConnectionError, OSError) as e:
            logger.warning(
                f"图片审核网络错误(降级)- request_id: {request_id}, 错误: {str(e)}"
            )
            return self._create_degraded_result(request_id, "network_error")
        except Exception as e:
            # NOTE(review): Tea-based SDKs typically raise an exception for
            # non-2xx responses instead of returning them. If so, authentication
            # failures land here and are degraded rather than raised — confirm
            # against the SDK's exception type.
            logger.error(
                f"图片审核未知错误(降级)- request_id: {request_id}, "
                f"错误类型: {type(e).__name__}, 错误: {str(e)}"
            )
            return self._create_degraded_result(request_id, "unknown_error")

    def _parse_response(self, body, request_id: str) -> ModerationResult:
        """
        Parse a text-moderation (``TextModerationPlus``) SDK response body.

        Args:
            body: SDK response body. The method reads ``code``, ``message`` and
                ``data``, and from ``data`` it reads ``risk_level`` and
                ``result``. Each ``result`` item is read for ``label``,
                ``confidence``, ``risk_words`` and ``description``.
            request_id: Request identifier copied into the result.

        Returns:
            ModerationResult: Decision from ``_map_risk_level``, plus one label
            per result item with a non-empty label. When there are no such
            items, a single ``normal`` label (score 100.0) is used.

        Raises:
            ModerationError: When ``body.code`` is not 200, ``data`` is empty,
                or the body is malformed. ModerationErrors raised inside this
                method propagate unchanged instead of being re-wrapped.
        """
        try:
            if body.code != 200:
                error_msg = body.message or "Unknown error"
                logger.error(
                    f"增强版 API 返回错误 - Code: {body.code}, Message: {error_msg}"
                )
                raise ModerationError(
                    f"阿里云增强版 API 返回错误: {error_msg} (Code: {body.code})"
                )
            data = body.data
            if not data:
                raise ModerationError("增强版 API 响应缺少 Data 字段")
            risk_level = (data.risk_level or "").lower()
            decision = self._map_risk_level(risk_level)

            labels = []
            for item in data.result or []:
                label_name = item.label or ""
                confidence = item.confidence or 0.0
                risk_words = item.risk_words or ""
                description = item.description or ""
                if label_name:
                    labels.append(
                        ModerationLabel(
                            label=label_name,
                            score=float(confidence)
                        )
                    )
                if risk_words:
                    logger.warning(
                        f"命中违规内容 - request_id: {request_id}, "
                        f"标签: {label_name}, 置信度: {confidence}, "
                        f"违规词: {risk_words}, 描述: {description}"
                    )
            if not labels:
                # No risk labels: report the content as "normal".
                labels.append(ModerationLabel(label="normal", score=100.0))

            message = None
            if decision == ModerationDecision.BLOCK:
                message = "您的消息包含不当内容,无法处理。"
            result = ModerationResult(
                decision=decision,
                labels=labels,
                request_id=request_id,
                message=message
            )
            logger.info(
                f"解析增强版审核结果 - request_id: {request_id}, "
                f"RiskLevel: {risk_level}, decision: {decision.value}, "
                f"labels: {[label.label for label in labels]}"
            )
            return result
        except ModerationError:
            # Already descriptive; do not let the generic handler below re-wrap it.
            raise
        except AttributeError as e:
            logger.error(f"增强版响应解析错误 - 缺少必需字段: {str(e)}")
            raise ModerationError(
                f"增强版 API 响应格式错误: 缺少字段 {str(e)}",
                original_error=e
            )
        except (ValueError, TypeError) as e:
            logger.error(f"增强版响应解析错误 - 数据类型错误: {str(e)}")
            raise ModerationError(
                f"增强版 API 响应数据格式错误: {str(e)}",
                original_error=e
            )
        except Exception as e:
            logger.error(f"增强版响应解析未知错误: {str(e)}")
            raise ModerationError(
                f"解析增强版 API 响应时发生错误: {str(e)}",
                original_error=e
            )

    def _parse_image_response(self, body, request_id: str) -> ModerationResult:
        """
        Parse an image-moderation SDK response body.

        Args:
            body: SDK response body. The method reads ``code``, ``msg`` and
                ``data``, and from ``data`` it reads ``risk_level`` and
                ``result``. Each ``result`` item is read for ``label`` and
                ``confidence``. Every field is read with ``getattr`` so that
                missing fields are tolerated.
            request_id: Request identifier copied into the result.

        Returns:
            ModerationResult: Decision from ``_map_risk_level_to_decision``
            (the conservative policy), plus one label per result item with a
            non-empty label. When there are no such items, a single ``normal``
            label (score 100.0) is used.

        Raises:
            ModerationError: When the body is None, ``code`` is missing or not
                200, ``data`` is empty, or the body is malformed.
                ModerationErrors raised inside this method propagate unchanged.
        """
        try:
            if body is None:
                logger.error(f"图片审核响应 body 为 None - request_id: {request_id}")
                raise ModerationError("图片审核增强版 API 响应 body 为空")
            code = getattr(body, 'code', None)
            if code is None:
                logger.error(f"图片审核响应缺少 code 字段 - request_id: {request_id}")
                raise ModerationError("图片审核增强版 API 响应缺少 code 字段")
            if code != 200:
                error_msg = getattr(body, 'msg', None) or "Unknown error"
                logger.error(
                    f"图片审核增强版 API 返回错误 - Code: {code}, Message: {error_msg}"
                )
                raise ModerationError(
                    f"阿里云图片审核增强版 API 返回错误: {error_msg} (Code: {code})"
                )
            data = getattr(body, 'data', None)
            if not data:
                logger.error(f"图片审核响应缺少 data 字段 - request_id: {request_id}")
                raise ModerationError("图片审核增强版 API 响应缺少 Data 字段")
            risk_level = (getattr(data, 'risk_level', None) or "").lower()
            # Images use the conservative mapping (medium -> BLOCK).
            decision = self._map_risk_level_to_decision(risk_level)

            labels = []
            for item in getattr(data, 'result', None) or []:
                label_name = getattr(item, 'label', None) or ""
                confidence = getattr(item, 'confidence', None) or 0.0
                if label_name:
                    labels.append(
                        ModerationLabel(
                            label=label_name,
                            score=float(confidence)
                        )
                    )
                    logger.info(
                        f"图片审核标签 - request_id: {request_id}, "
                        f"标签: {label_name}, 置信度: {confidence}"
                    )
            if not labels:
                labels.append(ModerationLabel(label="normal", score=100.0))

            message = None
            if decision == ModerationDecision.BLOCK:
                message = "图片包含不当内容,无法上传。"
            result = ModerationResult(
                decision=decision,
                labels=labels,
                request_id=request_id,
                message=message
            )
            logger.info(
                f"解析图片审核结果 - request_id: {request_id}, "
                f"RiskLevel: {risk_level}, decision: {decision.value}, "
                f"labels: {[label.label for label in labels]}"
            )
            return result
        except ModerationError:
            # Already descriptive; do not let the generic handler below re-wrap it.
            raise
        except AttributeError as e:
            logger.error(
                f"图片审核响应解析错误 - 缺少必需字段: {str(e)}, "
                f"request_id: {request_id}"
            )
            raise ModerationError(
                f"图片审核增强版 API 响应格式错误: 缺少字段 {str(e)}",
                original_error=e
            )
        except (ValueError, TypeError) as e:
            logger.error(
                f"图片审核响应解析错误 - 数据类型错误: {str(e)}, "
                f"request_id: {request_id}"
            )
            raise ModerationError(
                f"图片审核增强版 API 响应数据格式错误: {str(e)}",
                original_error=e
            )
        except Exception as e:
            logger.error(
                f"图片审核响应解析未知错误: {str(e)}, "
                f"request_id: {request_id}"
            )
            raise ModerationError(
                f"解析图片审核增强版 API 响应时发生错误: {str(e)}",
                original_error=e
            )

    def _map_risk_level(self, risk_level: str) -> ModerationDecision:
        """
        Map an API risk level to a moderation decision (text policy).

        Args:
            risk_level: Risk level string (compared case-insensitively).

        Returns:
            ModerationDecision: high -> BLOCK, medium -> REVIEW,
            low/none -> PASS, anything else -> REVIEW (with a warning).
        """
        risk_level = risk_level.lower()
        if risk_level == "high":
            return ModerationDecision.BLOCK
        if risk_level == "medium":
            return ModerationDecision.REVIEW
        if risk_level in ("low", "none"):
            return ModerationDecision.PASS
        logger.warning(f"未知的风险等级: {risk_level}, 默认为 REVIEW")
        return ModerationDecision.REVIEW

    def _map_risk_level_to_decision(self, risk_level: str) -> ModerationDecision:
        """
        Map an API risk level to a moderation decision (conservative image policy).

        Same as ``_map_risk_level`` except that ``medium`` is also BLOCKed.

        Args:
            risk_level: Risk level string (compared case-insensitively).

        Returns:
            ModerationDecision: high/medium -> BLOCK, low/none -> PASS,
            anything else -> REVIEW (with a warning).
        """
        if risk_level.lower() == "medium":
            return ModerationDecision.BLOCK
        return self._map_risk_level(risk_level)

    def _should_degrade(
        self,
        error: Optional[Exception] = None,
        status_code: Optional[int] = None
    ) -> bool:
        """
        Decide whether a failure should be degraded (allowed through).

        Args:
            error: Optional exception that caused the failure.
            status_code: Optional HTTP status code.

        Returns:
            bool: True to degrade (return a PASS result), False to raise.
            The status code is checked first:
                - 401/403 -> False;
                - 5xx -> True;
                - other 4xx -> False.
            Otherwise the error type decides:
                - timeout -> True;
                - ConnectionError/OSError -> True.
            Everything else (including a falsy status code with no matching
            error) -> False.
        """
        if status_code:
            if status_code in (401, 403):
                logger.error(f"认证错误 - HTTP {status_code},不采用降级策略")
                return False
            if 500 <= status_code < 600:
                logger.warning(f"服务器错误 - HTTP {status_code},采用降级策略")
                return True
            if 400 <= status_code < 500:
                logger.error(f"客户端错误 - HTTP {status_code},不采用降级策略")
                return False
        if error:
            if isinstance(error, (asyncio.TimeoutError, TimeoutError)):
                logger.warning(f"超时错误,采用降级策略: {str(error)}")
                return True
            if isinstance(error, (ConnectionError, OSError)):
                logger.warning(f"网络错误,采用降级策略: {str(error)}")
                return True
        return False

    def _create_degraded_result(self, request_id: str, reason: str) -> ModerationResult:
        """
        Build the fail-open result used when moderation could not be performed.

        Args:
            request_id: Request identifier copied into the result.
            reason: Degradation reason; it is only logged.

        Returns:
            ModerationResult: A PASS decision with a single ``degraded`` label
            (score 0.0) and no message.
        """
        logger.warning(
            f"应用降级策略 - request_id: {request_id}, "
            f"原因: {reason}, "
            f"决策: 允许通过PASS"
        )
        return ModerationResult(
            decision=ModerationDecision.PASS,
            labels=[
                ModerationLabel(
                    label="degraded",
                    score=0.0
                )
            ],
            request_id=request_id,
            message=None
        )
class NoOpModerationService:
    """
    Stand-in moderation service used when moderation is switched off.

    Offers the same async interface as ``ModerationService`` (text/image
    moderation plus async context-manager support). It never contacts an
    external service: every request is answered with a PASS decision that
    carries a single ``noop`` label.
    """

    def __init__(self):
        """Record in the log that moderation is disabled."""
        logger.info("审核服务已禁用 - 使用 NoOpModerationService")

    @staticmethod
    def _noop_result(request_id: str) -> ModerationResult:
        """Return the unconditional PASS result carrying one ``noop`` label (score 100.0)."""
        noop_label = ModerationLabel(label="noop", score=100.0)
        return ModerationResult(
            decision=ModerationDecision.PASS,
            labels=[noop_label],
            request_id=request_id,
            message=None,
        )

    async def moderate_text(
        self,
        text: str,
        request_id: Optional[str] = None
    ) -> ModerationResult:
        """
        Accept any text without inspecting it.

        Args:
            text: Text that would be moderated; only its length is logged.
            request_id: Optional request identifier. A random UUID4 string is
                used when it is missing or empty.

        Returns:
            ModerationResult: Always a PASS decision with a ``noop`` label.
        """
        rid = request_id or str(uuid.uuid4())
        logger.debug(
            f"NoOp 审核 - request_id: {rid}, "
            f"文本长度: {len(text)} 字符, "
            f"决策: PASS审核已禁用"
        )
        return self._noop_result(rid)

    async def moderate_image(
        self,
        image_source: str,
        source_type: str = "url",
        request_id: Optional[str] = None
    ) -> ModerationResult:
        """
        Accept any image without inspecting it.

        Args:
            image_source: Image location; ignored.
            source_type: Source type; only logged.
            request_id: Optional request identifier. A random UUID4 string is
                used when it is missing or empty.

        Returns:
            ModerationResult: Always a PASS decision with a ``noop`` label.
        """
        rid = request_id or str(uuid.uuid4())
        logger.debug(
            f"NoOp 图片审核 - request_id: {rid}, "
            f"来源类型: {source_type}, "
            f"决策: PASS审核已禁用"
        )
        return self._noop_result(rid)

    async def close(self):
        """Nothing to release; present for interface parity."""

    async def __aenter__(self):
        """Enter the async context manager and hand back this service."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context manager; there is nothing to clean up."""