一、项目整体架构

1. 系统架构全景

📝 是什么?

swxy 是一个典型的 RAG(Retrieval-Augmented Generation)全栈项目,采用前后端分离架构,后端提供 RESTful API,前端通过 HTTP/SSE 与后端通信。系统核心功能是:用户上传文档 -> 文档解析并存入向量数据库 -> 用户提问时检索相关内容 -> 结合 LLM 生成回答。

💡 为什么?

纯 LLM 受限于训练数据的时效性和领域知识的覆盖面。RAG 通过外挂知识库,让 LLM 在回答时引用真实文档内容,解决了幻觉问题知识更新问题

系统组件一览:

层级技术选型作用
前端React + TypeScript + Ant Design + Vite用户交互界面
后端 APIFastAPI + UvicornRESTful API 服务
关系数据库PostgreSQL 15用户、会话、消息持久化
搜索引擎Elasticsearch 8.11全文检索 + 向量检索
缓存Redis 7快速解析文档临时存储
文档解析DeepDoc (OCR + 布局识别)PDF/DOCX 等多格式解析
NLP自研分词器 (RagTokenizer)中文分词、关键词提取
LLMDeepSeek-R1 / Qwen2.5对话生成、推荐问题生成
EmbeddingDashScope API文本向量化

2. 数据流全链路

怎么做?——以一次完整问答为例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
用户提问 "国电电力2024年营收如何?"
前端 chat/index.tsx ─── POST /chat_on_docs ───► 后端 chat_rt.py
    │                                              │
    │                                              ▼
    │                                    retrieval.py: retrieve_content()
    │                                              │
    │                                              ▼
    │                                    search_v2.py: Dealer.retrieval()
    │                                    ┌─────────┴─────────┐
    │                                    │                   │
    │                                 全文检索            向量检索
    │                              (Elasticsearch)    (Embedding相似度)
    │                                    └─────────┬─────────┘
    │                                              │ 混合排序 + Rerank
    │                                              ▼
    │                                    chat.py: get_chat_completion()
    │                                    构造 Prompt + 调用 DeepSeek-R1
    │                                              │
    │                                              ▼ SSE 流式响应
    ◄──────────────── text/event-stream ───────────┘
前端实时渲染回答 + 引文标注

二、Docker Compose 基础设施搭建

1. 容器编排概览

📝 是什么?

Docker Compose 是一种用 YAML 文件定义多容器应用的工具。本项目通过 docker-compose.yml 一键启动所有基础设施服务。

💡 为什么?

开发环境需要同时运行 PostgreSQL、Elasticsearch、Redis 等多个服务。Docker Compose 消除了手动安装和配置各个服务的复杂性,确保环境一致性。

怎么做?——项目的 docker-compose.yml 解读
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
version: '3.8'
services:
  swxy_api:
    build:
      context: ./app
      dockerfile: Dockerfile
    environment:
      - DATABASE_URL=postgresql://postgres:pg123456@gsk_pg:5432/gsk
      - ES_HOST=http://es01:9200
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    ports:
      - "8000:8000"
    command: ["uvicorn", "app_main:app", "--host", "0.0.0.0", "--port", "8000"]
    depends_on:
      - gsk_pg
      - es01
      - redis

  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.3
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"

  gsk_pg:
    image: postgres:15-alpine
    environment:
      - POSTGRES_PASSWORD=pg123456
      - POSTGRES_USER=postgres
      - POSTGRES_DB=gsk
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  redis:
    image: redis:7-alpine

关键设计决策:

  • depends_on:确保 API 服务在数据库和搜索引擎之后启动
  • volumesinit.sql 挂载到 PostgreSQL 的初始化目录,容器首次启动时自动执行建表语句
  • 网络:所有服务共享 gsk_network 桥接网络,容器间通过服务名(如 gsk_pges01redis)互相访问
  • 单节点 ESdiscovery.type=single-node 适合开发环境,生产环境应配置集群

2. 数据库初始化

📝 是什么?

