aiWithDoc中间层部分是采用Python语言实现,为什么需要弄这么一个中间层呢?因为和AI相关的组件及第三方库基本都是python或者NodeJS提供的,当然也能用java写这些逻辑,但是代价会非常大。本python代码包括文档的切片、向量化、向量数据库存储、调用OpenAI等等部分。用到的第三方库包括Flask,LangChain,ChromaDB等等
本文档的作用只是起到抛砖引玉的作用,供广大爱好者或者相关行业工作者学习或借鉴。
D:\sosWorkspaceGit\aiwithdoc-middle
├── app.py #程序主入口,FastAPI启动web server
├── chainApi.py #与文档对话的接口
├── db #向量数据库文件
| └── index
├── debug.log #debug日志
├── model.py #模型
├── qiqiqichain #封装调用LongChain的组件库
| ├── chunkers #各种类型文档的切分方式
| ├── config #组件的配置
| ├── data_formatter #组件封装的入口
| ├── loaders #各种文档的loader
| ├── qiqiqichain.py #组件的主入口
| ├── utils.py #util组件
| ├── vectordb #向量数据库
| ├── version.py #版本信息
| └── __init__.py
├── README.md
├── requirements.txt #允许此中间层需要安装的python包
└── uploads #各种文档的原始文件存放文件夹
这些时间,随着ChatGPT的爆火,各种基于大语言模型的应用如何雨后春笋般出现,同时也诞生了一些非常优秀的组件,这其中就包括LangChain以及LLamaIndex等等组件。通过LangChain可以让很多调用LLM的应用更简单且更规范,LangChain功能非常强大,建议各位爱好者抽时间学习学习。本项目只是用到了LangChain的很小一部分功能,同时基于网上的各种开源代码,封装了qiqiqichain这个LongChain调用的组件。
未来,企起期团队还会不断完善qiqiqichain这个组件,让后续关于AI大模型的使用更简便。本项目中,封装了对pdf,word,txt,md文件的处理,不同文档格式需要调用不同的python第三方包。说简单点,LangChain只是把这些调用封装了,而我们又把LangChain的一些功能封装了。对于文档的切片也是调用各自第三方提供的相关转文本的组件进行切片。
以下通过文档上传和文档对话的代码进行大致讲解。
由于中间层python这边仅仅是处理与LLM相关的AI部分,因此中间层需要暴露最起码二个接口,一个是为了保存文件到中间层,另一个是文档训练及文档对话。Python包含很多非常优秀的第三方组件提供web服务,本例采用的是fastapi+uvicorn。FastAPI是一个基于Python的现代、快速(高性能)的Web框架,用于构建Web应用程序和API,uvicorn是一个基于ASGI(异步服务器网关接口)的Python Web服务器,用于运行ASGI应用程序。它是一个轻量级、高性能的服务器,适用于处理高并发的Web请求。
代码如下:
import os
from fastapi import FastAPI, Request, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import api
import chainApi
from model import Message, MessageTurbo, ChatWithDocMessage
import json
app = FastAPI()
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FILE_SIZE_LIMIT = 15 * 1024 * 1024 # 15MB
ALLOWED_EXTENSIONS = ['txt', 'markdown', 'md', 'pdf', 'html', 'htm', 'xlsx', 'docx']
async def catch_exceptions_middleware(request: Request, call_next):
try:
return await call_next(request)
except Exception as exc:
return JSONResponse(content={"code": 500, "error": {"message": f"{type(exc)} {exc}"}})
app.middleware('http')(catch_exceptions_middleware)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/upload")
async def fileUpload(request: Request,file: UploadFile = File(...)):
# check file
if not file:
raise Exception("没有发现文件")
#if len(file) > 1:
# raise Exception("只能处理一个文件")
file_content = await file.read()
file_size = len(file_content)
if file_size > FILE_SIZE_LIMIT:
raise Exception("文件大小超过了15M,请上传小于15M的文件")
extension = file.filename.split('.')[-1]
if extension not in ALLOWED_EXTENSIONS:
raise Exception("不支持该文件格式!")
# save file to storage
file_path = f"{BASE_DIR}/uploads/{file.filename}" # 保存文件的路径,可以根据需要修改
with open(file_path, "wb") as f:
f.write(file_content)
#api_key = request.headers.get('api_key')
#res = await api.completions_turbo(message, api_key=api_key)
return {"code":"200","filename": file.filename}
if __name__ == '__main__':
import uvicorn
uvicorn.run("app:app", host="127.0.0.1", port=8123, reload=True)
以上代码首先导入了所需的模块和库,包括os、FastAPI、Request、File、UploadFile等,然后,创建了一个FastAPI应用程序实例app。
定义了一些常量,包括BASE_DIR(当前文件所在目录的绝对路径)、FILE_SIZE_LIMIT(文件大小限制为15MB)、ALLOWED_EXTENSIONS(允许上传的文件格式列表)。
接下来,定义了一个中间件函数catch_exceptions_middleware,用于捕获应用程序中的异常并返回JSON格式的错误响应。将该中间件函数添加到应用程序的中间件列表中,添加了一个CORS中间件,用于处理跨域请求。
定义了一个post("/upload")的url用来处理文件上传,用于处理文件上传请求。该函数接收一个Request对象和一个UploadFile对象作为参数。在函数内部,首先检查是否存在文件,如果不存在则抛出异常;然后,读取文件内容并获取文件大小,如果文件大小超过了限制,则抛出异常;接着,获取文件的扩展名,并检查是否在允许的文件格式列表中,如果不在则抛出异常,将文件保存到指定的路径;最后,返回一个JSON响应,包含上传成功的消息和文件名。
最后,通过调用uvicorn.run函数来运行应用程序,监听本地主机的8123端口,并启用自动重载。
通过封装的qiqiqichain进行文档训练和对话,文档的切片及向量化的逻辑都在qiqiqichain组件里面。代码如下:
import os
from qiqiqichain import App,OpenSourceApp
from qiqiqichain.config import InitConfig,ChatConfig
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
os.environ["OPENAI_API_KEY"] = "sk-apiKEY"
API_KEY = os.environ["OPENAI_API_KEY"]
if proxy := os.environ.get('HTTPS_PROXY'):
PROXIES = {"https://": proxy}
else:
PROXIES = None
chat_bot = {}
async def chatWithDoc(message):
try:
print(message)
fileName = message.dict().get('fileName')
print(f'filename={fileName}')
bot = None
if fileName in chat_bot:
print(f'已经存在机器人')
bot = chat_bot.get(fileName)
else:
print(f'不存在该机器人,初始化一个机器人')
file_path = f"{BASE_DIR}/uploads/{fileName}" # 文件的路径,可以根据需要修改
bot = App()
suffix = message.dict().get('suffix')
if suffix=='doc' or suffix=='docx':
bot.add_local('docx',file_path)
if suffix=='pdf':
bot.add_local('pdf_file',file_path)
if suffix=='txt':
bot.add_local('text',file_path)
if suffix=='md':
bot.add_local('text',file_path)
chat_bot[fileName] = bot
print(f'文档向量化已经完成')
chatConfig = ChatConfig(model="gpt-3.5-turbo-16k",temperature=0.8)
rtnMsg = bot.chat(input_query=message.dict().get('msg'),config=chatConfig)
return {"code":"200","message": rtnMsg}
except Exception as ex:
print('发现异常,异常信息如下')
print(ex)
return {"code":"200","message": str(ex)}
1. 从qiqiqichain模块中导入了App,以及从qiqiqichain.config模块中导入了InitConfig和ChatConfig类。
2.定义了一个变量BASE_DIR,用于存储当前文件的目录路径。
3.设置了一个环境变量OPENAI_API_KEY,此处对应openAI的apiKey,如果用其他开源模型则可以不需要openAI的key,但从目前的测试结果看,openAI提供的服务效果最好。
4.使用条件表达式来判断是否存在名为HTTPS_PROXY的环境变量。如果存在,则将其值赋给变量proxy,并将PROXIES设置为一个字典,键为"https://",值为proxy;如果不存在,则将PROXIES设置为None。此处的功能是为了设置代理,这样能进行科学上网。
5.定义了一个空字典chat_bot,用于存储聊天机器人的实例。为了防止重复进行机器人初始化的动作,定义一个全局变量,如果机器人存在则直接get机器人进行服务,为了提供性能。
6.定义了一个异步函数chatWithDoc,该函数接受一个参数message。从message的字典中获取fileName的值。然后,它定义了一个变量bot,并将其初始化为None。接着,它通过判断fileName是否存在于chat_bot字典中来确定是否已经存在一个聊天机器人实例。如果存在,则将bot赋值为chat_bot中对应的实例;如果不存在,则根据文件的后缀名来初始化一个新的聊天机器人实例,并将其添加到chat_bot字典中。接下来根据需要设置聊天配置chatConfig。然后,调用聊天机器人实例的chat方法,传入输入的查询消息和聊天配置,并将返回的结果赋给变量rtnMsg。最后,返回一个字典,包含键"code"和"message",分别对应返回的状态码和消息。如果在执行过程中发生异常,返回一个包含异常信息的字典。java端接收到该消息后会返回给前端进行展示。
通过上述代码可以很方便的进行文档训练和文档对话,但文档训练的逻辑和文档对话的逻辑封装到了qiqiqichain的组件中。以下用pdf文件进行示例简单讲解文档训练和文档对话。
所谓的文档训练实际上是指按照一定的要求对文档进行切片,向量化,然后把向量化的结果存储到对应的向量数据库中。
from langchain.document_loaders import PyPDFLoader
from qiqiqichain.utils import clean_string
class PdfFileLoader:
def load_data(self, url):
"""Load data from a PDF file."""
loader = PyPDFLoader(url)
output = []
pages = loader.load_and_split()
if not len(pages):
raise ValueError("No data found")
for page in pages:
content = page.page_content
content = clean_string(content)
meta_data = page.metadata
meta_data["url"] = url
output.append(
{
"content": content,
"meta_data": meta_data,
}
)
return output
以上代码通过langchain封装的PyPDFLoader加载pdf文件,并通过longchain封装的方法把pdf文件进行切片。切片后的结果放入到output数组中,数组包括该切片对应的文本内容以及对应的元数据。
from typing import Optional
from langchain.text_splitter import RecursiveCharacterTextSplitter
from qiqiqichain.chunkers.base_chunker import BaseChunker
from qiqiqichain.config.AddConfig import ChunkerConfig
TEXT_SPLITTER_CHUNK_PARAMS = {
"chunk_size": 1000,
"chunk_overlap": 0,
"length_function": len,
}
class PdfFileChunker(BaseChunker):
"""Chunker for PDF file."""
def __init__(self, config: Optional[ChunkerConfig] = None):
if config is None:
config = TEXT_SPLITTER_CHUNK_PARAMS
text_splitter = RecursiveCharacterTextSplitter(**config)
super().__init__(text_splitter)
以上代码是定义pdf文件的切片规则,如果参数中不包括自定义规则,则使用默认规则进行设置。默认的规则是,切片大小为1000个字符,chunk_overlap是指滑动窗口大小,为了使上下文匹配更准确。
def create_chunks(self, loader, src):
"""
Loads data and chunks it.
:param loader: The loader which's `load_data` method is used to create
the raw data.
:param src: The data to be handled by the loader. Can be a URL for
remote sources or local content for local loaders.
"""
documents = []
ids = []
idMap = {}
datas = loader.load_data(src)
metadatas = []
for data in datas:
content = data["content"]
meta_data = data["meta_data"]
url = meta_data["url"]
chunks = self.get_chunks(content)
for chunk in chunks:
chunk_id = hashlib.sha256((chunk + url).encode()).hexdigest()
if idMap.get(chunk_id) is None:
idMap[chunk_id] = True
ids.append(chunk_id)
documents.append(chunk)
metadatas.append(meta_data)
return {
"documents": documents,
"ids": ids,
"metadatas": metadatas,
}
以上函数的作用是,对langchain进行的切分,再按照自定义的规则再一次进行切分,返回的结果包装到一个dict变量中。
def _set_embedding_function_to_default(self):
"""
Sets embedding function to default (`text-embedding-ada-002`).
:raises ValueError: If the template is not valid as template should contain
$context and $query
"""
if os.getenv("OPENAI_API_KEY") is None and os.getenv("OPENAI_ORGANIZATION") is None:
raise ValueError("OPENAI_API_KEY or OPENAI_ORGANIZATION environment variables not provided") # noqa:E501
self.ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="text-embedding-ada-002",
)
return
def _set_db_to_default(self):
"""
Sets database to default (`ChromaDb`).
"""
from qiqiqichain.vectordb.chroma_db import ChromaDB
self.db = ChromaDB(ef=self.ef, host=self.host, port=self.port)
_set_embedding_function_to_default代码用来设置默认的embedFunction,默认用OpenAI的"text-embedding-ada-002"进行向量化。
_set_db_to_default用来设置默认的向量数据库,默认的向量数据库用ChromaDB。
所谓的文档对话实际上是把用户提的问题进行向量化,然后用这个向量化的数据到向量数据库中进行匹配,按照设置返回对应的切片数据。再按照一定的格式把问题及对应的文档切片数据作为参数传递给大语言模型(本项目中用的是目前最新的gpt-3.5-turbo-16k模型),OpenAI处理完后,会返回对应的聊天内容,再把这些内容回传给Java端,Java端再吐给前端进行展示。
chatConfig = ChatConfig(model="gpt-3.5-turbo-16k",temperature=0.8)
rtnMsg = bot.chat(input_query=message.dict().get('msg'),config=chatConfig)
return {"code":"200","message": rtnMsg}
以上代码就是封装好的文档对话,核心的逻辑在qiqiqichain组件中的chat函数中。
def chat(self, input_query, config: ChatConfig = None):
if config is None:
config = ChatConfig()
if self.is_code_docs_instance:
config.template = CODE_DOCS_PAGE_PROMPT_TEMPLATE
config.number_documents = 5
contexts = self.retrieve_from_database(input_query, config)
global memory
chat_history = memory.load_memory_variables({})["history"]
if chat_history:
config.set_history(chat_history)
prompt = self.generate_prompt(input_query, contexts, config)
logging.info(f"Prompt: {prompt}")
answer = self.get_answer_from_llm(prompt, config)
memory.chat_memory.add_user_message(input_query)
if isinstance(answer, str):
memory.chat_memory.add_ai_message(answer)
logging.info(f"Answer: {answer}")
return answer
else:
# this is a streamed response and needs to be handled differently.
return self._stream_chat_response(answer)
以上代码定义chat方法,它接受三个参数:self(表示类的实例本身),input_query(表示输入的查询消息),和config(表示聊天配置,类型为ChatConfig,默认值为None)。
1.判断检查config是否为None,如果是,则将其初始化为一个新的ChatConfig实例。
2.判断当前实例是否为代码文档实例(is_code_docs_instance),如果是,则将聊天配置的模板设置为CODE_DOCS_PAGE_PROMPT_TEMPLATE,并将文档数量设置为5。
3.调用retrieve_from_database方法,传入输入的查询消息和聊天配置,从数据库中检索相关的上下文信息,并将结果赋给contexts变量。
4.使用全局变量memory加载聊天历史记录,并将历史记录中的聊天历史设置为聊天配置的历史记录。
5.调用generate_prompt方法,传入输入的查询消息、上下文信息和聊天配置,生成一个提示(prompt)。
6.调用get_answer_from_llm方法,传入生成的提示(prompt)和聊天配置,获取答案。此处会调用默认配置的OpenAI大语言模型进行问题回答。
7.将用户输入的查询消息添加到聊天内存中。
8.代码检查答案是否为字符串类型,如果是,则将答案添加到聊天内存中,并将答案返回。如果答案不是字符串类型,则表示这是一个流式响应,需要以不同的方式处理,代码将调用_stream_chat_response方法来处理流式响应,并将结果返回。
扫码关注不迷路!!!
郑州升龙商业广场B座25层
service@iqiqiqi.cn
联系电话:400-8049-474
联系电话:187-0363-0315
联系电话:199-3777-5101