190 lines
7.3 KiB
Python
190 lines
7.3 KiB
Python
"""
|
||
企业版知识库访问控制:RBAC + ABAC(可见性)
|
||
|
||
规则汇总
|
||
--------
|
||
- admin : 全企业范围无限制
|
||
- leader : 管辖「本部门及所有子孙部门」内的用户与文件
|
||
- employee: 只能操作自己的资源;可查看同部门 department/enterprise 可见知识库及库内全部文件
|
||
- 越级 : 不在管辖子树内的操作一律拒绝
|
||
"""
|
||
from typing import List, Literal, Optional
|
||
|
||
import asyncpg
|
||
|
||
from models.graph_metadata import GraphRecord
|
||
from models.knowledge_base import KnowledgeBase
|
||
from models.knowledge_base_file import KnowledgeBaseFile
|
||
from models.user import User
|
||
|
||
UserRole = Literal["admin", "leader", "employee"]
|
||
KbVisibility = Literal["private", "department", "enterprise"]
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 部门子树查询(方案 A 核心辅助)
|
||
# ──────────────────────────────────────────────
|
||
|
||
async def get_managed_dept_ids(conn: asyncpg.Connection, user: User) -> List[int]:
|
||
"""
|
||
返回 user 可管辖的部门 ID 列表。
|
||
- admin -> 企业全部部门
|
||
- leader -> 本部门及其所有子孙部门(递归 CTE)
|
||
- employee -> 空列表
|
||
"""
|
||
if user.enterprise_id is None:
|
||
return []
|
||
if user.role == "admin":
|
||
rows = await conn.fetch(
|
||
"SELECT id FROM department WHERE enterprise_id = $1",
|
||
user.enterprise_id,
|
||
)
|
||
return [r["id"] for r in rows]
|
||
if user.role == "leader" and user.department_id is not None:
|
||
rows = await conn.fetch(
|
||
"""
|
||
WITH RECURSIVE sub AS (
|
||
SELECT id FROM department WHERE id = $1
|
||
UNION ALL
|
||
SELECT d.id FROM department d JOIN sub ON d.parent_id = sub.id
|
||
)
|
||
SELECT id FROM sub
|
||
""",
|
||
user.department_id,
|
||
)
|
||
return [r["id"] for r in rows]
|
||
return []
|
||
|
||
|
||
async def is_subordinate(
|
||
conn: asyncpg.Connection, actor: User, target_user_id: int
|
||
) -> bool:
|
||
"""
|
||
判断 target_user_id 是否在 actor 的管辖范围内。
|
||
- admin 对同企业全员返回 True
|
||
- leader 对子树内(非本人)成员返回 True
|
||
"""
|
||
if actor.id == target_user_id:
|
||
return False
|
||
if actor.enterprise_id is None:
|
||
return False
|
||
# 确认被管辖者在同一企业
|
||
row = await conn.fetchrow(
|
||
"SELECT department_id, enterprise_id FROM user_list WHERE id = $1",
|
||
target_user_id,
|
||
)
|
||
if not row or row["enterprise_id"] != actor.enterprise_id:
|
||
return False
|
||
if actor.role == "admin":
|
||
return True
|
||
if actor.role == "leader":
|
||
managed = await get_managed_dept_ids(conn, actor)
|
||
return row["department_id"] in managed if row["department_id"] else False
|
||
return False
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 知识库级权限(同步,无需数据库)
|
||
# ──────────────────────────────────────────────
|
||
|
||
async def can_view_kb(conn: asyncpg.Connection, user: User, kb: KnowledgeBase) -> bool:
|
||
"""判断用户是否可查看该知识库。leader 权限覆盖本部门及所有子孙部门。"""
|
||
if user.role == "admin":
|
||
return True
|
||
if kb.creator_id is not None and user.id == kb.creator_id:
|
||
return True
|
||
if user.role == "leader" and user.department_id is not None and kb.department_id is not None:
|
||
managed = await get_managed_dept_ids(conn, user)
|
||
if kb.department_id in managed:
|
||
return True
|
||
vis = kb.visibility or "private"
|
||
if vis == "private":
|
||
return False
|
||
if vis == "department":
|
||
return user.department_id is not None and kb.department_id == user.department_id
|
||
if vis == "enterprise":
|
||
return user.enterprise_id is not None and kb.enterprise_id == user.enterprise_id
|
||
return False
|
||
|
||
|
||
def can_manage_kb(user: User, kb: KnowledgeBase) -> bool:
|
||
"""创建者可管理;企业管理员可管理本企业内任意知识库。"""
|
||
if user.role == "admin" and user.enterprise_id is not None and kb.enterprise_id == user.enterprise_id:
|
||
return True
|
||
if kb.creator_id is not None and user.id == kb.creator_id:
|
||
return True
|
||
return False
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 文件级权限(需要异步 DB 查询)
|
||
# ──────────────────────────────────────────────
|
||
|
||
async def can_delete_file(
|
||
conn: asyncpg.Connection,
|
||
user: User,
|
||
file: KnowledgeBaseFile,
|
||
kb: Optional[KnowledgeBase] = None,
|
||
) -> bool:
|
||
"""
|
||
判断用户是否可删除该文件。
|
||
- 文件上传者本人
|
||
- admin(同企业)
|
||
- leader(下级成员上传的文件,基于子树管辖范围)
|
||
"""
|
||
# 本人上传
|
||
if file.user_id == user.id:
|
||
return True
|
||
# admin 可删同企业内任意文件
|
||
if user.role == "admin":
|
||
if kb is not None:
|
||
return kb.enterprise_id == user.enterprise_id
|
||
return True
|
||
# leader 判断上传者是否在子树内
|
||
if user.role == "leader":
|
||
return await is_subordinate(conn, user, file.user_id)
|
||
return False
|
||
|
||
|
||
async def can_upload_to_kb(conn: asyncpg.Connection, user: User, kb: KnowledgeBase) -> bool:
|
||
"""
|
||
判断用户是否可向知识库上传文件。
|
||
- 必须能 view KB
|
||
- allow_kb_upload 未被关闭
|
||
- leader 或 admin 也受 allow_kb_upload 约束(管理员初始化时均为 True)
|
||
"""
|
||
if not await can_view_kb(conn, user, kb):
|
||
return False
|
||
return bool(user.allow_kb_upload)
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 知识图谱级权限(保持不变)
|
||
# ──────────────────────────────────────────────
|
||
|
||
def can_view_graph(user: User, g: GraphRecord) -> bool:
|
||
"""判断用户是否可查看该知识图谱(规则与知识库一致)。"""
|
||
if user.role == "admin":
|
||
return True
|
||
if g.creator_id is not None and user.id == g.creator_id:
|
||
return True
|
||
if user.role == "leader" and user.department_id is not None and g.department_id == user.department_id:
|
||
return True
|
||
vis = g.visibility or "private"
|
||
if vis == "private":
|
||
return False
|
||
if vis == "department":
|
||
return user.department_id is not None and g.department_id == user.department_id
|
||
if vis == "enterprise":
|
||
return user.enterprise_id is not None and g.enterprise_id == user.enterprise_id
|
||
return False
|
||
|
||
|
||
def can_manage_graph(user: User, g: GraphRecord) -> bool:
|
||
"""创建者可删改;企业管理员可管理本企业内任意图谱。"""
|
||
if user.role == "admin" and user.enterprise_id is not None and g.enterprise_id == user.enterprise_id:
|
||
return True
|
||
if g.creator_id is not None and user.id == g.creator_id:
|
||
return True
|
||
return False
|