init.sql 定义了系统的关系数据模型,包含用户表、会话表、消息表和知识库表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-- 用户表
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    password_hash VARCHAR(100) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- 会话表
CREATE TABLE IF NOT EXISTS sessions (
    session_id VARCHAR(16) PRIMARY KEY,
    session_name VARCHAR(255) NOT NULL,
    user_id VARCHAR(255) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- 消息表
CREATE TABLE IF NOT EXISTS messages (
    message_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(16) NOT NULL,
    user_question TEXT NOT NULL,
    model_answer TEXT NOT NULL,
    documents TEXT,             -- 检索到的文档引用(JSON字符串)
    recommended_questions TEXT, -- 推荐问题(JSON字符串)
    think TEXT,                 -- 模型思考过程
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- 知识库表
CREATE TABLE IF NOT EXISTS knowledgebases (
    id SERIAL PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    file_name VARCHAR(255) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
⚠️ 设计要点
  • messages 表的 documents 字段存储的是 JSON 字符串而非 jsonb 类型——注释中写了"修改为 jsonb 类型"但实际是 TEXT,这在生产环境中应改为 jsonb 以支持 JSON 查询。
  • session_id 使用 16 位随机字符串(uuid4().replace("-","")[:16]),而非完整 UUID,这是为了缩短 URL。
  • 使用 gen_random_uuid() 生成消息 ID,需要 pgcrypto 扩展。

三、FastAPI 后端架构

1. 应用入口与路由注册

📝 是什么?

FastAPI 是一个高性能的 Python Web 框架,基于 Starlette 和 Pydantic,原生支持异步和 OpenAPI 文档。

怎么做?——app_main.py 的结构
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from router import chat_rt, user_rt, history_rt

app = FastAPI(root_path=os.getenv("ROOT_PATH", "http://localhost:8000"))

# CORS 中间件——允许前端跨域访问
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 路由注册——三个模块
app.include_router(chat_rt.router)    # 对话相关
app.include_router(user_rt.router)    # 用户认证
app.include_router(history_rt.router) # 历史记录

架构分层:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
router/          ← 路由层:接收请求、参数校验、鉴权
  ├── chat_rt.py
  ├── user_rt.py
  └── history_rt.py
service/         ← 服务层:业务逻辑
  ├── core/
  │   ├── chat.py        ← 对话生成
  │   ├── retrieval.py   ← 检索入口
  │   ├── file_parse.py  ← 文档解析+入库
  │   ├── rag/           ← RAG 核心
  │   │   ├── nlp/       ← 分词、搜索
  │   │   └── app/       ← 分块策略
  │   └── deepdoc/       ← 文档解析引擎
  └── quick_parse_service.py  ← 快速解析服务
models/          ← 数据模型
schemas/         ← 请求/响应 Schema
utils/           ← 工具函数(数据库连接、日志等)

2. 路由层详解——chat_rt.py

💡 为什么要分层?

路由层只负责 HTTP 协议相关的逻辑(参数提取、鉴权、响应格式),业务逻辑全部下沉到 service 层。这样可以让业务逻辑独立于 HTTP 框架进行测试和复用。

核心路由一览:

路由方法功能
/create_sessionPOST创建新对话会话
/chat_on_docsPOST基于知识库的问答(核心)
/quick_parsePOST快速解析小文档
/upload_filesPOST上传文档到知识库
/get_parsed_contentGET获取已解析内容

/chat_on_docs 路由源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
@router.post("/chat_on_docs")
async def chat_on_docs(
    session_id: str = Query(...),
    request: ChatRequest = Body(...),
    credentials: JwtAuthorizationCredentials = Security(access_security),
):
    user_id = str(credentials.subject.get("user_id"))
    question = request.message

    # 1. 从知识库检索相关内容
    references = []
    try:
        references = retrieve_content(user_id, question)
    except Exception as e:
        references = []  # 没有知识库也不报错

    # 2. 返回 SSE 流式响应
    return StreamingResponse(
        get_chat_completion(session_id, question, references, user_id),
        media_type="text/event-stream"
    )
⚠️ 关键点
  • 使用 StreamingResponse 返回 Server-Sent Events(SSE),实现实时流式输出
  • JWT 鉴权通过 Security(access_security) 注入,每个请求都会校验 token
  • 检索失败时不阻断流程,而是传空列表给 LLM,这样即使没有知识库也能正常对话

3. 用户认证——JWT

📝 是什么?

JWT(JSON Web Token)是一种无状态的身份认证机制。用户登录后获得 token,后续请求携带该 token 即可证明身份。

怎么做?

本项目使用 fastapi_jwt 库实现 JWT 鉴权。路由函数通过依赖注入获取用户身份:

1
2
3
4
5
6
7
8
9
from fastapi_jwt import JwtAuthorizationCredentials
from service.auth import access_security

@router.post("/create_session")
async def create_session(
    credentials: JwtAuthorizationCredentials = Security(access_security),
):
    user_id = credentials.subject.get("user_id")
    # user_id 从 JWT token 的 payload 中提取

JWT 的 subject 字段包含 user_id,这个 ID 同时作为 Elasticsearch 的索引名称,实现了用户级别的数据隔离——每个用户的文档存储在独立的 ES 索引中。


第四章 文档解析流程

4.1 文档上传与解析入口

📝 是什么?

文档解析是 RAG 系统的第一步,将 PDF、DOCX、Excel 等非结构化文档转换为可检索的文本块(chunks)。

💡 为什么?

LLM 无法直接理解二进制文件格式。必须先将文档内容提取为纯文本,再进行分块、向量化后才能用于检索。

怎么做?——file_parse.py 的处理流程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def execute_insert_process(file_path: str, file_name: str, index_name: str):
    """文档处理和插入 Elasticsearch 的完整流程"""
    # 第一步:解析文档 -> 得到文本块列表
    documents = parse(file_path)

    # 第二步:批量处理——生成向量、构建元数据
    processed_documents = process_items(documents, file_name, index_name)

    # 第三步:批量插入 Elasticsearch
    es_connection = ESConnection()
    es_connection.insert(documents=processed_documents, indexName=index_name)

每个 chunk 的数据结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
d = {
    "id": chunck_id,                    # xxhash 生成的唯一ID
    "content_ltks": item["content_ltks"],  # 分词后的内容(用于全文检索)
    "content_with_weight": item["content_with_weight"],  # 原始内容
    "content_sm_ltks": item["content_sm_ltks"],  # 细粒度分词
    "kb_id": index_name,                 # 知识库ID
    "docnm_kwd": item["docnm_kwd"],     # 文档名称
    "title_tks": item["title_tks"],      # 标题分词
    "doc_id": xxhash.xxh64(file_name),   # 文档ID
    "q_1024_vec": embedding,             # 1024维向量
}

4.2 多格式解析——Naive 分块策略

📝 是什么?

naive.py 中的 chunk() 函数是文档分块的核心,支持 PDF、DOCX、Excel、TXT、Markdown、HTML、JSON 等多种格式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def chunk(filename, binary=None, from_page=0, to_page=100000,
          lang="Chinese", callback=None, **kwargs):
    """
    Naive 分块策略:
    1. 按分隔符切割连续文本
    2. 将相邻片段合并,直到 token 数达到上限
    """
    parser_config = kwargs.get("parser_config", {
        "chunk_token_num": 128,
        "delimiter": "\n!?。;!?",
        "layout_recognize": "DeepDOC"
    })

    # 根据文件扩展名选择解析器
    if re.search(r"\.pdf$", filename, re.IGNORECASE):
        pdf_parser = Pdf()
        sections, tables = pdf_parser(filename, from_page=from_page,
                                       to_page=to_page, callback=callback)
    elif re.search(r"\.docx$", filename, re.IGNORECASE):
        sections, tables = Docx()(filename, binary)
    elif re.search(r"\.xlsx?$", filename, re.IGNORECASE):
        excel_parser = ExcelParser()
        sections = [(_, "") for _ in excel_parser(binary) if _]
    # ... 更多格式

    # 合并分块
    chunks = naive_merge(sections,
                         int(parser_config.get("chunk_token_num", 128)),
                         parser_config.get("delimiter", "\n!?。;!?"))
    # 对分块进行分词处理
    res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser))
    return res
💡 分块策略的设计权衡
  • chunk_token_num=128:每个块最多 128 个 token。太大会引入噪音,太小会丢失上下文。
  • delimiter="\n!?。;!?":以句号、问号、感叹号等作为切分点,保证语义完整性。

4.3 DeepDoc PDF 解析引擎

📝 是什么?

DeepDoc 是项目中最复杂的文档解析模块,专门处理 PDF 文件。它集成了 OCR(光学字符识别)布局识别表格结构识别三大 AI 能力。

💡 为什么?

PDF 不是简单的文本格式,它包含:扫描图片(需要 OCR)、复杂表格(需要结构识别)、多栏布局(需要版面分析)。直接用 pdfplumber 提取文本会丢失结构信息。

怎么做?——RAGFlowPdfParser 的处理流水线
1
2
3
4
5
6
class RAGFlowPdfParser:
    def __init__(self):
        self.ocr = OCR()                           # OCR 引擎
        self.layouter = LayoutRecognizer("layout")  # 布局识别模型
        self.tbl_det = TableStructureRecognizer()   # 表格结构识别模型
        self.updown_cnt_mdl = xgb.Booster()         # XGBoost 上下文拼接模型

PDF 解析的 5 步流水线(对应 Pdf.__call__ 方法):

1
2
3
4
5
Step 1: __images__()        ── OCR 识别,将每页转为图像并提取文字
Step 2: _layouts_rec()      ── 布局分析,识别标题/正文/表格/图片区域
Step 3: _table_transformer_job()  ── 表格结构分析,识别行列关系
Step 4: _text_merge()       ── 水平文本合并,将同一行的文字拼接
Step 5: _concat_downward()  ── 垂直文本拼接,使用 XGBoost 模型判断是否应该合并

上下文拼接的特征工程_updown_concat_features 方法):

