358 lines
12 KiB
Python
358 lines
12 KiB
Python
"""
|
||
图形验证码服务模块
|
||
|
||
提供图形验证码生成和验证功能。
|
||
"""
|
||
import base64
|
||
import io
|
||
import random
|
||
import string
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from PIL import Image, ImageDraw, ImageFont, ImageFilter
|
||
|
||
from core.redis import RedisService
|
||
from logger.logging import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
# 字体文件路径
|
||
FONT_DIR = Path(__file__).parent / "fonts"
|
||
BUILTIN_FONT_PATH = FONT_DIR / "DejaVuSans-Bold.ttf"
|
||
|
||
# 验证码配置
|
||
CAPTCHA_LENGTH = 4 # 验证码长度
|
||
CAPTCHA_EXPIRE = 300 # 验证码有效期(秒)- 5分钟
|
||
CAPTCHA_RATE_LIMIT = 10 # IP 请求频率限制(次/分钟)
|
||
CAPTCHA_RATE_WINDOW = 60 # 频率限制时间窗口(秒)
|
||
CAPTCHA_FAIL_LIMIT = 3 # 验证失败次数限制
|
||
CAPTCHA_FAIL_WINDOW = 600 # 失败次数统计窗口(秒)
|
||
CAPTCHA_BAN_DURATION = 600 # IP 封禁时长(秒)- 10分钟
|
||
|
||
# 图片配置
|
||
IMAGE_WIDTH = 120
|
||
IMAGE_HEIGHT = 50
|
||
FONT_SIZE = 32 # 字体大小(调整为 32px,占图片高度约 64%,满足需求 3.1)
|
||
CHAR_SPACING = 26 # 字符间距(略微增加以确保字符不重叠,满足需求 3.3)
|
||
|
||
|
||
class CaptchaService:
|
||
"""图形验证码服务类"""
|
||
|
||
@staticmethod
|
||
def _load_font(size: int) -> ImageFont.FreeTypeFont:
|
||
"""
|
||
加载字体文件
|
||
|
||
优先级:
|
||
1. 项目内置字体
|
||
2. 系统字体
|
||
3. 抛出异常(不再使用默认字体)
|
||
|
||
Args:
|
||
size: 字体大小
|
||
|
||
Returns:
|
||
ImageFont.FreeTypeFont: 字体对象
|
||
|
||
Raises:
|
||
RuntimeError: 所有字体加载失败
|
||
"""
|
||
attempted_paths = []
|
||
|
||
# 1. 尝试加载项目内置字体
|
||
if BUILTIN_FONT_PATH.exists():
|
||
try:
|
||
font = ImageFont.truetype(str(BUILTIN_FONT_PATH), size)
|
||
logger.info(f"使用内置字体: {BUILTIN_FONT_PATH}")
|
||
return font
|
||
except Exception as e:
|
||
logger.warning(f"加载内置字体失败: {e}")
|
||
attempted_paths.append(str(BUILTIN_FONT_PATH))
|
||
else:
|
||
logger.warning(f"内置字体文件不存在: {BUILTIN_FONT_PATH}")
|
||
attempted_paths.append(str(BUILTIN_FONT_PATH))
|
||
|
||
# 2. 尝试系统字体
|
||
system_font_paths = [
|
||
'/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf', # Linux
|
||
'/System/Library/Fonts/Helvetica.ttc', # macOS
|
||
'C:\\Windows\\Fonts\\arial.ttf', # Windows
|
||
]
|
||
|
||
for font_path in system_font_paths:
|
||
try:
|
||
font = ImageFont.truetype(font_path, size)
|
||
logger.info(f"使用系统字体: {font_path}")
|
||
return font
|
||
except Exception:
|
||
attempted_paths.append(font_path)
|
||
continue
|
||
|
||
# 3. 所有字体加载失败,抛出异常
|
||
error_msg = (
|
||
f"无法加载任何字体文件。请确保项目包含字体文件或系统已安装字体。"
|
||
f"尝试的路径:{', '.join(attempted_paths)}"
|
||
)
|
||
logger.error(error_msg)
|
||
raise RuntimeError(error_msg)
|
||
|
||
@staticmethod
|
||
def _generate_code(length: int = CAPTCHA_LENGTH) -> str:
|
||
"""
|
||
生成随机验证码字符串
|
||
|
||
Args:
|
||
length: 验证码长度
|
||
|
||
Returns:
|
||
str: 随机验证码
|
||
"""
|
||
# 只使用数字,避免字母混淆(如 0 和 O)
|
||
return ''.join(random.choices(string.digits, k=length))
|
||
|
||
@staticmethod
|
||
def _create_image(code: str) -> bytes:
|
||
"""
|
||
使用 Pillow 生成验证码图片
|
||
|
||
Args:
|
||
code: 验证码字符串
|
||
|
||
Returns:
|
||
bytes: PNG 格式的图片数据
|
||
|
||
Raises:
|
||
RuntimeError: 字体加载失败
|
||
Exception: 图片生成失败
|
||
"""
|
||
try:
|
||
# 创建图片
|
||
image = Image.new('RGB', (IMAGE_WIDTH, IMAGE_HEIGHT), color='white')
|
||
draw = ImageDraw.Draw(image)
|
||
|
||
# 加载字体(可能抛出 RuntimeError)
|
||
font = CaptchaService._load_font(FONT_SIZE)
|
||
|
||
# 绘制干扰线
|
||
for _ in range(3):
|
||
x1 = random.randint(0, IMAGE_WIDTH)
|
||
y1 = random.randint(0, IMAGE_HEIGHT)
|
||
x2 = random.randint(0, IMAGE_WIDTH)
|
||
y2 = random.randint(0, IMAGE_HEIGHT)
|
||
draw.line([(x1, y1), (x2, y2)], fill=(200, 200, 200), width=1)
|
||
|
||
# 绘制验证码字符
|
||
x_start = 10
|
||
for i, char in enumerate(code):
|
||
# 随机颜色(深色)
|
||
color = (
|
||
random.randint(0, 100),
|
||
random.randint(0, 100),
|
||
random.randint(0, 100)
|
||
)
|
||
|
||
# 随机位置偏移(优化垂直居中)
|
||
x = x_start + i * CHAR_SPACING + random.randint(-3, 3)
|
||
y = random.randint(8, 12) # 调整 y 范围使字符更好地垂直居中
|
||
|
||
# 绘制字符
|
||
draw.text((x, y), char, font=font, fill=color)
|
||
|
||
# 添加噪点
|
||
for _ in range(50):
|
||
x = random.randint(0, IMAGE_WIDTH - 1)
|
||
y = random.randint(0, IMAGE_HEIGHT - 1)
|
||
draw.point((x, y), fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
|
||
|
||
# 轻微模糊
|
||
image = image.filter(ImageFilter.SMOOTH)
|
||
|
||
# 转换为 PNG 字节流
|
||
buffer = io.BytesIO()
|
||
image.save(buffer, format='PNG')
|
||
return buffer.getvalue()
|
||
except RuntimeError:
|
||
# 字体加载失败,直接向上抛出
|
||
raise
|
||
except Exception as e:
|
||
# 图片生成过程中的其他错误
|
||
logger.error(f"验证码图片生成失败: {e}")
|
||
raise
|
||
|
||
@staticmethod
|
||
def _get_captcha_key(captcha_id: str) -> str:
|
||
"""获取验证码存储键"""
|
||
return f"captcha:{captcha_id}"
|
||
|
||
@staticmethod
|
||
def _get_rate_limit_key(ip: str) -> str:
|
||
"""获取 IP 限流存储键"""
|
||
return f"captcha:rate:{ip}"
|
||
|
||
@staticmethod
|
||
def _get_fail_count_key(ip: str) -> str:
|
||
"""获取失败次数存储键"""
|
||
return f"captcha:fail:{ip}"
|
||
|
||
@staticmethod
|
||
def _get_ban_key(ip: str) -> str:
|
||
"""获取 IP 封禁存储键"""
|
||
return f"captcha:ban:{ip}"
|
||
|
||
@classmethod
|
||
async def check_rate_limit(cls, ip: str) -> bool:
|
||
"""
|
||
检查 IP 请求频率限制
|
||
|
||
Args:
|
||
ip: 客户端 IP 地址
|
||
|
||
Returns:
|
||
bool: True 表示超过限制,False 表示未超过
|
||
"""
|
||
rate_key = cls._get_rate_limit_key(ip)
|
||
|
||
# 获取当前请求次数
|
||
count = await RedisService.get(rate_key)
|
||
|
||
if count is None:
|
||
# 首次请求,设置计数为 1
|
||
await RedisService.set(rate_key, "1", CAPTCHA_RATE_WINDOW)
|
||
return False
|
||
|
||
count = int(count)
|
||
|
||
if count >= CAPTCHA_RATE_LIMIT:
|
||
return True
|
||
|
||
# 增加计数
|
||
await RedisService.incr(rate_key)
|
||
return False
|
||
|
||
@classmethod
|
||
async def check_ban(cls, ip: str) -> bool:
|
||
"""
|
||
检查 IP 是否被封禁
|
||
|
||
Args:
|
||
ip: 客户端 IP 地址
|
||
|
||
Returns:
|
||
bool: True 表示已封禁,False 表示未封禁
|
||
"""
|
||
ban_key = cls._get_ban_key(ip)
|
||
return await RedisService.exists(ban_key)
|
||
|
||
@classmethod
|
||
async def record_fail(cls, ip: str) -> None:
|
||
"""
|
||
记录验证失败次数
|
||
|
||
Args:
|
||
ip: 客户端 IP 地址
|
||
"""
|
||
fail_key = cls._get_fail_count_key(ip)
|
||
|
||
# 获取当前失败次数
|
||
count = await RedisService.get(fail_key)
|
||
|
||
if count is None:
|
||
# 首次失败
|
||
await RedisService.set(fail_key, "1", CAPTCHA_FAIL_WINDOW)
|
||
else:
|
||
count = int(count)
|
||
count += 1
|
||
await RedisService.set(fail_key, str(count), CAPTCHA_FAIL_WINDOW)
|
||
|
||
# 如果失败次数超过限制,封禁 IP
|
||
if count >= CAPTCHA_FAIL_LIMIT:
|
||
ban_key = cls._get_ban_key(ip)
|
||
await RedisService.set(ban_key, "1", CAPTCHA_BAN_DURATION)
|
||
logger.warning(f"IP {ip} 因验证失败次数过多被封禁")
|
||
|
||
@classmethod
|
||
async def generate_captcha(cls, ip: str) -> dict:
|
||
"""
|
||
生成图形验证码
|
||
|
||
Args:
|
||
ip: 客户端 IP 地址
|
||
|
||
Returns:
|
||
dict: {
|
||
"captcha_id": str, # UUID
|
||
"image": str, # Base64 编码的图片(data URL 格式)
|
||
"expires_in": int # 过期时间(秒)
|
||
}
|
||
|
||
Raises:
|
||
RuntimeError: 字体加载失败
|
||
Exception: 其他生成失败情况
|
||
"""
|
||
try:
|
||
# 生成验证码
|
||
code = cls._generate_code()
|
||
captcha_id = str(uuid.uuid4())
|
||
|
||
# 生成图片(可能抛出 RuntimeError)
|
||
image_bytes = cls._create_image(code)
|
||
|
||
# Base64 编码
|
||
image_base64 = base64.b64encode(image_bytes).decode('utf-8')
|
||
image_data_url = f"data:image/png;base64,{image_base64}"
|
||
|
||
# 存储到 Redis
|
||
captcha_key = cls._get_captcha_key(captcha_id)
|
||
await RedisService.set(captcha_key, code, CAPTCHA_EXPIRE)
|
||
|
||
logger.info(f"生成验证码成功: captcha_id={captcha_id}, ip={ip}")
|
||
|
||
return {
|
||
"captcha_id": captcha_id,
|
||
"image": image_data_url,
|
||
"expires_in": CAPTCHA_EXPIRE
|
||
}
|
||
except RuntimeError as e:
|
||
# 字体加载失败,直接向上抛出
|
||
logger.error(f"验证码字体加载失败 [IP: {ip}]: {e}")
|
||
raise
|
||
except Exception as e:
|
||
# 其他错误(如 Redis 连接失败、图片编码失败等)
|
||
logger.exception(f"生成验证码失败 [IP: {ip}]: {e}")
|
||
raise
|
||
|
||
@classmethod
|
||
async def verify_captcha(cls, captcha_id: str, code: str) -> bool:
|
||
"""
|
||
验证图形验证码
|
||
|
||
Args:
|
||
captcha_id: 验证码 ID
|
||
code: 用户输入的验证码
|
||
|
||
Returns:
|
||
bool: 验证是否成功
|
||
"""
|
||
captcha_key = cls._get_captcha_key(captcha_id)
|
||
|
||
# 从 Redis 获取验证码
|
||
stored_code = await RedisService.get(captcha_key)
|
||
|
||
if stored_code is None:
|
||
logger.warning(f"验证码不存在或已过期: captcha_id={captcha_id}")
|
||
return False
|
||
|
||
# 不区分大小写比对(虽然当前只有数字,但为未来扩展做准备)
|
||
if stored_code.lower() != code.lower():
|
||
logger.warning(f"验证码错误: captcha_id={captcha_id}")
|
||
return False
|
||
|
||
# 验证成功,删除验证码(一次性使用)
|
||
await RedisService.delete(captcha_key)
|
||
logger.info(f"验证码验证成功: captcha_id={captcha_id}")
|
||
|
||
return True
|