huoyan-enterprise/backend/services/captcha_service.py

358 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
图形验证码服务模块
提供图形验证码生成和验证功能。
"""
import base64
import io
import random
import string
import uuid
from pathlib import Path
from typing import Optional
from PIL import Image, ImageDraw, ImageFont, ImageFilter
from core.redis import RedisService
from logger.logging import get_logger
logger = get_logger(__name__)
# 字体文件路径
FONT_DIR = Path(__file__).parent / "fonts"
BUILTIN_FONT_PATH = FONT_DIR / "DejaVuSans-Bold.ttf"
# 验证码配置
CAPTCHA_LENGTH = 4 # 验证码长度
CAPTCHA_EXPIRE = 300 # 验证码有效期(秒)- 5分钟
CAPTCHA_RATE_LIMIT = 10 # IP 请求频率限制(次/分钟)
CAPTCHA_RATE_WINDOW = 60 # 频率限制时间窗口(秒)
CAPTCHA_FAIL_LIMIT = 3 # 验证失败次数限制
CAPTCHA_FAIL_WINDOW = 600 # 失败次数统计窗口(秒)
CAPTCHA_BAN_DURATION = 600 # IP 封禁时长(秒)- 10分钟
# 图片配置
IMAGE_WIDTH = 120
IMAGE_HEIGHT = 50
FONT_SIZE = 32 # 字体大小(调整为 32px占图片高度约 64%,满足需求 3.1
CHAR_SPACING = 26 # 字符间距(略微增加以确保字符不重叠,满足需求 3.3
class CaptchaService:
"""图形验证码服务类"""
@staticmethod
def _load_font(size: int) -> ImageFont.FreeTypeFont:
"""
加载字体文件
优先级:
1. 项目内置字体
2. 系统字体
3. 抛出异常(不再使用默认字体)
Args:
size: 字体大小
Returns:
ImageFont.FreeTypeFont: 字体对象
Raises:
RuntimeError: 所有字体加载失败
"""
attempted_paths = []
# 1. 尝试加载项目内置字体
if BUILTIN_FONT_PATH.exists():
try:
font = ImageFont.truetype(str(BUILTIN_FONT_PATH), size)
logger.info(f"使用内置字体: {BUILTIN_FONT_PATH}")
return font
except Exception as e:
logger.warning(f"加载内置字体失败: {e}")
attempted_paths.append(str(BUILTIN_FONT_PATH))
else:
logger.warning(f"内置字体文件不存在: {BUILTIN_FONT_PATH}")
attempted_paths.append(str(BUILTIN_FONT_PATH))
# 2. 尝试系统字体
system_font_paths = [
'/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf', # Linux
'/System/Library/Fonts/Helvetica.ttc', # macOS
'C:\\Windows\\Fonts\\arial.ttf', # Windows
]
for font_path in system_font_paths:
try:
font = ImageFont.truetype(font_path, size)
logger.info(f"使用系统字体: {font_path}")
return font
except Exception:
attempted_paths.append(font_path)
continue
# 3. 所有字体加载失败,抛出异常
error_msg = (
f"无法加载任何字体文件。请确保项目包含字体文件或系统已安装字体。"
f"尝试的路径:{', '.join(attempted_paths)}"
)
logger.error(error_msg)
raise RuntimeError(error_msg)
@staticmethod
def _generate_code(length: int = CAPTCHA_LENGTH) -> str:
"""
生成随机验证码字符串
Args:
length: 验证码长度
Returns:
str: 随机验证码
"""
# 只使用数字,避免字母混淆(如 0 和 O
return ''.join(random.choices(string.digits, k=length))
@staticmethod
def _create_image(code: str) -> bytes:
"""
使用 Pillow 生成验证码图片
Args:
code: 验证码字符串
Returns:
bytes: PNG 格式的图片数据
Raises:
RuntimeError: 字体加载失败
Exception: 图片生成失败
"""
try:
# 创建图片
image = Image.new('RGB', (IMAGE_WIDTH, IMAGE_HEIGHT), color='white')
draw = ImageDraw.Draw(image)
# 加载字体(可能抛出 RuntimeError
font = CaptchaService._load_font(FONT_SIZE)
# 绘制干扰线
for _ in range(3):
x1 = random.randint(0, IMAGE_WIDTH)
y1 = random.randint(0, IMAGE_HEIGHT)
x2 = random.randint(0, IMAGE_WIDTH)
y2 = random.randint(0, IMAGE_HEIGHT)
draw.line([(x1, y1), (x2, y2)], fill=(200, 200, 200), width=1)
# 绘制验证码字符
x_start = 10
for i, char in enumerate(code):
# 随机颜色(深色)
color = (
random.randint(0, 100),
random.randint(0, 100),
random.randint(0, 100)
)
# 随机位置偏移(优化垂直居中)
x = x_start + i * CHAR_SPACING + random.randint(-3, 3)
y = random.randint(8, 12) # 调整 y 范围使字符更好地垂直居中
# 绘制字符
draw.text((x, y), char, font=font, fill=color)
# 添加噪点
for _ in range(50):
x = random.randint(0, IMAGE_WIDTH - 1)
y = random.randint(0, IMAGE_HEIGHT - 1)
draw.point((x, y), fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
# 轻微模糊
image = image.filter(ImageFilter.SMOOTH)
# 转换为 PNG 字节流
buffer = io.BytesIO()
image.save(buffer, format='PNG')
return buffer.getvalue()
except RuntimeError:
# 字体加载失败,直接向上抛出
raise
except Exception as e:
# 图片生成过程中的其他错误
logger.error(f"验证码图片生成失败: {e}")
raise
@staticmethod
def _get_captcha_key(captcha_id: str) -> str:
"""获取验证码存储键"""
return f"captcha:{captcha_id}"
@staticmethod
def _get_rate_limit_key(ip: str) -> str:
"""获取 IP 限流存储键"""
return f"captcha:rate:{ip}"
@staticmethod
def _get_fail_count_key(ip: str) -> str:
"""获取失败次数存储键"""
return f"captcha:fail:{ip}"
@staticmethod
def _get_ban_key(ip: str) -> str:
"""获取 IP 封禁存储键"""
return f"captcha:ban:{ip}"
@classmethod
async def check_rate_limit(cls, ip: str) -> bool:
"""
检查 IP 请求频率限制
Args:
ip: 客户端 IP 地址
Returns:
bool: True 表示超过限制False 表示未超过
"""
rate_key = cls._get_rate_limit_key(ip)
# 获取当前请求次数
count = await RedisService.get(rate_key)
if count is None:
# 首次请求,设置计数为 1
await RedisService.set(rate_key, "1", CAPTCHA_RATE_WINDOW)
return False
count = int(count)
if count >= CAPTCHA_RATE_LIMIT:
return True
# 增加计数
await RedisService.incr(rate_key)
return False
@classmethod
async def check_ban(cls, ip: str) -> bool:
"""
检查 IP 是否被封禁
Args:
ip: 客户端 IP 地址
Returns:
bool: True 表示已封禁False 表示未封禁
"""
ban_key = cls._get_ban_key(ip)
return await RedisService.exists(ban_key)
@classmethod
async def record_fail(cls, ip: str) -> None:
"""
记录验证失败次数
Args:
ip: 客户端 IP 地址
"""
fail_key = cls._get_fail_count_key(ip)
# 获取当前失败次数
count = await RedisService.get(fail_key)
if count is None:
# 首次失败
await RedisService.set(fail_key, "1", CAPTCHA_FAIL_WINDOW)
else:
count = int(count)
count += 1
await RedisService.set(fail_key, str(count), CAPTCHA_FAIL_WINDOW)
# 如果失败次数超过限制,封禁 IP
if count >= CAPTCHA_FAIL_LIMIT:
ban_key = cls._get_ban_key(ip)
await RedisService.set(ban_key, "1", CAPTCHA_BAN_DURATION)
logger.warning(f"IP {ip} 因验证失败次数过多被封禁")
@classmethod
async def generate_captcha(cls, ip: str) -> dict:
"""
生成图形验证码
Args:
ip: 客户端 IP 地址
Returns:
dict: {
"captcha_id": str, # UUID
"image": str, # Base64 编码的图片data URL 格式)
"expires_in": int # 过期时间(秒)
}
Raises:
RuntimeError: 字体加载失败
Exception: 其他生成失败情况
"""
try:
# 生成验证码
code = cls._generate_code()
captcha_id = str(uuid.uuid4())
# 生成图片(可能抛出 RuntimeError
image_bytes = cls._create_image(code)
# Base64 编码
image_base64 = base64.b64encode(image_bytes).decode('utf-8')
image_data_url = f"data:image/png;base64,{image_base64}"
# 存储到 Redis
captcha_key = cls._get_captcha_key(captcha_id)
await RedisService.set(captcha_key, code, CAPTCHA_EXPIRE)
logger.info(f"生成验证码成功: captcha_id={captcha_id}, ip={ip}")
return {
"captcha_id": captcha_id,
"image": image_data_url,
"expires_in": CAPTCHA_EXPIRE
}
except RuntimeError as e:
# 字体加载失败,直接向上抛出
logger.error(f"验证码字体加载失败 [IP: {ip}]: {e}")
raise
except Exception as e:
# 其他错误(如 Redis 连接失败、图片编码失败等)
logger.exception(f"生成验证码失败 [IP: {ip}]: {e}")
raise
@classmethod
async def verify_captcha(cls, captcha_id: str, code: str) -> bool:
"""
验证图形验证码
Args:
captcha_id: 验证码 ID
code: 用户输入的验证码
Returns:
bool: 验证是否成功
"""
captcha_key = cls._get_captcha_key(captcha_id)
# 从 Redis 获取验证码
stored_code = await RedisService.get(captcha_key)
if stored_code is None:
logger.warning(f"验证码不存在或已过期: captcha_id={captcha_id}")
return False
# 不区分大小写比对(虽然当前只有数字,但为未来扩展做准备)
if stored_code.lower() != code.lower():
logger.warning(f"验证码错误: captcha_id={captcha_id}")
return False
# 验证成功,删除验证码(一次性使用)
await RedisService.delete(captcha_key)
logger.info(f"验证码验证成功: captcha_id={captcha_id}")
return True