这是一个经典的 ML 特征工程案例。XGBoost 模型需要判断相邻两个文本块是否应该合并,使用了 31 个特征

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
fea = [
    up.get("R", -1) == down.get("R", -1),    # 是否在同一表格行
    y_dis / h,                                  # 垂直距离与行高的比值
    down["page_number"] - up["page_number"],    # 跨页情况
    up["layout_type"] == down["layout_type"],   # 布局类型是否一致
    re.search(r"([。?!;]|[a-z]\.)$", up["text"]),  # 上文是否以句号结尾
    self._match_proj(down),                     # 下文是否是项目符号开头
    len(tks_all) - len(tks_up) - len(tks_down), # 合并后分词数变化
    # ... 共 31 个特征
]
⚠️ 性能注意

DeepDoc 解析一个 PDF 需要加载 OCR、布局识别、表格识别三个深度学习模型,内存占用较大。项目中模型文件存储在 rag/res/deepdoc/ 目录,首次运行时会从 HuggingFace 下载。

4.4 快速解析服务

📝 是什么?

QuickParseService 是一个轻量级的文档解析服务,用于处理小文档(PDF 不超过 4 页,DOCX/TXT 不超过 4000 字符),解析结果存入 Redis 而非 Elasticsearch。

💡 为什么?

