huoyan-enterprise/backend/core/permissions.py

190 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
企业版知识库访问控制RBAC + ABAC可见性
规则汇总
--------
- admin : 全企业范围无限制
- leader : 管辖「本部门及所有子孙部门」内的用户与文件
- employee: 只能操作自己的资源;可查看同部门 department/enterprise 可见知识库及库内全部文件
- 越级 : 不在管辖子树内的操作一律拒绝
"""
from typing import List, Literal, Optional
import asyncpg
from models.graph_metadata import GraphRecord
from models.knowledge_base import KnowledgeBase
from models.knowledge_base_file import KnowledgeBaseFile
from models.user import User
UserRole = Literal["admin", "leader", "employee"]
KbVisibility = Literal["private", "department", "enterprise"]
# ──────────────────────────────────────────────
# 部门子树查询(方案 A 核心辅助)
# ──────────────────────────────────────────────
async def get_managed_dept_ids(conn: asyncpg.Connection, user: User) -> List[int]:
"""
返回 user 可管辖的部门 ID 列表。
- admin -> 企业全部部门
- leader -> 本部门及其所有子孙部门(递归 CTE
- employee -> 空列表
"""
if user.enterprise_id is None:
return []
if user.role == "admin":
rows = await conn.fetch(
"SELECT id FROM department WHERE enterprise_id = $1",
user.enterprise_id,
)
return [r["id"] for r in rows]
if user.role == "leader" and user.department_id is not None:
rows = await conn.fetch(
"""
WITH RECURSIVE sub AS (
SELECT id FROM department WHERE id = $1
UNION ALL
SELECT d.id FROM department d JOIN sub ON d.parent_id = sub.id
)
SELECT id FROM sub
""",
user.department_id,
)
return [r["id"] for r in rows]
return []
async def is_subordinate(
conn: asyncpg.Connection, actor: User, target_user_id: int
) -> bool:
"""
判断 target_user_id 是否在 actor 的管辖范围内。
- admin 对同企业全员返回 True
- leader 对子树内(非本人)成员返回 True
"""
if actor.id == target_user_id:
return False
if actor.enterprise_id is None:
return False
# 确认被管辖者在同一企业
row = await conn.fetchrow(
"SELECT department_id, enterprise_id FROM user_list WHERE id = $1",
target_user_id,
)
if not row or row["enterprise_id"] != actor.enterprise_id:
return False
if actor.role == "admin":
return True
if actor.role == "leader":
managed = await get_managed_dept_ids(conn, actor)
return row["department_id"] in managed if row["department_id"] else False
return False
# ──────────────────────────────────────────────
# 知识库级权限(同步,无需数据库)
# ──────────────────────────────────────────────
async def can_view_kb(conn: asyncpg.Connection, user: User, kb: KnowledgeBase) -> bool:
"""判断用户是否可查看该知识库。leader 权限覆盖本部门及所有子孙部门。"""
if user.role == "admin":
return True
if kb.creator_id is not None and user.id == kb.creator_id:
return True
if user.role == "leader" and user.department_id is not None and kb.department_id is not None:
managed = await get_managed_dept_ids(conn, user)
if kb.department_id in managed:
return True
vis = kb.visibility or "private"
if vis == "private":
return False
if vis == "department":
return user.department_id is not None and kb.department_id == user.department_id
if vis == "enterprise":
return user.enterprise_id is not None and kb.enterprise_id == user.enterprise_id
return False
def can_manage_kb(user: User, kb: KnowledgeBase) -> bool:
"""创建者可管理;企业管理员可管理本企业内任意知识库。"""
if user.role == "admin" and user.enterprise_id is not None and kb.enterprise_id == user.enterprise_id:
return True
if kb.creator_id is not None and user.id == kb.creator_id:
return True
return False
# ──────────────────────────────────────────────
# 文件级权限(需要异步 DB 查询)
# ──────────────────────────────────────────────
async def can_delete_file(
conn: asyncpg.Connection,
user: User,
file: KnowledgeBaseFile,
kb: Optional[KnowledgeBase] = None,
) -> bool:
"""
判断用户是否可删除该文件。
- 文件上传者本人
- admin同企业
- leader下级成员上传的文件基于子树管辖范围
"""
# 本人上传
if file.user_id == user.id:
return True
# admin 可删同企业内任意文件
if user.role == "admin":
if kb is not None:
return kb.enterprise_id == user.enterprise_id
return True
# leader 判断上传者是否在子树内
if user.role == "leader":
return await is_subordinate(conn, user, file.user_id)
return False
async def can_upload_to_kb(conn: asyncpg.Connection, user: User, kb: KnowledgeBase) -> bool:
"""
判断用户是否可向知识库上传文件。
- 必须能 view KB
- allow_kb_upload 未被关闭
- leader 或 admin 也受 allow_kb_upload 约束(管理员初始化时均为 True
"""
if not await can_view_kb(conn, user, kb):
return False
return bool(user.allow_kb_upload)
# ──────────────────────────────────────────────
# 知识图谱级权限(保持不变)
# ──────────────────────────────────────────────
def can_view_graph(user: User, g: GraphRecord) -> bool:
"""判断用户是否可查看该知识图谱(规则与知识库一致)。"""
if user.role == "admin":
return True
if g.creator_id is not None and user.id == g.creator_id:
return True
if user.role == "leader" and user.department_id is not None and g.department_id == user.department_id:
return True
vis = g.visibility or "private"
if vis == "private":
return False
if vis == "department":
return user.department_id is not None and g.department_id == user.department_id
if vis == "enterprise":
return user.enterprise_id is not None and g.enterprise_id == user.enterprise_id
return False
def can_manage_graph(user: User, g: GraphRecord) -> bool:
"""创建者可删改;企业管理员可管理本企业内任意图谱。"""
if user.role == "admin" and user.enterprise_id is not None and g.enterprise_id == user.enterprise_id:
return True
if g.creator_id is not None and user.id == g.creator_id:
return True
return False