""" 图形验证码服务模块 提供图形验证码生成和验证功能。 """ import base64 import io import random import string import uuid from pathlib import Path from typing import Optional from PIL import Image, ImageDraw, ImageFont, ImageFilter from core.redis import RedisService from logger.logging import get_logger logger = get_logger(__name__) # 字体文件路径 FONT_DIR = Path(__file__).parent / "fonts" BUILTIN_FONT_PATH = FONT_DIR / "DejaVuSans-Bold.ttf" # 验证码配置 CAPTCHA_LENGTH = 4 # 验证码长度 CAPTCHA_EXPIRE = 300 # 验证码有效期(秒)- 5分钟 CAPTCHA_RATE_LIMIT = 10 # IP 请求频率限制(次/分钟) CAPTCHA_RATE_WINDOW = 60 # 频率限制时间窗口(秒) CAPTCHA_FAIL_LIMIT = 3 # 验证失败次数限制 CAPTCHA_FAIL_WINDOW = 600 # 失败次数统计窗口(秒) CAPTCHA_BAN_DURATION = 600 # IP 封禁时长(秒)- 10分钟 # 图片配置 IMAGE_WIDTH = 120 IMAGE_HEIGHT = 50 FONT_SIZE = 32 # 字体大小(调整为 32px,占图片高度约 64%,满足需求 3.1) CHAR_SPACING = 26 # 字符间距(略微增加以确保字符不重叠,满足需求 3.3) class CaptchaService: """图形验证码服务类""" @staticmethod def _load_font(size: int) -> ImageFont.FreeTypeFont: """ 加载字体文件 优先级: 1. 项目内置字体 2. 系统字体 3. 抛出异常(不再使用默认字体) Args: size: 字体大小 Returns: ImageFont.FreeTypeFont: 字体对象 Raises: RuntimeError: 所有字体加载失败 """ attempted_paths = [] # 1. 尝试加载项目内置字体 if BUILTIN_FONT_PATH.exists(): try: font = ImageFont.truetype(str(BUILTIN_FONT_PATH), size) logger.info(f"使用内置字体: {BUILTIN_FONT_PATH}") return font except Exception as e: logger.warning(f"加载内置字体失败: {e}") attempted_paths.append(str(BUILTIN_FONT_PATH)) else: logger.warning(f"内置字体文件不存在: {BUILTIN_FONT_PATH}") attempted_paths.append(str(BUILTIN_FONT_PATH)) # 2. 尝试系统字体 system_font_paths = [ '/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf', # Linux '/System/Library/Fonts/Helvetica.ttc', # macOS 'C:\\Windows\\Fonts\\arial.ttf', # Windows ] for font_path in system_font_paths: try: font = ImageFont.truetype(font_path, size) logger.info(f"使用系统字体: {font_path}") return font except Exception: attempted_paths.append(font_path) continue # 3. 所有字体加载失败,抛出异常 error_msg = ( f"无法加载任何字体文件。请确保项目包含字体文件或系统已安装字体。" f"尝试的路径:{', '.join(attempted_paths)}" ) logger.error(error_msg) raise RuntimeError(error_msg) @staticmethod def _generate_code(length: int = CAPTCHA_LENGTH) -> str: """ 生成随机验证码字符串 Args: length: 验证码长度 Returns: str: 随机验证码 """ # 只使用数字,避免字母混淆(如 0 和 O) return ''.join(random.choices(string.digits, k=length)) @staticmethod def _create_image(code: str) -> bytes: """ 使用 Pillow 生成验证码图片 Args: code: 验证码字符串 Returns: bytes: PNG 格式的图片数据 Raises: RuntimeError: 字体加载失败 Exception: 图片生成失败 """ try: # 创建图片 image = Image.new('RGB', (IMAGE_WIDTH, IMAGE_HEIGHT), color='white') draw = ImageDraw.Draw(image) # 加载字体(可能抛出 RuntimeError) font = CaptchaService._load_font(FONT_SIZE) # 绘制干扰线 for _ in range(3): x1 = random.randint(0, IMAGE_WIDTH) y1 = random.randint(0, IMAGE_HEIGHT) x2 = random.randint(0, IMAGE_WIDTH) y2 = random.randint(0, IMAGE_HEIGHT) draw.line([(x1, y1), (x2, y2)], fill=(200, 200, 200), width=1) # 绘制验证码字符 x_start = 10 for i, char in enumerate(code): # 随机颜色(深色) color = ( random.randint(0, 100), random.randint(0, 100), random.randint(0, 100) ) # 随机位置偏移(优化垂直居中) x = x_start + i * CHAR_SPACING + random.randint(-3, 3) y = random.randint(8, 12) # 调整 y 范围使字符更好地垂直居中 # 绘制字符 draw.text((x, y), char, font=font, fill=color) # 添加噪点 for _ in range(50): x = random.randint(0, IMAGE_WIDTH - 1) y = random.randint(0, IMAGE_HEIGHT - 1) draw.point((x, y), fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))) # 轻微模糊 image = image.filter(ImageFilter.SMOOTH) # 转换为 PNG 字节流 buffer = io.BytesIO() image.save(buffer, format='PNG') return buffer.getvalue() except RuntimeError: # 字体加载失败,直接向上抛出 raise except Exception as e: # 图片生成过程中的其他错误 logger.error(f"验证码图片生成失败: {e}") raise @staticmethod def _get_captcha_key(captcha_id: str) -> str: """获取验证码存储键""" return f"captcha:{captcha_id}" @staticmethod def _get_rate_limit_key(ip: str) -> str: """获取 IP 限流存储键""" return f"captcha:rate:{ip}" @staticmethod def _get_fail_count_key(ip: str) -> str: """获取失败次数存储键""" return f"captcha:fail:{ip}" @staticmethod def _get_ban_key(ip: str) -> str: """获取 IP 封禁存储键""" return f"captcha:ban:{ip}" @classmethod async def check_rate_limit(cls, ip: str) -> bool: """ 检查 IP 请求频率限制 Args: ip: 客户端 IP 地址 Returns: bool: True 表示超过限制,False 表示未超过 """ rate_key = cls._get_rate_limit_key(ip) # 获取当前请求次数 count = await RedisService.get(rate_key) if count is None: # 首次请求,设置计数为 1 await RedisService.set(rate_key, "1", CAPTCHA_RATE_WINDOW) return False count = int(count) if count >= CAPTCHA_RATE_LIMIT: return True # 增加计数 await RedisService.incr(rate_key) return False @classmethod async def check_ban(cls, ip: str) -> bool: """ 检查 IP 是否被封禁 Args: ip: 客户端 IP 地址 Returns: bool: True 表示已封禁,False 表示未封禁 """ ban_key = cls._get_ban_key(ip) return await RedisService.exists(ban_key) @classmethod async def record_fail(cls, ip: str) -> None: """ 记录验证失败次数 Args: ip: 客户端 IP 地址 """ fail_key = cls._get_fail_count_key(ip) # 获取当前失败次数 count = await RedisService.get(fail_key) if count is None: # 首次失败 await RedisService.set(fail_key, "1", CAPTCHA_FAIL_WINDOW) else: count = int(count) count += 1 await RedisService.set(fail_key, str(count), CAPTCHA_FAIL_WINDOW) # 如果失败次数超过限制,封禁 IP if count >= CAPTCHA_FAIL_LIMIT: ban_key = cls._get_ban_key(ip) await RedisService.set(ban_key, "1", CAPTCHA_BAN_DURATION) logger.warning(f"IP {ip} 因验证失败次数过多被封禁") @classmethod async def generate_captcha(cls, ip: str) -> dict: """ 生成图形验证码 Args: ip: 客户端 IP 地址 Returns: dict: { "captcha_id": str, # UUID "image": str, # Base64 编码的图片(data URL 格式) "expires_in": int # 过期时间(秒) } Raises: RuntimeError: 字体加载失败 Exception: 其他生成失败情况 """ try: # 生成验证码 code = cls._generate_code() captcha_id = str(uuid.uuid4()) # 生成图片(可能抛出 RuntimeError) image_bytes = cls._create_image(code) # Base64 编码 image_base64 = base64.b64encode(image_bytes).decode('utf-8') image_data_url = f"data:image/png;base64,{image_base64}" # 存储到 Redis captcha_key = cls._get_captcha_key(captcha_id) await RedisService.set(captcha_key, code, CAPTCHA_EXPIRE) logger.info(f"生成验证码成功: captcha_id={captcha_id}, ip={ip}") return { "captcha_id": captcha_id, "image": image_data_url, "expires_in": CAPTCHA_EXPIRE } except RuntimeError as e: # 字体加载失败,直接向上抛出 logger.error(f"验证码字体加载失败 [IP: {ip}]: {e}") raise except Exception as e: # 其他错误(如 Redis 连接失败、图片编码失败等) logger.exception(f"生成验证码失败 [IP: {ip}]: {e}") raise @classmethod async def verify_captcha(cls, captcha_id: str, code: str) -> bool: """ 验证图形验证码 Args: captcha_id: 验证码 ID code: 用户输入的验证码 Returns: bool: 验证是否成功 """ captcha_key = cls._get_captcha_key(captcha_id) # 从 Redis 获取验证码 stored_code = await RedisService.get(captcha_key) if stored_code is None: logger.warning(f"验证码不存在或已过期: captcha_id={captcha_id}") return False # 不区分大小写比对(虽然当前只有数字,但为未来扩展做准备) if stored_code.lower() != code.lower(): logger.warning(f"验证码错误: captcha_id={captcha_id}") return False # 验证成功,删除验证码(一次性使用) await RedisService.delete(captcha_key) logger.info(f"验证码验证成功: captcha_id={captcha_id}") return True