不是所有场景都需要完整的 RAG 检索。当用户上传一份简短文档时,直接将全文放入 LLM 的 prompt 中即可,无需向量化和检索。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class QuickParseService:
    def __init__(self):
        self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
        self.max_pages = 4               # PDF 页数上限
        self.max_characters = 4000       # 文本字符上限
        self.redis_expire_seconds = 7200 # 2小时过期

    def quick_parse_document(self, session_id, filename, file_content):
        file_extension = self.validate_file_format(filename)

        # 检查会话唯一性
        if self.check_session_exists(session_id):
            raise HTTPException(400, "该会话已有文档")

        # 根据类型解析
        content, count = self.parse_document(file_content, file_extension)

        # 存入 Redis,key=session_id,value=文档全文
        self.store_to_redis(session_id, content)

快速解析 vs 知识库解析的对比:

维度快速解析知识库解析
存储Redis(2小时过期)Elasticsearch(永久)
文档大小小(<4页/<4000字)不限
检索方式全文放入 Prompt向量+关键词混合检索
适用场景临时文档问答长期知识库

第五章 RAG 核心检索流程

5.1 检索入口

📝 是什么?

retrieval.py 是检索的入口文件,它初始化 ES 连接和 Dealer 对象,提供 retrieve_content() 函数供路由层调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from service.core.rag.nlp.search_v2 import Dealer
from service.core.rag.utils.es_conn import ESConnection

es_connection = ESConnection()
dealer = Dealer(dataStore=es_connection)

def retrieve_content(indexNames: str, question: str):
    results = dealer.retrieval(
        question=question,
        embd_mdl=None,
        tenant_ids=indexNames,
        kb_ids=None,
        vector_similarity_weight=0.6,  # 向量权重 60%
        page=1,
        page_size=5                     # 返回 Top-5
    )
    # 提取并格式化结果
    extracted_data = []
    for i, chunk in enumerate(results['chunks'], start=1):
        message = {
            "id": i,
            "document_id": chunk.get('doc_id'),
            "document_name": chunk.get('docnm_kwd').split("/")[-1],
            "content_with_weight": chunk.get('content_with_weight'),
        }
        extracted_data.append(message)
    return extracted_data
💡 关键参数

vector_similarity_weight=0.6 表示在混合检索中向量相似度占 60%,关键词相似度占 40%。这个比例决定了系统是更倾向于语义匹配(向量)还是精确匹配(关键词)。

5.2 混合检索——Dealer.retrieval()

📝 是什么?

Dealer 是搜索引擎的核心类,实现了混合检索(Hybrid Search)——同时使用关键词全文检索和向量语义检索,然后通过加权融合和重排序得到最终结果。

怎么做?——retrieval() 方法的完整流程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def retrieval(self, question, embd_mdl, tenant_ids, kb_ids,
              page, page_size, similarity_threshold=0.1,
              vector_similarity_weight=0.3, top=1024, ...):

    # 第一步:构建搜索请求
    req = {
        "question": question,
        "size": max(page_size * 3, 128),  # 先检索大量候选
        "vector": True,
        "topk": top,
        "similarity": similarity_threshold,
    }

    # 第二步:执行搜索(全文 + 向量混合)
    sres = self.search(req, [index_name(tid) for tid in tenant_ids], ...)

    # 第三步:重排序(使用 Rerank 模型)
    if sres.total > 0:
        sim, tsim, vsim = self.rerank_by_model(
            rerank_mdl, sres, question,
            1 - vector_similarity_weight,  # 关键词权重
            vector_similarity_weight        # 向量权重
        )

    # 第四步:按相似度降序排列,取 Top-K
    idx = np.argsort(sim * -1)[(page-1)*page_size : page*page_size]

    # 第五步:构造返回结果
    for i in idx:
        if sim[i] < similarity_threshold:
            break
        ranks["chunks"].append({
            "content_with_weight": chunk["content_with_weight"],
            "similarity": sim[i],
            "vector_similarity": vsim[i],
            "term_similarity": tsim[i],
            # ...
        })
    return ranks

5.3 搜索方法——Dealer.search()

search() 方法是检索的核心,它同时发起全文查询和向量查询:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def search(self, req, idx_names, kb_ids, emb_mdl=None, ...):
    qst = req.get("question", "")

    # 1. 全文检索:对问题进行分词,构建匹配表达式
    matchText, keywords = self.qryr.question(qst, min_match=0.3)

    # 2. 向量检索:将问题转为向量
    matchDense = self.get_vector(qst, emb_mdl, topk, ...)

    # 3. 加权融合:全文 5% + 向量 95%
    fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05, 0.95"})

    # 4. 发送到 Elasticsearch 执行
    res = self.dataStore.search(src, highlightFields, filters,
                                 [matchText, matchDense, fusionExpr], ...)

    # 如果结果为空,降低匹配阈值重试
    if total == 0:
        matchText, _ = self.qryr.question(qst, min_match=0.1)
        matchDense.extra_options["similarity"] = 0.17
        res = self.dataStore.search(...)

    return SearchResult(total=total, ids=ids, query_vector=q_vec, ...)
⚠️ 融合权重的两层设计

注意这里有两层权重:

  1. ES 层融合FusionExpr):全文 0.05 + 向量 0.95 —— 这是初步检索阶段
  2. Rerank 层融合rerank_by_model):根据 vector_similarity_weight 参数调整 —— 这是精排阶段

