85 lines
2.0 KiB
Python
85 lines
2.0 KiB
Python
"""
|
|
Redis 连接管理模块
|
|
|
|
提供 Redis 连接池和基础操作。
|
|
"""
|
|
import redis.asyncio as redis
|
|
from typing import Optional
|
|
|
|
from core.config import settings
|
|
from logger.logging import get_logger
|
|
|
|
logger = get_logger(__name__)

# Shared module-level Redis client. It stays None until get_redis() creates
# it on first use, and close_redis() sets it back to None. Despite the name,
# the annotation shows it holds a redis.Redis client object.
_redis_pool: Optional[redis.Redis] = None
|
|
|
|
|
|
async def get_redis() -> redis.Redis:
    """Return the shared Redis client, building it on the first call.

    The client is cached in the module-level ``_redis_pool`` variable, so
    later calls return the same object. Host, port, password and database
    index are read from ``settings``; an empty/falsy password is passed as
    ``None``. Responses are decoded to ``str`` (``decode_responses=True``).

    NOTE(review): the log line says the connection is established, but this
    function only constructs the client object; confirm whether a network
    connection is actually opened at this point.
    """
    global _redis_pool

    # Fast path: the client was already created by an earlier call.
    if _redis_pool is not None:
        return _redis_pool

    client_options = {
        "host": settings.redis_host,
        "port": settings.redis_port,
        "password": settings.redis_password or None,
        "db": settings.redis_db,
        "decode_responses": True,
    }
    _redis_pool = redis.Redis(**client_options)
    logger.info(f"Redis 连接已建立: {settings.redis_host}:{settings.redis_port}")
    return _redis_pool
|
|
|
|
|
|
async def close_redis() -> None:
    """Close the shared Redis client, if one exists, and forget it.

    Does nothing when no client has been created. Otherwise the module-level
    ``_redis_pool`` is reset to ``None`` *before* the client is closed, so
    that a failing or still-pending close never leaves a dead client behind
    for ``get_redis()`` to hand out again.

    Raises:
        Whatever the client's close coroutine raises; the global is already
        cleared in that case and the "closed" log line is not emitted.
    """
    global _redis_pool

    if _redis_pool is None:
        return

    # Detach first: after this point get_redis() will build a fresh client.
    client = _redis_pool
    _redis_pool = None

    # redis-py >= 5.0.1 deprecates close() in favour of aclose(); fall back
    # to close() so older redis-py versions keep working.
    shutdown = getattr(client, "aclose", None)
    if shutdown is None:
        shutdown = client.close
    await shutdown()

    logger.info("Redis 连接已关闭")
|
|
|
|
|
|
class RedisService:
    """Static async helpers that forward single commands to the shared client.

    Every method obtains the client through ``get_redis()`` and issues one
    command on it; no state is kept on the class itself.
    """

    @staticmethod
    async def set(key: str, value: str, expire: Optional[int] = None) -> bool:
        """Store ``value`` under ``key``.

        Args:
            key: Key to write.
            value: Value to store.
            expire: Passed to the client as ``ex``; ``None`` (the default)
                sets no expiry.

        Returns:
            Always ``True`` once the command has completed without raising.
        """
        client = await get_redis()
        await client.set(key, value, ex=expire)
        return True

    @staticmethod
    async def get(key: str) -> Optional[str]:
        """Return the client's reply for ``GET key`` (annotated ``Optional[str]``)."""
        client = await get_redis()
        return await client.get(key)

    @staticmethod
    async def delete(key: str) -> bool:
        """Delete ``key``.

        Returns:
            Always ``True`` once the command has completed without raising;
            the client's reply (whether anything was removed) is not
            inspected.
        """
        client = await get_redis()
        await client.delete(key)
        return True

    @staticmethod
    async def exists(key: str) -> bool:
        """Return ``True`` when the client's ``exists`` reply for ``key`` is > 0."""
        client = await get_redis()
        hit_count = await client.exists(key)
        return hit_count > 0

    @staticmethod
    async def ttl(key: str) -> int:
        """Return the client's ``ttl`` reply for ``key`` (remaining seconds)."""
        client = await get_redis()
        return await client.ttl(key)

    @staticmethod
    async def incr(key: str) -> int:
        """Increment the value at ``key`` and return the client's reply."""
        client = await get_redis()
        return await client.incr(key)
|