一、项目整体架构
1. 系统架构全景
swxy 是一个典型的 RAG(Retrieval-Augmented Generation)全栈项目,采用前后端分离架构,后端提供 RESTful API,前端通过 HTTP/SSE 与后端通信。系统核心功能是:用户上传文档 -> 文档解析并存入向量数据库 -> 用户提问时检索相关内容 -> 结合 LLM 生成回答。
纯 LLM 受限于训练数据的时效性和领域知识的覆盖面。RAG 通过外挂知识库,让 LLM 在回答时引用真实文档内容,解决了幻觉问题和知识更新问题。
系统组件一览:
| 层级 | 技术选型 | 作用 |
|---|
| 前端 | React + TypeScript + Ant Design + Vite | 用户交互界面 |
| 后端 API | FastAPI + Uvicorn | RESTful API 服务 |
| 关系数据库 | PostgreSQL 15 | 用户、会话、消息持久化 |
| 搜索引擎 | Elasticsearch 8.11 | 全文检索 + 向量检索 |
| 缓存 | Redis 7 | 快速解析文档临时存储 |
| 文档解析 | DeepDoc (OCR + 布局识别) | PDF/DOCX 等多格式解析 |
| NLP | 自研分词器 (RagTokenizer) | 中文分词、关键词提取 |
| LLM | DeepSeek-R1 / Qwen2.5 | 对话生成、推荐问题生成 |
| Embedding | DashScope API | 文本向量化 |
2. 数据流全链路
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| 用户提问 "国电电力2024年营收如何?"
│
▼
前端 chat/index.tsx ─── POST /chat_on_docs ───► 后端 chat_rt.py
│ │
│ ▼
│ retrieval.py: retrieve_content()
│ │
│ ▼
│ search_v2.py: Dealer.retrieval()
│ ┌─────────┴─────────┐
│ │ │
│ 全文检索 向量检索
│ (Elasticsearch) (Embedding相似度)
│ └─────────┬─────────┘
│ │ 混合排序 + Rerank
│ ▼
│ chat.py: get_chat_completion()
│ 构造 Prompt + 调用 DeepSeek-R1
│ │
│ ▼ SSE 流式响应
◄──────────────── text/event-stream ───────────┘
前端实时渲染回答 + 引文标注
|
二、Docker Compose 基础设施搭建
1. 容器编排概览
Docker Compose 是一种用 YAML 文件定义多容器应用的工具。本项目通过 docker-compose.yml 一键启动所有基础设施服务。
开发环境需要同时运行 PostgreSQL、Elasticsearch、Redis 等多个服务。Docker Compose 消除了手动安装和配置各个服务的复杂性,确保环境一致性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| version: '3.8'
services:
swxy_api:
build:
context: ./app
dockerfile: Dockerfile
environment:
- DATABASE_URL=postgresql://postgres:pg123456@gsk_pg:5432/gsk
- ES_HOST=http://es01:9200
- REDIS_HOST=redis
- REDIS_PORT=6379
ports:
- "8000:8000"
command: ["uvicorn", "app_main:app", "--host", "0.0.0.0", "--port", "8000"]
depends_on:
- gsk_pg
- es01
- redis
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.3
environment:
- discovery.type=single-node
- xpack.security.enabled=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
gsk_pg:
image: postgres:15-alpine
environment:
- POSTGRES_PASSWORD=pg123456
- POSTGRES_USER=postgres
- POSTGRES_DB=gsk
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
redis:
image: redis:7-alpine
|
关键设计决策:
depends_on:确保 API 服务在数据库和搜索引擎之后启动volumes:init.sql 挂载到 PostgreSQL 的初始化目录,容器首次启动时自动执行建表语句- 网络:所有服务共享
gsk_network 桥接网络,容器间通过服务名(如 gsk_pg、es01、redis)互相访问 - 单节点 ES:
discovery.type=single-node 适合开发环境,生产环境应配置集群
2. 数据库初始化
init.sql 定义了系统的关系数据模型,包含用户表、会话表、消息表和知识库表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| -- 用户表
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
password_hash VARCHAR(100) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- 会话表
CREATE TABLE IF NOT EXISTS sessions (
session_id VARCHAR(16) PRIMARY KEY,
session_name VARCHAR(255) NOT NULL,
user_id VARCHAR(255) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- 消息表
CREATE TABLE IF NOT EXISTS messages (
message_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id VARCHAR(16) NOT NULL,
user_question TEXT NOT NULL,
model_answer TEXT NOT NULL,
documents TEXT, -- 检索到的文档引用(JSON字符串)
recommended_questions TEXT, -- 推荐问题(JSON字符串)
think TEXT, -- 模型思考过程
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- 知识库表
CREATE TABLE IF NOT EXISTS knowledgebases (
id SERIAL PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
file_name VARCHAR(255) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
|
messages 表的 documents 字段存储的是 JSON 字符串而非 jsonb 类型——注释中写了"修改为 jsonb 类型"但实际是 TEXT,这在生产环境中应改为 jsonb 以支持 JSON 查询。session_id 使用 16 位随机字符串(uuid4().replace("-","")[:16]),而非完整 UUID,这是为了缩短 URL。- 使用
gen_random_uuid() 生成消息 ID,需要 pgcrypto 扩展。
三、FastAPI 后端架构
1. 应用入口与路由注册
FastAPI 是一个高性能的 Python Web 框架,基于 Starlette 和 Pydantic,原生支持异步和 OpenAPI 文档。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from router import chat_rt, user_rt, history_rt
app = FastAPI(root_path=os.getenv("ROOT_PATH", "http://localhost:8000"))
# CORS 中间件——允许前端跨域访问
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 路由注册——三个模块
app.include_router(chat_rt.router) # 对话相关
app.include_router(user_rt.router) # 用户认证
app.include_router(history_rt.router) # 历史记录
|
架构分层:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| router/ ← 路由层:接收请求、参数校验、鉴权
├── chat_rt.py
├── user_rt.py
└── history_rt.py
service/ ← 服务层:业务逻辑
├── core/
│ ├── chat.py ← 对话生成
│ ├── retrieval.py ← 检索入口
│ ├── file_parse.py ← 文档解析+入库
│ ├── rag/ ← RAG 核心
│ │ ├── nlp/ ← 分词、搜索
│ │ └── app/ ← 分块策略
│ └── deepdoc/ ← 文档解析引擎
└── quick_parse_service.py ← 快速解析服务
models/ ← 数据模型
schemas/ ← 请求/响应 Schema
utils/ ← 工具函数(数据库连接、日志等)
|
2. 路由层详解——chat_rt.py
路由层只负责 HTTP 协议相关的逻辑(参数提取、鉴权、响应格式),业务逻辑全部下沉到 service 层。这样可以让业务逻辑独立于 HTTP 框架进行测试和复用。
核心路由一览:
| 路由 | 方法 | 功能 |
|---|
/create_session | POST | 创建新对话会话 |
/chat_on_docs | POST | 基于知识库的问答(核心) |
/quick_parse | POST | 快速解析小文档 |
/upload_files | POST | 上传文档到知识库 |
/get_parsed_content | GET | 获取已解析内容 |
/chat_on_docs 路由源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| @router.post("/chat_on_docs")
async def chat_on_docs(
session_id: str = Query(...),
request: ChatRequest = Body(...),
credentials: JwtAuthorizationCredentials = Security(access_security),
):
user_id = str(credentials.subject.get("user_id"))
question = request.message
# 1. 从知识库检索相关内容
references = []
try:
references = retrieve_content(user_id, question)
except Exception as e:
references = [] # 没有知识库也不报错
# 2. 返回 SSE 流式响应
return StreamingResponse(
get_chat_completion(session_id, question, references, user_id),
media_type="text/event-stream"
)
|
- 使用
StreamingResponse 返回 Server-Sent Events(SSE),实现实时流式输出 - JWT 鉴权通过
Security(access_security) 注入,每个请求都会校验 token - 检索失败时不阻断流程,而是传空列表给 LLM,这样即使没有知识库也能正常对话
3. 用户认证——JWT
JWT(JSON Web Token)是一种无状态的身份认证机制。用户登录后获得 token,后续请求携带该 token 即可证明身份。
本项目使用 fastapi_jwt 库实现 JWT 鉴权。路由函数通过依赖注入获取用户身份:
1
2
3
4
5
6
7
8
9
| from fastapi_jwt import JwtAuthorizationCredentials
from service.auth import access_security
@router.post("/create_session")
async def create_session(
credentials: JwtAuthorizationCredentials = Security(access_security),
):
user_id = credentials.subject.get("user_id")
# user_id 从 JWT token 的 payload 中提取
|
JWT 的 subject 字段包含 user_id,这个 ID 同时作为 Elasticsearch 的索引名称,实现了用户级别的数据隔离——每个用户的文档存储在独立的 ES 索引中。
第四章 文档解析流程
4.1 文档上传与解析入口
文档解析是 RAG 系统的第一步,将 PDF、DOCX、Excel 等非结构化文档转换为可检索的文本块(chunks)。
LLM 无法直接理解二进制文件格式。必须先将文档内容提取为纯文本,再进行分块、向量化后才能用于检索。
1
2
3
4
5
6
7
8
9
10
11
| def execute_insert_process(file_path: str, file_name: str, index_name: str):
"""文档处理和插入 Elasticsearch 的完整流程"""
# 第一步:解析文档 -> 得到文本块列表
documents = parse(file_path)
# 第二步:批量处理——生成向量、构建元数据
processed_documents = process_items(documents, file_name, index_name)
# 第三步:批量插入 Elasticsearch
es_connection = ESConnection()
es_connection.insert(documents=processed_documents, indexName=index_name)
|
每个 chunk 的数据结构:
1
2
3
4
5
6
7
8
9
10
11
| d = {
"id": chunck_id, # xxhash 生成的唯一ID
"content_ltks": item["content_ltks"], # 分词后的内容(用于全文检索)
"content_with_weight": item["content_with_weight"], # 原始内容
"content_sm_ltks": item["content_sm_ltks"], # 细粒度分词
"kb_id": index_name, # 知识库ID
"docnm_kwd": item["docnm_kwd"], # 文档名称
"title_tks": item["title_tks"], # 标题分词
"doc_id": xxhash.xxh64(file_name), # 文档ID
"q_1024_vec": embedding, # 1024维向量
}
|
4.2 多格式解析——Naive 分块策略
naive.py 中的 chunk() 函数是文档分块的核心,支持 PDF、DOCX、Excel、TXT、Markdown、HTML、JSON 等多种格式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| def chunk(filename, binary=None, from_page=0, to_page=100000,
lang="Chinese", callback=None, **kwargs):
"""
Naive 分块策略:
1. 按分隔符切割连续文本
2. 将相邻片段合并,直到 token 数达到上限
"""
parser_config = kwargs.get("parser_config", {
"chunk_token_num": 128,
"delimiter": "\n!?。;!?",
"layout_recognize": "DeepDOC"
})
# 根据文件扩展名选择解析器
if re.search(r"\.pdf$", filename, re.IGNORECASE):
pdf_parser = Pdf()
sections, tables = pdf_parser(filename, from_page=from_page,
to_page=to_page, callback=callback)
elif re.search(r"\.docx$", filename, re.IGNORECASE):
sections, tables = Docx()(filename, binary)
elif re.search(r"\.xlsx?$", filename, re.IGNORECASE):
excel_parser = ExcelParser()
sections = [(_, "") for _ in excel_parser(binary) if _]
# ... 更多格式
# 合并分块
chunks = naive_merge(sections,
int(parser_config.get("chunk_token_num", 128)),
parser_config.get("delimiter", "\n!?。;!?"))
# 对分块进行分词处理
res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser))
return res
|
chunk_token_num=128:每个块最多 128 个 token。太大会引入噪音,太小会丢失上下文。delimiter="\n!?。;!?":以句号、问号、感叹号等作为切分点,保证语义完整性。
4.3 DeepDoc PDF 解析引擎
DeepDoc 是项目中最复杂的文档解析模块,专门处理 PDF 文件。它集成了 OCR(光学字符识别)、布局识别、表格结构识别三大 AI 能力。
PDF 不是简单的文本格式,它包含:扫描图片(需要 OCR)、复杂表格(需要结构识别)、多栏布局(需要版面分析)。直接用 pdfplumber 提取文本会丢失结构信息。
1
2
3
4
5
6
| class RAGFlowPdfParser:
def __init__(self):
self.ocr = OCR() # OCR 引擎
self.layouter = LayoutRecognizer("layout") # 布局识别模型
self.tbl_det = TableStructureRecognizer() # 表格结构识别模型
self.updown_cnt_mdl = xgb.Booster() # XGBoost 上下文拼接模型
|
PDF 解析的 5 步流水线(对应 Pdf.__call__ 方法):
1
2
3
4
5
| Step 1: __images__() ── OCR 识别,将每页转为图像并提取文字
Step 2: _layouts_rec() ── 布局分析,识别标题/正文/表格/图片区域
Step 3: _table_transformer_job() ── 表格结构分析,识别行列关系
Step 4: _text_merge() ── 水平文本合并,将同一行的文字拼接
Step 5: _concat_downward() ── 垂直文本拼接,使用 XGBoost 模型判断是否应该合并
|
上下文拼接的特征工程(_updown_concat_features 方法):
这是一个经典的 ML 特征工程案例。XGBoost 模型需要判断相邻两个文本块是否应该合并,使用了 31 个特征:
1
2
3
4
5
6
7
8
9
10
| fea = [
up.get("R", -1) == down.get("R", -1), # 是否在同一表格行
y_dis / h, # 垂直距离与行高的比值
down["page_number"] - up["page_number"], # 跨页情况
up["layout_type"] == down["layout_type"], # 布局类型是否一致
re.search(r"([。?!;]|[a-z]\.)$", up["text"]), # 上文是否以句号结尾
self._match_proj(down), # 下文是否是项目符号开头
len(tks_all) - len(tks_up) - len(tks_down), # 合并后分词数变化
# ... 共 31 个特征
]
|
DeepDoc 解析一个 PDF 需要加载 OCR、布局识别、表格识别三个深度学习模型,内存占用较大。项目中模型文件存储在 rag/res/deepdoc/ 目录,首次运行时会从 HuggingFace 下载。
4.4 快速解析服务
QuickParseService 是一个轻量级的文档解析服务,用于处理小文档(PDF 不超过 4 页,DOCX/TXT 不超过 4000 字符),解析结果存入 Redis 而非 Elasticsearch。
不是所有场景都需要完整的 RAG 检索。当用户上传一份简短文档时,直接将全文放入 LLM 的 prompt 中即可,无需向量化和检索。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| class QuickParseService:
def __init__(self):
self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
self.max_pages = 4 # PDF 页数上限
self.max_characters = 4000 # 文本字符上限
self.redis_expire_seconds = 7200 # 2小时过期
def quick_parse_document(self, session_id, filename, file_content):
file_extension = self.validate_file_format(filename)
# 检查会话唯一性
if self.check_session_exists(session_id):
raise HTTPException(400, "该会话已有文档")
# 根据类型解析
content, count = self.parse_document(file_content, file_extension)
# 存入 Redis,key=session_id,value=文档全文
self.store_to_redis(session_id, content)
|
快速解析 vs 知识库解析的对比:
| 维度 | 快速解析 | 知识库解析 |
|---|
| 存储 | Redis(2小时过期) | Elasticsearch(永久) |
| 文档大小 | 小(<4页/<4000字) | 不限 |
| 检索方式 | 全文放入 Prompt | 向量+关键词混合检索 |
| 适用场景 | 临时文档问答 | 长期知识库 |
第五章 RAG 核心检索流程
5.1 检索入口
retrieval.py 是检索的入口文件,它初始化 ES 连接和 Dealer 对象,提供 retrieve_content() 函数供路由层调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| from service.core.rag.nlp.search_v2 import Dealer
from service.core.rag.utils.es_conn import ESConnection
es_connection = ESConnection()
dealer = Dealer(dataStore=es_connection)
def retrieve_content(indexNames: str, question: str):
results = dealer.retrieval(
question=question,
embd_mdl=None,
tenant_ids=indexNames,
kb_ids=None,
vector_similarity_weight=0.6, # 向量权重 60%
page=1,
page_size=5 # 返回 Top-5
)
# 提取并格式化结果
extracted_data = []
for i, chunk in enumerate(results['chunks'], start=1):
message = {
"id": i,
"document_id": chunk.get('doc_id'),
"document_name": chunk.get('docnm_kwd').split("/")[-1],
"content_with_weight": chunk.get('content_with_weight'),
}
extracted_data.append(message)
return extracted_data
|
vector_similarity_weight=0.6 表示在混合检索中向量相似度占 60%,关键词相似度占 40%。这个比例决定了系统是更倾向于语义匹配(向量)还是精确匹配(关键词)。
5.2 混合检索——Dealer.retrieval()
Dealer 是搜索引擎的核心类,实现了混合检索(Hybrid Search)——同时使用关键词全文检索和向量语义检索,然后通过加权融合和重排序得到最终结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| def retrieval(self, question, embd_mdl, tenant_ids, kb_ids,
page, page_size, similarity_threshold=0.1,
vector_similarity_weight=0.3, top=1024, ...):
# 第一步:构建搜索请求
req = {
"question": question,
"size": max(page_size * 3, 128), # 先检索大量候选
"vector": True,
"topk": top,
"similarity": similarity_threshold,
}
# 第二步:执行搜索(全文 + 向量混合)
sres = self.search(req, [index_name(tid) for tid in tenant_ids], ...)
# 第三步:重排序(使用 Rerank 模型)
if sres.total > 0:
sim, tsim, vsim = self.rerank_by_model(
rerank_mdl, sres, question,
1 - vector_similarity_weight, # 关键词权重
vector_similarity_weight # 向量权重
)
# 第四步:按相似度降序排列,取 Top-K
idx = np.argsort(sim * -1)[(page-1)*page_size : page*page_size]
# 第五步:构造返回结果
for i in idx:
if sim[i] < similarity_threshold:
break
ranks["chunks"].append({
"content_with_weight": chunk["content_with_weight"],
"similarity": sim[i],
"vector_similarity": vsim[i],
"term_similarity": tsim[i],
# ...
})
return ranks
|
5.3 搜索方法——Dealer.search()
search() 方法是检索的核心,它同时发起全文查询和向量查询:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| def search(self, req, idx_names, kb_ids, emb_mdl=None, ...):
qst = req.get("question", "")
# 1. 全文检索:对问题进行分词,构建匹配表达式
matchText, keywords = self.qryr.question(qst, min_match=0.3)
# 2. 向量检索:将问题转为向量
matchDense = self.get_vector(qst, emb_mdl, topk, ...)
# 3. 加权融合:全文 5% + 向量 95%
fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05, 0.95"})
# 4. 发送到 Elasticsearch 执行
res = self.dataStore.search(src, highlightFields, filters,
[matchText, matchDense, fusionExpr], ...)
# 如果结果为空,降低匹配阈值重试
if total == 0:
matchText, _ = self.qryr.question(qst, min_match=0.1)
matchDense.extra_options["similarity"] = 0.17
res = self.dataStore.search(...)
return SearchResult(total=total, ids=ids, query_vector=q_vec, ...)
|
注意这里有两层权重:
- ES 层融合(
FusionExpr):全文 0.05 + 向量 0.95 —— 这是初步检索阶段 - Rerank 层融合(
rerank_by_model):根据 vector_similarity_weight 参数调整 —— 这是精排阶段
初步检索偏重向量是为了尽可能多地召回语义相关的候选;精排阶段再通过关键词相似度进行精确校准。
5.4 重排序机制
Rerank(重排序)是在初步检索后,对候选结果进行更精确的相关性评估。本项目提供两种重排序方式。
方式一:基于混合相似度的 Rerank
1
2
3
4
5
6
7
8
9
10
| def rerank(self, sres, query, tkweight=0.3, vtweight=0.7, ...):
# 提取每个 chunk 的分词(标题权重x2,重要词权重x5,问题权重x6)
for i in sres.ids:
tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6
# 混合相似度 = 关键词相似度 * tkweight + 向量相似度 * vtweight
sim, tksim, vtsim = self.qryr.hybrid_similarity(
sres.query_vector, ins_embd, keywords, ins_tw, tkweight, vtweight
)
return sim + rank_fea, tksim, vtsim
|
方式二:基于 Rerank 模型的精排
1
2
3
4
| def rerank_by_model(self, rerank_mdl, sres, query, tkweight=0.3, vtweight=0.7, ...):
tksim = self.qryr.token_similarity(keywords, ins_tw)
vtsim, _ = rerank_similarity(query, [rmSpace(" ".join(tks)) for tks in ins_tw])
return tkweight * (np.array(tksim) + rank_fea) + vtweight * vtsim, tksim, vtsim
|
1
| tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6
|
这行代码体现了领域知识:标题比正文重要(x2),关键词比普通词重要(x5),如果 chunk 本身就是一个问答对的问题部分,则最重要(x6)。
第六章 NLP 处理
6.1 中文分词器——RagTokenizer
RagTokenizer 是项目自研的中文分词器,基于 Trie 树(前缀树)和双向最大匹配算法,专门为 RAG 场景优化。
jieba 是通用分词器,而 RAG 场景需要:
- 更细粒度的分词(
fine_grained_tokenize)用于检索召回 - 词频和词性标注用于权重计算
- 英文词干化(Porter Stemmer)和词形还原(WordNet Lemmatizer)
- 全角转半角、繁体转简体等预处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| class RagTokenizer:
def __init__(self):
self.DIR_ = os.path.join(get_project_base_directory(), "rag/res", "huqie")
self.stemmer = PorterStemmer() # 英文词干提取
self.lemmatizer = WordNetLemmatizer() # 英文词形还原
# 加载 Trie 树词典
self.trie_ = datrie.Trie.load(trie_file_name)
def tokenize(self, line):
line = self._strQ2B(line).lower() # 全角转半角 + 小写
line = self._tradi2simp(line) # 繁体转简体
# 对中文部分:双向最大匹配 + DFS 求最优分词
tks, s = self.maxForward_(L) # 正向最大匹配
tks1, s1 = self.maxBackward_(L) # 逆向最大匹配
# 不一致的部分用 DFS 深度搜索最优解
self.dfs_(chars, 0, [], tkslist)
# 对英文部分:词干化 + 词形还原
res = self.english_normalize_(res)
return self.merge_(res)
|
评分函数(选择最优分词方案):
1
2
3
4
5
6
7
8
| def score_(self, tfts):
B = 30
F, L = 0, 0
for tk, (freq, tag) in tfts:
F += freq # 词频得分
L += 0 if len(tk) < 2 else 1 # 长词奖励
L /= len(tks)
return tks, B / len(tks) + L + F # 综合评分 = 分词数惩罚 + 长词奖励 + 词频
|
B / len(tks):分词数越少得分越高(鼓励长词匹配)L:长词(≥2字符)的比例越高越好F:词频越高的分词方案越好
6.2 细粒度分词
1
2
3
4
5
6
7
8
9
10
11
| def fine_grained_tokenize(self, tks):
"""将粗粒度分词进一步细分,用于提高检索召回率"""
for tk in tks:
if len(tk) < 3:
res.append(tk) # 短词不再细分
continue
# 用 DFS 找到次优分词方案
tkslist = []
self.dfs_(tk, 0, [], tkslist)
stk = self.sortTks_(tkslist)[1][0] # 注意取 [1] 即第二优方案
res.append(" ".join(stk))
|
第一优方案(最优分词)已经用于主分词字段 content_ltks。细粒度分词取第二优方案,可以生成不同的词组合,存入 content_sm_ltks 字段,增加检索时的命中机会。
第七章 对话管理
7.1 对话服务——chat.py
chat.py 是对话生成的核心模块,负责:构造 Prompt、调用 LLM、流式输出、生成推荐问题、持久化对话记录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| def get_chat_completion(session_id, question, retrieved_content, user_id):
# 第一步:获取快速解析的文档内容(来自 Redis)
quick_parse_content = get_quick_parse_content(session_id)
# 第二步:构建参考内容
reference_parts = []
if retrieved_content:
for ref in retrieved_content:
reference_parts.append(f"[{id}] {ref['content_with_weight']}")
if quick_parse_content:
truncated_content = quick_parse_content[:4000]
reference_parts.append(f"**当前会话文档内容:**\n[{id}] {truncated_content}")
# 第三步:构造 Prompt
prompt = f"""
你是一个专业的智能助手...
**参考内容:**
{formatted_references}
**用户问题:**
{question}
"""
# 第四步:调用 DeepSeek-R1 流式生成
client = OpenAI(api_key=..., base_url=...)
completion = client.chat.completions.create(
model="deepseek-r1",
messages=[{"role": "user", "content": prompt}],
stream=True,
)
# 第五步:先发送文档引用信息
yield f"event: message\ndata: {json.dumps({'documents': all_documents})}\n\n"
# 第六步:流式输出内容
for chunk in completion:
if chunk.choices[0].finish_reason == "stop":
# 生成推荐问题
recommended_questions = generate_recommended_questions(question, ...)
yield f"event: message\ndata: {json.dumps({'recommended_questions': ...})}\n\n"
yield "event: end\ndata: [DONE]\n\n"
# 持久化到数据库
write_chat_to_db(session_id, question, model_answer, ...)
update_session_name(session_id, question, user_id)
else:
delta = chunk.choices[0].delta
if delta.content:
# 正常回答内容
yield f"event: message\ndata: {json.dumps({'content': delta.content, 'thinking': False})}\n\n"
else:
# DeepSeek-R1 的思考过程
yield f"event: message\ndata: {json.dumps({'content': delta.reasoning_content, 'thinking': True})}\n\n"
|
7.2 SSE 协议格式
Server-Sent Events(SSE)是一种服务器向客户端推送数据的协议,基于 HTTP,比 WebSocket 更轻量。
本项目 SSE 消息格式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| # 第一条:发送文档引用
event: message
data: {"documents": [{"document_id": "...", "document_name": "...", "content_with_weight": "..."}]}
# 中间多条:流式输出内容
event: message
data: {"role": "assistant", "content": "国电电力", "thinking": false}
event: message
data: {"role": "assistant", "content": "的思考过程...", "thinking": true}
# 推荐问题
event: message
data: {"recommended_questions": ["问题1", "问题2", "问题3"]}
# 结束标志
event: end
data: [DONE]
|
7.3 引用标注机制
系统在 Prompt 中要求 LLM 使用 ##编号$$ 格式标注引用来源,前端解析后可以高亮显示引用的文档段落。
Prompt 中的关键指令:
1
2
| 每一块内容都必须标注引用的来源,格式为:##引用编号$$。
例如:##1$$ 表示引用自第1条参考内容。
|
7.4 推荐问题生成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| def generate_recommended_questions(user_question, retrieved_content=None, session_id=None):
"""使用 Qwen2.5-7B 生成 3 个推荐问题"""
prompt = f"""
用户问题:{user_question}
当前对话基于这些文档:{', '.join(document_topics)}
生成3个相关的推荐问题...
输出格式:{{"recommended_questions": ["问题1", "问题2", "问题3"]}}
"""
client = OpenAI(api_key=..., base_url=...)
completion = client.chat.completions.create(
model="qwen2.5-7b-instruct",
response_format={"type": "json_object"},
stream=False,
timeout=30,
)
|
- 主对话用 DeepSeek-R1(强推理能力,支持思维链)
- 推荐问题用 Qwen2.5-7B(轻量快速,JSON 输出稳定)
- 会话命名用 Qwen2.5-72B(语言理解好,生成简洁标题)
7.5 会话管理
1
2
3
4
5
6
7
8
9
10
11
12
13
| def update_session_name(session_id: str, question: str, user_id: str):
"""首次对话时,根据用户问题自动生成会话名称"""
query_result = db.execute(
text("SELECT session_name FROM sessions WHERE session_id = :session_id"),
{"session_id": session_id}
).fetchone()
if not query_result: # 首次对话
session_name = generate_session_name(question)
db.execute(text("""
INSERT INTO sessions (session_id, user_id, session_name)
VALUES (:session_id, :user_id, :session_name)
"""), {...})
|
第八章 前端 React 架构
8.1 技术栈概览
前端采用 React + TypeScript + Vite 构建,使用 Ant Design 作为 UI 组件库,Valtio 作为状态管理方案。
| 技术 | 作用 |
|---|
| React 18 | UI 框架 |
| TypeScript | 类型安全 |
| Vite | 构建工具 |
| Ant Design | UI 组件库 |
| Valtio (proxy) | 响应式状态管理 |
| ahooks | React Hooks 工具库 |
| Axios | HTTP 请求 |
8.2 聊天页面组件结构
1
2
3
4
5
6
7
8
9
10
11
| ComPageLayout(页面布局)
├── sender(底部输入区域)
│ ├── Source(文档来源展示)
│ └── ComSender(消息输入框)
├── right(右侧面板)
│ ├── ChatDrawer > Citations(引文面板)
│ └── ChatDrawer > Contracts(文档面板)
└── children(主内容区)
├── Header(对话标题)
├── ChatMessage(消息列表)
└── Drawer(文档详情抽屉)
|
8.3 状态管理——Valtio
1
2
3
4
5
6
| const [chat] = useState(() => {
return proxy({
list: [] as API.ChatItem[],
})
})
const { list } = useSnapshot(chat) as { list: API.ChatItem[] }
|
Valtio 是一个基于 Proxy 的状态管理库。proxy() 创建可变的响应式对象,useSnapshot() 创建不可变的快照用于渲染。直接修改 chat.list(如 push、赋值属性)会自动触发组件重渲染。
在流式输出场景中,每收到一个 token 就需要更新状态。Valtio 允许直接修改对象属性(target.content += delta.content),比 Redux 的 action/reducer 模式更简洁高效。
8.4 SSE 流式数据处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| const sendChat = useCallback(async (target: API.ChatItem, message: string) => {
// 发起 SSE 请求
const res = await api.session.chat({ id: id!, message })
const reader = res.data.getReader()
async function read(reader: ReadableStreamDefaultReader<any>) {
let temp = ''
const decoder = new TextDecoder('utf-8')
while (true) {
const { value, done } = await reader.read()
temp += decoder.decode(value)
// 按行解析 SSE 数据
while (true) {
const index = temp.indexOf('\n')
if (index === -1) break
const slice = temp.slice(0, index)
temp = temp.slice(index + 1)
if (slice.startsWith('data: ')) {
parseData(slice) // 解析 data 行
scrollToBottom() // 自动滚动
}
}
if (done) break
}
}
function parseData(slice: string) {
const str = slice.trim().replace(/^data\: /, '').trim()
if (str === '[DONE]') return
const json = JSON.parse(str)
// 区分思考内容和回答内容
if (json?.content) {
if (json.thinking) {
target.think = `${target.think || ''}${json.content || ''}`
} else {
target.content = `${target.content || ''}${json.content || ''}`
}
}
// 处理文档引用
if (json?.documents?.length) {
target.reference = json.documents
// 去重,构建文档列表
const map = new Map<string, API.Document>()
json.documents.forEach((chunk: API.Reference) => {
map.set(chunk.document_id, { ... })
})
target.documents = Array.from(map.values())
}
// 处理推荐问题
if (json?.recommended_questions?.length) {
target.recommended_questions = json.recommended_questions
}
}
}, [chat])
|
8.5 API 层——session.ts
session.ts 封装了所有与后端对话相关的 API 调用,使用 Axios 作为 HTTP 客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // 创建会话
export function create(params?: {}) {
return request.post<API.Result<{ session_id: string }>>('/create_session', params)
}
// 发起对话(SSE 流式)
export function chat(params: { id: string; message: string }) {
return request.post<ReadableStream>('/chat_on_docs', { message: params.message }, {
headers: { Accept: 'text/event-stream' },
responseType: 'stream',
adapter: 'fetch', // 使用 fetch adapter 以支持 ReadableStream
params: { session_id: params.id },
})
}
// 快速解析文档
export function quickParse(params: { session_id: string; file: File }) {
const formData = new FormData()
formData.append('file', params.file)
return request.post('/quick_parse', formData, {
params: { session_id: params.session_id },
})
}
|
adapter: 'fetch' 告诉 Axios 使用浏览器原生 Fetch API 而非 XMLHttpRequest,因为只有 Fetch API 支持 ReadableStream,这是实现前端 SSE 流式读取的必要条件。
第九章 系统集成与完整链路
9.1 文档入库完整链路
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| 用户上传文件
│
▼ POST /upload_files
chat_rt.py: upload_files()
│
├── 保存文件到本地 storage/file/{session_id}/
│
├── file_parse.py: execute_insert_process()
│ │
│ ├── naive.py: chunk() ← 文档解析 + 分块
│ │ ├── pdf_parser.py ← PDF: OCR + 布局识别
│ │ ├── Docx() ← DOCX 解析
│ │ └── ExcelParser() ← Excel 解析
│ │
│ ├── rag_tokenizer.tokenize() ← 对每个 chunk 分词
│ ├── generate_embedding() ← 生成向量 (1024维)
│ └── ESConnection.insert() ← 写入 Elasticsearch
│
└── insert_knowledgebase() ← 记录到 PostgreSQL
|
9.2 问答检索完整链路
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| 用户提问
│
▼ POST /chat_on_docs
chat_rt.py
│
├── retrieve_content(user_id, question)
│ │
│ ▼ Dealer.retrieval()
│ ├── FulltextQueryer.question() ← 问题分词,构建查询
│ ├── generate_embedding(question) ← 问题向量化
│ ├── ES search (全文 + 向量) ← 混合检索
│ ├── rerank_by_model() ← Rerank 精排
│ └── 返回 Top-5 chunks
│
└── get_chat_completion(session_id, question, references, user_id)
│
├── get_quick_parse_content() ← 检查 Redis 快速解析内容
├── 构造 Prompt (参考内容 + 用户问题)
├── DeepSeek-R1 流式生成 ← LLM 推理
├── SSE 流式输出 ← 实时返回前端
├── generate_recommended_questions() ← Qwen2.5-7B 生成推荐问题
├── write_chat_to_db() ← 持久化到 PostgreSQL
└── update_session_name() ← 首次对话自动命名
|
9.3 双通道文档接入
系统同时支持两种文档接入方式,在 get_chat_completion 中合并:
- 知识库通道:文档 -> 解析 -> 分块 -> 向量化 -> ES 存储 -> 检索
- 快速解析通道:小文档 -> 全文 -> Redis 缓存 -> 直接放入 Prompt
两个通道的内容在 Prompt 中以编号标注合并,LLM 统一引用。
第十章 小测验
1. 本项目使用哪个 Python Web 框架?它相比 Flask 的主要优势是什么?
2. docker-compose.yml 中 depends_on 的作用是什么?它能保证依赖服务完全就绪吗?
3. 在 retrieval.py 中,vector_similarity_weight=0.6 意味着什么?关键词检索权重是多少?
4. RagTokenizer 的 tokenize() 方法使用了哪两种匹配算法?当两种算法结果不一致时如何处理?
5. fine_grained_tokenize() 为什么要取排序后的第二优分词方案而非最优方案?
6. DeepDoc PDF 解析的 5 步流水线是什么?每一步的作用分别是什么?
7. _updown_concat_features() 使用什么机器学习模型来判断文本块是否应该合并?为什么不用简单的规则?
8. SSE(Server-Sent Events)和 WebSocket 的区别是什么?本项目为什么选择 SSE?
9. 在 chat.py 的 Prompt 中,引用标注格式 ##编号$$ 的设计目的是什么?
10. 快速解析服务(QuickParseService)和知识库解析的适用场景分别是什么?它们的存储介质有何不同?
11. 前端使用 adapter: 'fetch' 的原因是什么?如果改为默认的 XMLHttpRequest 会怎样?
12. 在 Rerank 阶段,为什么标题的权重是 x2、关键词的权重是 x5、问答对的问题权重是 x6?
13. generate_recommended_questions() 使用 Qwen2.5-7B 而非 DeepSeek-R1 的原因是什么?
14. Dealer.search() 中 FusionExpr("weighted_sum", topk, {"weights": "0.05, 0.95"}) 的 0.05 和 0.95 分别代表什么?
15. 前端使用 Valtio 的 proxy() 和 useSnapshot() 分别起什么作用?为什么在流式输出场景中这比 Redux 更合适?
第十一章 思维导图结构建议
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| RAG 全栈项目实战
├── 1. 项目架构
│ ├── 前后端分离
│ ├── 技术选型(FastAPI / React / ES / Redis / PG)
│ └── 数据流全链路
├── 2. 基础设施
│ ├── Docker Compose 编排
│ ├── PostgreSQL 数据模型
│ ├── Elasticsearch 配置
│ └── Redis 缓存
├── 3. 后端架构
│ ├── 路由层(chat_rt / user_rt / history_rt)
│ ├── 服务层(chat / retrieval / file_parse)
│ ├── JWT 认证
│ └── SSE 流式响应
├── 4. 文档解析
│ ├── 多格式支持(PDF / DOCX / Excel / TXT / MD)
│ ├── DeepDoc 引擎
│ │ ├── OCR
│ │ ├── 布局识别
│ │ ├── 表格结构识别
│ │ └── XGBoost 上下文拼接
│ ├── Naive 分块策略
│ └── 快速解析(Redis 通道)
├── 5. RAG 检索
│ ├── 混合检索(全文 + 向量)
│ ├── Dealer 类
│ │ ├── search()
│ │ ├── rerank()
│ │ └── rerank_by_model()
│ ├── 两层权重融合
│ └── Rerank 权重加成
├── 6. NLP 处理
│ ├── RagTokenizer
│ │ ├── Trie 树词典
│ │ ├── 双向最大匹配
│ │ ├── DFS 最优分词
│ │ └── 评分函数
│ ├── 细粒度分词
│ └── 英文处理(Stemmer + Lemmatizer)
├── 7. 对话管理
│ ├── Prompt 构造(知识库 + 快速解析合并)
│ ├── LLM 调用(DeepSeek-R1 流式)
│ ├── 引用标注(##编号$$)
│ ├── 推荐问题(Qwen2.5-7B)
│ ├── 会话命名(Qwen2.5-72B)
│ └── 持久化(PostgreSQL)
└── 8. 前端架构
├── React + TypeScript + Vite
├── Valtio 状态管理
├── SSE 流式解析
├── 组件结构(ChatMessage / Citations / Source)
└── API 层(Axios + Fetch adapter)
|
- 先跑起来:使用
docker-compose up 启动全部服务,上传一份文档并进行问答 - 断点调试:在
retrieve_content() 和 get_chat_completion() 中加日志,观察检索和生成过程 - 对比实验:调整
vector_similarity_weight 参数(如 0.3 vs 0.9),观察检索结果的变化 - 阅读原理:配合目录中的《RAG综述.pdf》理解 RAG 范式的理论基础