初步检索偏重向量是为了尽可能多地召回语义相关的候选;精排阶段再通过关键词相似度进行精确校准。

5.4 重排序机制

📝 是什么?

Rerank(重排序)是在初步检索后,对候选结果进行更精确的相关性评估。本项目提供两种重排序方式。

方式一:基于混合相似度的 Rerank

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def rerank(self, sres, query, tkweight=0.3, vtweight=0.7, ...):
    # 提取每个 chunk 的分词(标题权重x2,重要词权重x5,问题权重x6)
    for i in sres.ids:
        tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6

    # 混合相似度 = 关键词相似度 * tkweight + 向量相似度 * vtweight
    sim, tksim, vtsim = self.qryr.hybrid_similarity(
        sres.query_vector, ins_embd, keywords, ins_tw, tkweight, vtweight
    )
    return sim + rank_fea, tksim, vtsim

方式二:基于 Rerank 模型的精排

1
2
3
4
def rerank_by_model(self, rerank_mdl, sres, query, tkweight=0.3, vtweight=0.7, ...):
    tksim = self.qryr.token_similarity(keywords, ins_tw)
    vtsim, _ = rerank_similarity(query, [rmSpace(" ".join(tks)) for tks in ins_tw])
    return tkweight * (np.array(tksim) + rank_fea) + vtweight * vtsim, tksim, vtsim
💡 权重加成的设计哲学
1
tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6

这行代码体现了领域知识:标题比正文重要(x2),关键词比普通词重要(x5),如果 chunk 本身就是一个问答对的问题部分,则最重要(x6)。


第六章 NLP 处理

6.1 中文分词器——RagTokenizer

📝 是什么?

RagTokenizer 是项目自研的中文分词器,基于 Trie 树(前缀树)和双向最大匹配算法,专门为 RAG 场景优化。

💡 为什么不用 jieba?

jieba 是通用分词器,而 RAG 场景需要:

  1. 更细粒度的分词(fine_grained_tokenize)用于检索召回
  2. 词频和词性标注用于权重计算
  3. 英文词干化(Porter Stemmer)和词形还原(WordNet Lemmatizer)
  4. 全角转半角、繁体转简体等预处理
怎么做?——分词流程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class RagTokenizer:
    def __init__(self):
        self.DIR_ = os.path.join(get_project_base_directory(), "rag/res", "huqie")
        self.stemmer = PorterStemmer()          # 英文词干提取
        self.lemmatizer = WordNetLemmatizer()    # 英文词形还原
        # 加载 Trie 树词典
        self.trie_ = datrie.Trie.load(trie_file_name)

    def tokenize(self, line):
        line = self._strQ2B(line).lower()        # 全角转半角 + 小写
        line = self._tradi2simp(line)             # 繁体转简体

        # 对中文部分:双向最大匹配 + DFS 求最优分词
        tks, s = self.maxForward_(L)              # 正向最大匹配
        tks1, s1 = self.maxBackward_(L)           # 逆向最大匹配
        # 不一致的部分用 DFS 深度搜索最优解
        self.dfs_(chars, 0, [], tkslist)

        # 对英文部分:词干化 + 词形还原
        res = self.english_normalize_(res)
        return self.merge_(res)

评分函数(选择最优分词方案):

1
2
3
4
5
6
7
8
def score_(self, tfts):
    B = 30
    F, L = 0, 0
    for tk, (freq, tag) in tfts:
        F += freq      # 词频得分
        L += 0 if len(tk) < 2 else 1  # 长词奖励
    L /= len(tks)
    return tks, B / len(tks) + L + F   # 综合评分 = 分词数惩罚 + 长词奖励 + 词频
💡 评分公式解读
  • B / len(tks):分词数越少得分越高(鼓励长词匹配)
  • L:长词(≥2字符)的比例越高越好
  • F:词频越高的分词方案越好

6.2 细粒度分词

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def fine_grained_tokenize(self, tks):
    """将粗粒度分词进一步细分,用于提高检索召回率"""
    for tk in tks:
        if len(tk) < 3:
            res.append(tk)      # 短词不再细分
            continue
        # 用 DFS 找到次优分词方案
        tkslist = []
        self.dfs_(tk, 0, [], tkslist)
        stk = self.sortTks_(tkslist)[1][0]  # 注意取 [1] 即第二优方案
        res.append(" ".join(stk))
📝 为什么取第二优方案?

第一优方案(最优分词)已经用于主分词字段 content_ltks。细粒度分词取第二优方案,可以生成不同的词组合,存入 content_sm_ltks 字段,增加检索时的命中机会。


第七章 对话管理

7.1 对话服务——chat.py

📝 是什么?

chat.py 是对话生成的核心模块,负责:构造 Prompt、调用 LLM、流式输出、生成推荐问题、持久化对话记录。

