huoyan-enterprise/backend/seed_test_data.py

445 lines
16 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
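Seed script for the "permission scheme A" test data.

How to run (from the backend directory):
    python seed_test_data.py

Prerequisites:
    1. The ALTER/CREATE statements in update-ddl.txt have been executed.
    2. The database settings in the .env file are correct.

Data overview
-------------
Enterprise: 火焰企业 (id=1; skipped if it already exists)

Department structure (three-level tree):
    技术部 (id is generated automatically by serial)
    ├── 前端组
    └── 后端组
    产品部

Role matrix:
    admin1           admin     no department
    leader_tech      leader    技术部 (also manages 前端组 and 后端组)
    leader_frontend  leader    前端组
    emp1_frontend    employee  前端组  allow_kb_upload=true
    emp2_frontend    employee  前端组  allow_kb_upload=false (upload disabled)
    emp1_backend     employee  后端组
    leader_product   leader    产品部
    emp1_product     employee  产品部

All accounts share the password: Test@123456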
"""
import asyncio
import json
from datetime import datetime, timezone
import asyncpg
from dotenv import load_dotenv
from pathlib import Path
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
from core.config import settings
from core.security import get_password_hash
# ──────────────────────────────────────────────
# 测试数据定义
# ──────────────────────────────────────────────
# Plain-text password shared by every seeded test account.
PASSWORD = "Test@123456"

# Department tree. "key" is a script-local handle used to wire up parents and
# members; a parent must be listed before its children.
DEPARTMENTS = [
    {"key": dept_key, "name": dept_name, "parent_key": parent_key}
    for dept_key, dept_name, parent_key in (
        ("tech", "技术部", None),
        ("frontend", "前端组", "tech"),
        ("backend", "后端组", "tech"),
        ("product", "产品部", None),
    )
]

# One row per test account:
# (username, display_name, role, dept_key, allow_kb_upload)
_USER_ROWS = (
    ("admin1", "系统管理员", "admin", None, True),
    ("leader_tech", "技术部总监", "leader", "tech", True),
    ("leader_frontend", "前端组组长", "leader", "frontend", True),
    ("emp1_frontend", "前端员工甲", "employee", "frontend", True),
    # The leader has switched off upload permission for this account.
    ("emp2_frontend", "前端员工乙(上传已禁)", "employee", "frontend", False),
    ("emp1_backend", "后端员工甲", "employee", "backend", True),
    ("leader_product", "产品部负责人", "leader", "product", True),
    ("emp1_product", "产品员工甲", "employee", "product", True),
)

# Test accounts. The e-mail is derived from the username and the phone number
# from the 1-based row position (13800000001, 13800000002, ...).
USERS = [
    {
        "username": username,
        "email": f"{username}@test.example",
        "phone": f"138{position:08d}",
        "display_name": display_name,
        "role": role,
        "dept_key": dept_key,
        "allow_kb_upload": allow_kb_upload,
    }
    for position, (username, display_name, role, dept_key, allow_kb_upload)
    in enumerate(_USER_ROWS, start=1)
]
# ──────────────────────────────────────────────
# 种子函数
# ──────────────────────────────────────────────
async def _seed_enterprise(conn: "asyncpg.Connection", now: datetime) -> int:
    """Return the id of the enterprise to seed into, creating one if the table is empty."""
    enterprise_id = await conn.fetchval("SELECT id FROM enterprise LIMIT 1")
    if enterprise_id is not None:
        print(f" 使用已有企业 id={enterprise_id}")
        return enterprise_id

    enterprise_id = await conn.fetchval(
        """
        INSERT INTO enterprise (name, code, created_at, updated_at)
        VALUES ('火焰企业(测试)', 'huoyan-test', $1, $1)
        RETURNING id
        """,
        now,
    )
    print(f"✅ 创建企业 id={enterprise_id}")
    return enterprise_id


async def _seed_departments(
    conn: "asyncpg.Connection", enterprise_id: int, now: datetime
) -> dict:
    """Insert the departments from DEPARTMENTS that are missing; return {dept key: id}."""
    dept_map: dict = {}
    for d in DEPARTMENTS:
        existing = await conn.fetchval(
            "SELECT id FROM department WHERE enterprise_id = $1 AND name = $2",
            enterprise_id, d["name"],
        )
        if existing is not None:
            dept_map[d["key"]] = existing
            print(f" 部门「{d['name']}」已存在 id={existing}")
            continue

        # Parents are listed before their children, so the parent id is
        # already in the map; a None parent_key simply yields None.
        parent_id = dept_map.get(d["parent_key"])
        new_id = await conn.fetchval(
            """
            INSERT INTO department (enterprise_id, name, parent_id, created_at, updated_at)
            VALUES ($1, $2, $3, $4, $4)
            RETURNING id
            """,
            enterprise_id, d["name"], parent_id, now,
        )
        dept_map[d["key"]] = new_id
        print(f"✅ 创建部门「{d['name']}」id={new_id} parent={parent_id}")
    return dept_map


async def _seed_users(
    conn: "asyncpg.Connection",
    enterprise_id: int,
    dept_map: dict,
    hashed_pw: str,
    now: datetime,
) -> dict:
    """Insert the accounts from USERS that are missing; return {username: id}."""
    user_map: dict = {}
    for u in USERS:
        existing = await conn.fetchval(
            "SELECT id FROM user_list WHERE username = $1", u["username"]
        )
        if existing is not None:
            user_map[u["username"]] = existing
            print(f" 用户「{u['username']}」已存在 id={existing}")
            continue

        dept_id = dept_map.get(u["dept_key"])
        new_id = await conn.fetchval(
            """
            INSERT INTO user_list (
                username, email, phone, hashed_password, display_name,
                enterprise_id, department_id, role, allow_kb_upload,
                is_active, is_first_login, email_verified,
                created_at, updated_at
            )
            VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,TRUE,FALSE,TRUE,$10,$10)
            RETURNING id
            """,
            u["username"], u["email"], u["phone"], hashed_pw,
            u["display_name"], enterprise_id, dept_id, u["role"],
            u["allow_kb_upload"], now,
        )
        user_map[u["username"]] = new_id
        print(
            f"✅ 创建用户「{u['display_name']}」({u['role']}) id={new_id}"
            f" dept={dept_id} allow_upload={u['allow_kb_upload']}"
        )
    return user_map


async def _bind_department_leaders(
    conn: "asyncpg.Connection", dept_map: dict, user_map: dict
) -> None:
    """Set department.leader_user_id for each department that has a seeded leader."""
    leader_bindings = [
        ("tech", "leader_tech"),
        ("frontend", "leader_frontend"),
        ("product", "leader_product"),
    ]
    for dept_key, leader_key in leader_bindings:
        dept_id = dept_map.get(dept_key)
        leader_id = user_map.get(leader_key)
        if dept_id is None or leader_id is None:
            continue
        await conn.execute(
            "UPDATE department SET leader_user_id = $1 WHERE id = $2",
            leader_id, dept_id,
        )
        print(f"🔗 部门 id={dept_id} 负责人 -> user id={leader_id}")


async def _seed_knowledge_bases(
    conn: "asyncpg.Connection",
    enterprise_id: int,
    dept_map: dict,
    user_map: dict,
    now: datetime,
) -> dict:
    """Insert the sample knowledge bases that are missing; return {kb name: id}."""
    kb_defs = [
        {
            "name": "前端组公共手册",
            "description": "前端组对内共享的技术规范与操作手册",
            "visibility": "department",
            "creator_key": "leader_frontend",
            "dept_key": "frontend",
        },
        {
            "name": "前端员工甲的私人笔记",
            "description": "个人学习笔记(私有)",
            "visibility": "private",
            "creator_key": "emp1_frontend",
            "dept_key": "frontend",
        },
        {
            "name": "企业全员知识库",
            "description": "全企业可见,由管理员维护",
            "visibility": "enterprise",
            "creator_key": "admin1",
            "dept_key": None,
        },
        {
            "name": "产品部资料库",
            "description": "产品部内部共享文档",
            "visibility": "department",
            "creator_key": "leader_product",
            "dept_key": "product",
        },
    ]
    kb_map: dict = {}
    for kb in kb_defs:
        existing = await conn.fetchval(
            """
            SELECT id FROM knowledge_base
            WHERE name = $1 AND enterprise_id = $2 AND is_deleted = FALSE
            """,
            kb["name"], enterprise_id,
        )
        if existing is not None:
            kb_map[kb["name"]] = existing
            print(f" 知识库「{kb['name']}」已存在 id={existing}")
            continue

        creator_id = user_map.get(kb["creator_key"])
        dept_id = dept_map.get(kb["dept_key"])
        # The creator is written to both the user_id and creator_id columns.
        new_id = await conn.fetchval(
            """
            INSERT INTO knowledge_base
                (user_id, enterprise_id, department_id, creator_id, name,
                 description, visibility, created_at, updated_at)
            VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$8)
            RETURNING id
            """,
            creator_id, enterprise_id, dept_id, creator_id,
            kb["name"], kb["description"], kb["visibility"], now,
        )
        kb_map[kb["name"]] = new_id
        print(f"✅ 创建知识库「{kb['name']}」({kb['visibility']}) id={new_id}")
    return kb_map


async def _seed_files(
    conn: "asyncpg.Connection", kb_map: dict, user_map: dict, now: datetime
) -> dict:
    """Insert sample file records (status 'completed') that are missing; return {file name: id}."""
    file_defs = [
        {
            "kb_name": "前端组公共手册",
            "uploader_key": "leader_frontend",
            "file_name": "前端规范v1.0.pdf",
            "file_size": 204800,
            "file_type": "pdf",
        },
        {
            "kb_name": "前端组公共手册",
            "uploader_key": "emp1_frontend",
            "file_name": "组件库使用指南.docx",
            "file_size": 153600,
            "file_type": "docx",
        },
        {
            "kb_name": "前端组公共手册",
            "uploader_key": "emp1_backend",
            "file_name": "接口文档摘要.txt",
            "file_size": 8192,
            "file_type": "txt",
        },
        {
            "kb_name": "前端员工甲的私人笔记",
            "uploader_key": "emp1_frontend",
            "file_name": "Vue3学习笔记.txt",
            "file_size": 4096,
            "file_type": "txt",
        },
        {
            "kb_name": "企业全员知识库",
            "uploader_key": "admin1",
            "file_name": "员工手册2026.pdf",
            "file_size": 512000,
            "file_type": "pdf",
        },
        {
            "kb_name": "产品部资料库",
            "uploader_key": "emp1_product",
            "file_name": "需求评审记录.xlsx",
            "file_size": 65536,
            "file_type": "xlsx",
        },
    ]
    file_id_map: dict = {}
    for fd in file_defs:
        kb_id = kb_map.get(fd["kb_name"])
        uploader_id = user_map.get(fd["uploader_key"])
        if kb_id is None or uploader_id is None:
            continue

        existing = await conn.fetchval(
            """
            SELECT id FROM knowledge_base_file
            WHERE knowledge_base_id = $1 AND file_name = $2 AND is_deleted = FALSE
            """,
            kb_id, fd["file_name"],
        )
        if existing is not None:
            file_id_map[fd["file_name"]] = existing
            print(f" 文件「{fd['file_name']}」已存在 id={existing}")
            continue

        # file_path is a placeholder; no file is written to disk by this script.
        new_id = await conn.fetchval(
            """
            INSERT INTO knowledge_base_file
                (knowledge_base_id, user_id, file_name, file_path, file_size,
                 file_type, status, chunk_count, created_at, updated_at)
            VALUES ($1,$2,$3,$4,$5,$6,'completed',$7,$8,$8)
            RETURNING id
            """,
            kb_id, uploader_id, fd["file_name"],
            f"/fake/path/{fd['file_name']}", fd["file_size"],
            fd["file_type"], 5, now,
        )
        file_id_map[fd["file_name"]] = new_id
        print(f"✅ 创建文件「{fd['file_name']}」id={new_id} uploader={fd['uploader_key']}")
    return file_id_map


async def _seed_audit_logs(
    conn: "asyncpg.Connection",
    enterprise_id: int,
    dept_map: dict,
    user_map: dict,
    kb_map: dict,
    file_id_map: dict,
    now: datetime,
) -> None:
    """Insert the sample audit-log rows, skipping any that a previous run already wrote."""
    audit_entries = [
        # Uploads
        {"actor": "emp1_frontend", "action": "upload",
         "kb": "前端组公共手册", "file": "组件库使用指南.docx",
         "dept_key": "frontend"},
        {"actor": "emp1_backend", "action": "upload",
         "kb": "前端组公共手册", "file": "接口文档摘要.txt",
         "dept_key": "backend"},
        # Delete (a leader removes a subordinate's file)
        {"actor": "leader_frontend", "action": "delete",
         "kb": "前端组公共手册", "file": "接口文档摘要.txt",
         "target": "emp1_backend", "dept_key": "frontend"},
        # Permission change
        {"actor": "leader_frontend", "action": "permission_change",
         "target": "emp2_frontend", "dept_key": "frontend",
         "meta": {"field": "allow_kb_upload", "old": True, "new": False}},
        # Download
        {"actor": "emp1_frontend", "action": "download",
         "kb": "前端组公共手册", "file": "前端规范v1.0.pdf",
         "dept_key": "frontend"},
        # Admin-level operation
        {"actor": "admin1", "action": "create_kb",
         "kb": "企业全员知识库", "dept_key": None},
    ]
    inserted = 0
    for ae in audit_entries:
        actor_id = user_map.get(ae["actor"])
        target_id = user_map.get(ae.get("target"))
        kb_id = kb_map.get(ae.get("kb"))
        file_id_val = file_id_map.get(ae.get("file"))
        dept_id = dept_map.get(ae.get("dept_key"))
        meta_str = json.dumps(ae["meta"]) if ae.get("meta") else None

        # Re-running the script must not pile up duplicate log rows.
        # IS NOT DISTINCT FROM is used so that NULL columns compare equal.
        already_logged = await conn.fetchval(
            """
            SELECT 1 FROM kb_audit_log
            WHERE enterprise_id = $1
              AND actor_id IS NOT DISTINCT FROM $2
              AND action = $3
              AND target_user_id IS NOT DISTINCT FROM $4
              AND department_id IS NOT DISTINCT FROM $5
              AND kb_id IS NOT DISTINCT FROM $6
              AND file_id IS NOT DISTINCT FROM $7
            LIMIT 1
            """,
            enterprise_id, actor_id, ae["action"], target_id, dept_id,
            kb_id, file_id_val,
        )
        if already_logged is not None:
            continue

        await conn.execute(
            """
            INSERT INTO kb_audit_log
                (enterprise_id, actor_id, action, target_user_id, department_id,
                 kb_id, file_id, ip, metadata, created_at)
            VALUES ($1,$2,$3,$4,$5,$6,$7,'127.0.0.1',$8::jsonb,$9)
            """,
            enterprise_id, actor_id, ae["action"], target_id, dept_id,
            kb_id, file_id_val, meta_str, now,
        )
        inserted += 1

    print(f"✅ 写入 {inserted} 条审计日志")
    skipped = len(audit_entries) - inserted
    if skipped:
        print(f" 已跳过 {skipped} 条已存在的审计日志")


def _print_summary(kb_map: dict) -> None:
    """Print the shared password, an account table built from USERS, and the knowledge-base ids."""
    print("\n=== 测试数据写入完成 ===")
    print(f"\n所有测试账号密码:{PASSWORD}")
    print("\n账号列表:")
    headers = ["用户名", "角色", "部门", "allow_upload"]
    # Build the rows from USERS/DEPARTMENTS so the table cannot drift from the
    # data that was actually seeded.
    dept_names = {d["key"]: d["name"] for d in DEPARTMENTS}
    rows_display = [
        (
            u["username"],
            u["role"],
            dept_names.get(u["dept_key"], ""),
            "" if u["allow_kb_upload"] else "✗(已禁)",
        )
        for u in USERS
    ]
    col_w = [18, 10, 8, 12]
    sep = "+" + "+".join("-" * (w + 2) for w in col_w) + "+"
    fmt = "| " + " | ".join(f"{{:<{w}}}" for w in col_w) + " |"
    print(sep)
    print(fmt.format(*headers))
    print(sep)
    for r in rows_display:
        print(fmt.format(*r))
    print(sep)
    print("\n知识库:")
    for name, kid in kb_map.items():
        print(f" id={kid} {name}")


async def seed():
    """Write the permission test fixtures to the database and print a summary.

    Steps, in order: enterprise, departments, users, department leaders,
    knowledge bases, file records, audit-log rows. Each step skips rows that
    already exist, so the script can be re-run.

    All writes run inside a single transaction, so a failure part-way through
    leaves the database unchanged, and the connection is closed whether or
    not seeding succeeds. Any database error propagates to the caller.
    """
    conn: asyncpg.Connection = await asyncpg.connect(settings.db_uri)
    try:
        hashed_pw = get_password_hash(PASSWORD)
        now = datetime.now(timezone.utc)
        print("=== 开始写入测试数据 ===\n")
        async with conn.transaction():
            enterprise_id = await _seed_enterprise(conn, now)
            dept_map = await _seed_departments(conn, enterprise_id, now)
            user_map = await _seed_users(conn, enterprise_id, dept_map, hashed_pw, now)
            await _bind_department_leaders(conn, dept_map, user_map)
            kb_map = await _seed_knowledge_bases(
                conn, enterprise_id, dept_map, user_map, now
            )
            file_id_map = await _seed_files(conn, kb_map, user_map, now)
            await _seed_audit_logs(
                conn, enterprise_id, dept_map, user_map, kb_map, file_id_map, now
            )
    finally:
        await conn.close()
    _print_summary(kb_map)
if __name__ == "__main__":
    # Script entry point: run the async seeding routine to completion.
    seed_coro = seed()
    asyncio.run(seed_coro)