""" 企业版知识库访问控制:RBAC + ABAC(可见性) 规则汇总 -------- - admin : 全企业范围无限制 - leader : 管辖「本部门及所有子孙部门」内的用户与文件 - employee: 只能操作自己的资源;可查看同部门 department/enterprise 可见知识库及库内全部文件 - 越级 : 不在管辖子树内的操作一律拒绝 """ from typing import List, Literal, Optional import asyncpg from models.graph_metadata import GraphRecord from models.knowledge_base import KnowledgeBase from models.knowledge_base_file import KnowledgeBaseFile from models.user import User UserRole = Literal["admin", "leader", "employee"] KbVisibility = Literal["private", "department", "enterprise"] # ────────────────────────────────────────────── # 部门子树查询(方案 A 核心辅助) # ────────────────────────────────────────────── async def get_managed_dept_ids(conn: asyncpg.Connection, user: User) -> List[int]: """ 返回 user 可管辖的部门 ID 列表。 - admin -> 企业全部部门 - leader -> 本部门及其所有子孙部门(递归 CTE) - employee -> 空列表 """ if user.enterprise_id is None: return [] if user.role == "admin": rows = await conn.fetch( "SELECT id FROM department WHERE enterprise_id = $1", user.enterprise_id, ) return [r["id"] for r in rows] if user.role == "leader" and user.department_id is not None: rows = await conn.fetch( """ WITH RECURSIVE sub AS ( SELECT id FROM department WHERE id = $1 UNION ALL SELECT d.id FROM department d JOIN sub ON d.parent_id = sub.id ) SELECT id FROM sub """, user.department_id, ) return [r["id"] for r in rows] return [] async def is_subordinate( conn: asyncpg.Connection, actor: User, target_user_id: int ) -> bool: """ 判断 target_user_id 是否在 actor 的管辖范围内。 - admin 对同企业全员返回 True - leader 对子树内(非本人)成员返回 True """ if actor.id == target_user_id: return False if actor.enterprise_id is None: return False # 确认被管辖者在同一企业 row = await conn.fetchrow( "SELECT department_id, enterprise_id FROM user_list WHERE id = $1", target_user_id, ) if not row or row["enterprise_id"] != actor.enterprise_id: return False if actor.role == "admin": return True if actor.role == "leader": managed = await get_managed_dept_ids(conn, actor) return row["department_id"] in managed if row["department_id"] else False return False # ────────────────────────────────────────────── # 知识库级权限(同步,无需数据库) # ────────────────────────────────────────────── async def can_view_kb(conn: asyncpg.Connection, user: User, kb: KnowledgeBase) -> bool: """判断用户是否可查看该知识库。leader 权限覆盖本部门及所有子孙部门。""" if user.role == "admin": return True if kb.creator_id is not None and user.id == kb.creator_id: return True if user.role == "leader" and user.department_id is not None and kb.department_id is not None: managed = await get_managed_dept_ids(conn, user) if kb.department_id in managed: return True vis = kb.visibility or "private" if vis == "private": return False if vis == "department": return user.department_id is not None and kb.department_id == user.department_id if vis == "enterprise": return user.enterprise_id is not None and kb.enterprise_id == user.enterprise_id return False def can_manage_kb(user: User, kb: KnowledgeBase) -> bool: """创建者可管理;企业管理员可管理本企业内任意知识库。""" if user.role == "admin" and user.enterprise_id is not None and kb.enterprise_id == user.enterprise_id: return True if kb.creator_id is not None and user.id == kb.creator_id: return True return False # ────────────────────────────────────────────── # 文件级权限(需要异步 DB 查询) # ────────────────────────────────────────────── async def can_delete_file( conn: asyncpg.Connection, user: User, file: KnowledgeBaseFile, kb: Optional[KnowledgeBase] = None, ) -> bool: """ 判断用户是否可删除该文件。 - 文件上传者本人 - admin(同企业) - leader(下级成员上传的文件,基于子树管辖范围) """ # 本人上传 if file.user_id == user.id: return True # admin 可删同企业内任意文件 if user.role == "admin": if kb is not None: return kb.enterprise_id == user.enterprise_id return True # leader 判断上传者是否在子树内 if user.role == "leader": return await is_subordinate(conn, user, file.user_id) return False async def can_upload_to_kb(conn: asyncpg.Connection, user: User, kb: KnowledgeBase) -> bool: """ 判断用户是否可向知识库上传文件。 - 必须能 view KB - allow_kb_upload 未被关闭 - leader 或 admin 也受 allow_kb_upload 约束(管理员初始化时均为 True) """ if not await can_view_kb(conn, user, kb): return False return bool(user.allow_kb_upload) # ────────────────────────────────────────────── # 知识图谱级权限(与知识库规则完全一致) # ────────────────────────────────────────────── async def can_view_graph(conn: asyncpg.Connection, user: User, g: GraphRecord) -> bool: """判断用户是否可查看该知识图谱。leader 权限覆盖本部门及所有子孙部门(与知识库一致)。""" if user.role == "admin": return True if g.creator_id is not None and user.id == g.creator_id: return True if user.role == "leader" and user.department_id is not None and g.department_id is not None: managed = await get_managed_dept_ids(conn, user) if g.department_id in managed: return True vis = g.visibility or "private" if vis == "private": return False if vis == "department": return user.department_id is not None and g.department_id == user.department_id if vis == "enterprise": return user.enterprise_id is not None and g.enterprise_id == user.enterprise_id return False def can_manage_graph(user: User, g: GraphRecord) -> bool: """创建者可删改;企业管理员可管理本企业内任意图谱。""" if user.role == "admin" and user.enterprise_id is not None and g.enterprise_id == user.enterprise_id: return True if g.creator_id is not None and user.id == g.creator_id: return True return False