怎么做?——get_chat_completion() 的完整流程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def get_chat_completion(session_id, question, retrieved_content, user_id):
    # 第一步:获取快速解析的文档内容(来自 Redis)
    quick_parse_content = get_quick_parse_content(session_id)

    # 第二步:构建参考内容
    reference_parts = []
    if retrieved_content:
        for ref in retrieved_content:
            reference_parts.append(f"[{id}] {ref['content_with_weight']}")
    if quick_parse_content:
        truncated_content = quick_parse_content[:4000]
        reference_parts.append(f"**当前会话文档内容:**\n[{id}] {truncated_content}")

    # 第三步:构造 Prompt
    prompt = f"""
    你是一个专业的智能助手...
    **参考内容:**
    {formatted_references}
    **用户问题:**
    {question}
    """

    # 第四步:调用 DeepSeek-R1 流式生成
    client = OpenAI(api_key=..., base_url=...)
    completion = client.chat.completions.create(
        model="deepseek-r1",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    # 第五步:先发送文档引用信息
    yield f"event: message\ndata: {json.dumps({'documents': all_documents})}\n\n"

    # 第六步:流式输出内容
    for chunk in completion:
        if chunk.choices[0].finish_reason == "stop":
            # 生成推荐问题
            recommended_questions = generate_recommended_questions(question, ...)
            yield f"event: message\ndata: {json.dumps({'recommended_questions': ...})}\n\n"
            yield "event: end\ndata: [DONE]\n\n"

            # 持久化到数据库
            write_chat_to_db(session_id, question, model_answer, ...)
            update_session_name(session_id, question, user_id)
        else:
            delta = chunk.choices[0].delta
            if delta.content:
                # 正常回答内容
                yield f"event: message\ndata: {json.dumps({'content': delta.content, 'thinking': False})}\n\n"
            else:
                # DeepSeek-R1 的思考过程
                yield f"event: message\ndata: {json.dumps({'content': delta.reasoning_content, 'thinking': True})}\n\n"

7.2 SSE 协议格式

📝 是什么?

Server-Sent Events(SSE)是一种服务器向客户端推送数据的协议,基于 HTTP,比 WebSocket 更轻量。

本项目 SSE 消息格式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# 第一条:发送文档引用
event: message
data: {"documents": [{"document_id": "...", "document_name": "...", "content_with_weight": "..."}]}

# 中间多条:流式输出内容
event: message
data: {"role": "assistant", "content": "国电电力", "thinking": false}

event: message
data: {"role": "assistant", "content": "的思考过程...", "thinking": true}

# 推荐问题
event: message
data: {"recommended_questions": ["问题1", "问题2", "问题3"]}

# 结束标志
event: end
data: [DONE]

7.3 引用标注机制

📝 是什么?

系统在 Prompt 中要求 LLM 使用 ##编号$$ 格式标注引用来源,前端解析后可以高亮显示引用的文档段落。

Prompt 中的关键指令:

1
2
每一块内容都必须标注引用的来源,格式为:##引用编号$$。
例如:##1$$ 表示引用自第1条参考内容。

7.4 推荐问题生成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def generate_recommended_questions(user_question, retrieved_content=None, session_id=None):
    """使用 Qwen2.5-7B 生成 3 个推荐问题"""
    prompt = f"""
    用户问题:{user_question}
    当前对话基于这些文档:{', '.join(document_topics)}
    生成3个相关的推荐问题...
    输出格式:{{"recommended_questions": ["问题1", "问题2", "问题3"]}}
    """
    client = OpenAI(api_key=..., base_url=...)
    completion = client.chat.completions.create(
        model="qwen2.5-7b-instruct",
        response_format={"type": "json_object"},
        stream=False,
        timeout=30,
    )
💡 模型选择策略
  • 主对话用 DeepSeek-R1(强推理能力,支持思维链)
  • 推荐问题用 Qwen2.5-7B(轻量快速,JSON 输出稳定)
  • 会话命名用 Qwen2.5-72B(语言理解好,生成简洁标题)

7.5 会话管理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def update_session_name(session_id: str, question: str, user_id: str):
    """首次对话时,根据用户问题自动生成会话名称"""
    query_result = db.execute(
        text("SELECT session_name FROM sessions WHERE session_id = :session_id"),
        {"session_id": session_id}
    ).fetchone()

    if not query_result:  # 首次对话
        session_name = generate_session_name(question)
        db.execute(text("""
            INSERT INTO sessions (session_id, user_id, session_name)
            VALUES (:session_id, :user_id, :session_name)
        """), {...})

第八章 前端 React 架构

8.1 技术栈概览

📝 是什么?

前端采用 React + TypeScript + Vite 构建,使用 Ant Design 作为 UI 组件库,Valtio 作为状态管理方案。

技术作用
React 18UI 框架
TypeScript类型安全
Vite构建工具
Ant DesignUI 组件库
Valtio (proxy)响应式状态管理
ahooksReact Hooks 工具库
AxiosHTTP 请求

8.2 聊天页面组件结构

怎么做?——chat/index.tsx 的组件组织
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ComPageLayout(页面布局)
├── sender(底部输入区域)
│   ├── Source(文档来源展示)
│   └── ComSender(消息输入框)
├── right(右侧面板)
│   ├── ChatDrawer > Citations(引文面板)
│   └── ChatDrawer > Contracts(文档面板)
└── children(主内容区)
    ├── Header(对话标题)
    ├── ChatMessage(消息列表)
    └── Drawer(文档详情抽屉)

8.3 状态管理——Valtio

1
2
3
4
5
6
const [chat] = useState(() => {
  return proxy({
    list: [] as API.ChatItem[],
  })
})
const { list } = useSnapshot(chat) as { list: API.ChatItem[] }
📝 是什么?

Valtio 是一个基于 Proxy 的状态管理库。proxy() 创建可变的响应式对象,useSnapshot() 创建不可变的快照用于渲染。直接修改 chat.list(如 push、赋值属性)会自动触发组件重渲染。

💡 为什么用 Valtio 而非 Redux?

在流式输出场景中,每收到一个 token 就需要更新状态。Valtio 允许直接修改对象属性(target.content += delta.content),比 Redux 的 action/reducer 模式更简洁高效。

8.4 SSE 流式数据处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
const sendChat = useCallback(async (target: API.ChatItem, message: string) => {
  // 发起 SSE 请求
  const res = await api.session.chat({ id: id!, message })
  const reader = res.data.getReader()

  async function read(reader: ReadableStreamDefaultReader<any>) {
    let temp = ''
    const decoder = new TextDecoder('utf-8')

    while (true) {
      const { value, done } = await reader.read()
      temp += decoder.decode(value)

      // 按行解析 SSE 数据
      while (true) {
        const index = temp.indexOf('\n')
        if (index === -1) break
        const slice = temp.slice(0, index)
        temp = temp.slice(index + 1)

        if (slice.startsWith('data: ')) {
          parseData(slice)  // 解析 data 行
          scrollToBottom()   // 自动滚动
        }
      }

      if (done) break
    }
  }

  function parseData(slice: string) {
    const str = slice.trim().replace(/^data\: /, '').trim()
    if (str === '[DONE]') return

    const json = JSON.parse(str)

    // 区分思考内容和回答内容
    if (json?.content) {
      if (json.thinking) {
        target.think = `${target.think || ''}${json.content || ''}`
      } else {
        target.content = `${target.content || ''}${json.content || ''}`
      }
    }

    // 处理文档引用
    if (json?.documents?.length) {
      target.reference = json.documents
      // 去重,构建文档列表
      const map = new Map<string, API.Document>()
      json.documents.forEach((chunk: API.Reference) => {
        map.set(chunk.document_id, { ... })
      })
      target.documents = Array.from(map.values())
    }

    // 处理推荐问题
    if (json?.recommended_questions?.length) {
      target.recommended_questions = json.recommended_questions
    }
  }
}, [chat])

8.5 API 层——session.ts

📝 是什么?

session.ts 封装了所有与后端对话相关的 API 调用,使用 Axios 作为 HTTP 客户端。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建会话
export function create(params?: {}) {
  return request.post<API.Result<{ session_id: string }>>('/create_session', params)
}

// 发起对话(SSE 流式)
export function chat(params: { id: string; message: string }) {
  return request.post<ReadableStream>('/chat_on_docs', { message: params.message }, {
    headers: { Accept: 'text/event-stream' },
    responseType: 'stream',
    adapter: 'fetch',     // 使用 fetch adapter 以支持 ReadableStream
    params: { session_id: params.id },
  })
}

// 快速解析文档
export function quickParse(params: { session_id: string; file: File }) {
  const formData = new FormData()
  formData.append('file', params.file)
  return request.post('/quick_parse', formData, {
    params: { session_id: params.session_id },
  })
}
⚠️ 关键技术点

adapter: 'fetch' 告诉 Axios 使用浏览器原生 Fetch API 而非 XMLHttpRequest,因为只有 Fetch API 支持 ReadableStream,这是实现前端 SSE 流式读取的必要条件。


第九章 系统集成与完整链路

9.1 文档入库完整链路

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
用户上传文件
    
     POST /upload_files
chat_rt.py: upload_files()
    
    ├── 保存文件到本地 storage/file/{session_id}/
    
    ├── file_parse.py: execute_insert_process()
           
           ├── naive.py: chunk()           文档解析 + 分块
                ├── pdf_parser.py         PDF: OCR + 布局识别
                ├── Docx()                DOCX 解析
                └── ExcelParser()         Excel 解析
           
           ├── rag_tokenizer.tokenize()    对每个 chunk 分词
           ├── generate_embedding()        生成向量 (1024)
           └── ESConnection.insert()       写入 Elasticsearch
    
    └── insert_knowledgebase()              记录到 PostgreSQL

9.2 问答检索完整链路

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
用户提问
    ▼ POST /chat_on_docs
chat_rt.py
    ├── retrieve_content(user_id, question)
    │       │
    │       ▼ Dealer.retrieval()
    │       ├── FulltextQueryer.question()   ← 问题分词,构建查询
    │       ├── generate_embedding(question)  ← 问题向量化
    │       ├── ES search (全文 + 向量)       ← 混合检索
    │       ├── rerank_by_model()             ← Rerank 精排
    │       └── 返回 Top-5 chunks
    └── get_chat_completion(session_id, question, references, user_id)
            ├── get_quick_parse_content()     ← 检查 Redis 快速解析内容
            ├── 构造 Prompt (参考内容 + 用户问题)
            ├── DeepSeek-R1 流式生成           ← LLM 推理
            ├── SSE 流式输出                   ← 实时返回前端
            ├── generate_recommended_questions() ← Qwen2.5-7B 生成推荐问题
            ├── write_chat_to_db()             ← 持久化到 PostgreSQL
            └── update_session_name()          ← 首次对话自动命名

9.3 双通道文档接入

📝 设计亮点

系统同时支持两种文档接入方式,在 get_chat_completion 中合并:

  1. 知识库通道:文档 -> 解析 -> 分块 -> 向量化 -> ES 存储 -> 检索
  2. 快速解析通道:小文档 -> 全文 -> Redis 缓存 -> 直接放入 Prompt

两个通道的内容在 Prompt 中以编号标注合并,LLM 统一引用。


第十章 小测验

💡 以下 15 道题覆盖了本教程的核心知识点,答案见 教程_RAG项目实战_答案.md

1. 本项目使用哪个 Python Web 框架?它相比 Flask 的主要优势是什么?

2. docker-compose.ymldepends_on 的作用是什么?它能保证依赖服务完全就绪吗?

3.retrieval.py 中,vector_similarity_weight=0.6 意味着什么?关键词检索权重是多少?

4. RagTokenizertokenize() 方法使用了哪两种匹配算法?当两种算法结果不一致时如何处理?

5. fine_grained_tokenize() 为什么要取排序后的第二优分词方案而非最优方案?

6. DeepDoc PDF 解析的 5 步流水线是什么?每一步的作用分别是什么?

7. _updown_concat_features() 使用什么机器学习模型来判断文本块是否应该合并?为什么不用简单的规则?

8. SSE(Server-Sent Events)和 WebSocket 的区别是什么?本项目为什么选择 SSE?

9.chat.py 的 Prompt 中,引用标注格式 ##编号$$ 的设计目的是什么?

10. 快速解析服务(QuickParseService)和知识库解析的适用场景分别是什么?它们的存储介质有何不同?

11. 前端使用 adapter: 'fetch' 的原因是什么?如果改为默认的 XMLHttpRequest 会怎样?

12. 在 Rerank 阶段,为什么标题的权重是 x2、关键词的权重是 x5、问答对的问题权重是 x6?

13. generate_recommended_questions() 使用 Qwen2.5-7B 而非 DeepSeek-R1 的原因是什么?

14. Dealer.search()FusionExpr("weighted_sum", topk, {"weights": "0.05, 0.95"}) 的 0.05 和 0.95 分别代表什么?

15. 前端使用 Valtio 的 proxy()useSnapshot() 分别起什么作用?为什么在流式输出场景中这比 Redux 更合适?


第十一章 思维导图结构建议

📝 以下结构可在 Obsidian 中使用 Markmap 插件或 XMind 等工具绘制
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
RAG 全栈项目实战
├── 1. 项目架构
│   ├── 前后端分离
│   ├── 技术选型(FastAPI / React / ES / Redis / PG)
│   └── 数据流全链路
├── 2. 基础设施
│   ├── Docker Compose 编排
│   ├── PostgreSQL 数据模型
│   ├── Elasticsearch 配置
│   └── Redis 缓存
├── 3. 后端架构
│   ├── 路由层(chat_rt / user_rt / history_rt)
│   ├── 服务层(chat / retrieval / file_parse)
│   ├── JWT 认证
│   └── SSE 流式响应
├── 4. 文档解析
│   ├── 多格式支持(PDF / DOCX / Excel / TXT / MD)
│   ├── DeepDoc 引擎
│   │   ├── OCR
│   │   ├── 布局识别
│   │   ├── 表格结构识别
│   │   └── XGBoost 上下文拼接
│   ├── Naive 分块策略
│   └── 快速解析(Redis 通道)
├── 5. RAG 检索
│   ├── 混合检索(全文 + 向量)
│   ├── Dealer 类
│   │   ├── search()
│   │   ├── rerank()
│   │   └── rerank_by_model()
│   ├── 两层权重融合
│   └── Rerank 权重加成
├── 6. NLP 处理
│   ├── RagTokenizer
│   │   ├── Trie 树词典
│   │   ├── 双向最大匹配
│   │   ├── DFS 最优分词
│   │   └── 评分函数
│   ├── 细粒度分词
│   └── 英文处理(Stemmer + Lemmatizer)
├── 7. 对话管理
│   ├── Prompt 构造(知识库 + 快速解析合并)
│   ├── LLM 调用(DeepSeek-R1 流式)
│   ├── 引用标注(##编号$$)
│   ├── 推荐问题(Qwen2.5-7B)
│   ├── 会话命名(Qwen2.5-72B)
│   └── 持久化(PostgreSQL)
└── 8. 前端架构
    ├── React + TypeScript + Vite
    ├── Valtio 状态管理
    ├── SSE 流式解析
    ├── 组件结构(ChatMessage / Citations / Source)
    └── API 层(Axios + Fetch adapter)

💡 学习建议
  1. 先跑起来:使用 docker-compose up 启动全部服务,上传一份文档并进行问答
  2. 断点调试:在 retrieve_content()get_chat_completion() 中加日志,观察检索和生成过程
  3. 对比实验:调整 vector_similarity_weight 参数(如 0.3 vs 0.9),观察检索结果的变化
  4. 阅读原理:配合目录中的《RAG综述.pdf》理解 RAG 范式的理论基础