In this example, we will show how to build a RAG (Retrieval-Augmented Generation) program with AWEL.
Start by creating a Python file named first_rag_with_awel.py.
In this tutorial, we load knowledge from a URL and store it in a vector store.
First, create an environment and install the required dependencies. Note that the dbgpt SDK package should not be installed in the virtual environment that runs the DB-GPT service itself; it is best to create a new virtual environment, such as dbgpt_use:
conda create -n dbgpt_use python=3.10
conda activate dbgpt_use
pip install dbgpt
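You can quickly verify that the SDK is importable in the new environment. This is only a minimal sanity check of the installation, nothing more:
# Minimal sanity check: confirm the dbgpt SDK can be imported in this environment.
import dbgpt
print("dbgpt imported successfully")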
Prepare the embedding model
To turn knowledge into vectors, we need an embedding model. DB-GPT supports many embedding models, such as the OpenAI Embedding proxy model and the local text2vec model; once a cluster deployment is up, a model can also be used through its API. Below are usage examples for these approaches.
Three ways to use embedding models
OpenAI Embedding API
from dbgpt.rag.embedding import DefaultEmbeddingFactory
embeddings = DefaultEmbeddingFactory.openai()
Local embedding model
from dbgpt.rag.embedding import DefaultEmbeddingFactory
embeddings = DefaultEmbeddingFactory.default("/data/models/text2vec-large-chinese")
API call under a cluster deployment
from dbgpt.rag.embedding import DefaultEmbeddingFactory
embeddings = DefaultEmbeddingFactory.remote(
    api_url="http://localhost:8100/api/v1/embeddings",
    api_key="{your_api_key}",
    model_name="text2vec"
)
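Whichever approach you pick, you can exercise the embeddings object directly before wiring it into a DAG. The sketch below assumes the returned object exposes the usual embed_query method that maps a text to a list of floats; treat it as an optional check rather than part of the tutorial flow:
# Assumes the embeddings object exposes embed_query(text) -> list of floats.
vector = embeddings.embed_query("hello world")
print(f"Embedding dimension: {len(vector)}")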
In this example, we use the local embedding model for the following steps.
Load knowledge and store it in the vector store
The following DAG program loads data from a URL and stores it in the vector store.
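# 1. Create the embedding model (a local text2vec model here)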
from dbgpt.rag.embedding import DefaultEmbeddingFactory
embeddings = DefaultEmbeddingFactory.default("DB-GPT/models/text2vec-large-chinese")
import asyncio
import shutil
from dbgpt.core.awel import DAG
from dbgpt.rag import ChunkParameters
from dbgpt.rag.knowledge import KnowledgeType
from dbgpt.rag.operators import EmbeddingAssemblerOperator, KnowledgeOperator
from dbgpt.storage.vector_store.chroma_store import ChromaVectorConfig
from dbgpt.storage.vector_store.connector import VectorStoreConnector
# Delete old vector store directory(/tmp/awel_rag_test_vector_store)
shutil.rmtree("/tmp/awel_rag_test_vector_store", ignore_errors=True)
# 2. Init the vector store connector
vector_connector = VectorStoreConnector.from_default(
    "Chroma",
    vector_store_config=ChromaVectorConfig(
        name="test_vstore",
        persist_path="/tmp/awel_rag_test_vector_store",
    ),
    embedding_fn=embeddings
)
# 3. Write the AWEL assembler task
with DAG("load_knowledge_dag") as knowledge_dag:
    # Load knowledge from URL
    knowledge_task = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
    assembler_task = EmbeddingAssemblerOperator(
        vector_store_connector=vector_connector,
        chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE")
    )
    knowledge_task >> assembler_task

chunks = asyncio.run(assembler_task.call("https://docs.dbgpt.site/docs/latest/awel/"))
print(f"Chunk length: {len(chunks)}")
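After this DAG runs, the document chunks have been embedded and written into the Chroma store. As an optional sanity check (the path below is simply the persist_path configured above), you can confirm that the persistence directory now exists on disk:
import os

# The Chroma store was configured with persist_path="/tmp/awel_rag_test_vector_store".
print(os.path.exists("/tmp/awel_rag_test_vector_store"))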
Retrieve knowledge from the vector store
You can retrieve the relevant knowledge from the vector store:
from dbgpt.core.awel import MapOperator
from dbgpt.rag.operators import EmbeddingRetrieverOperator
with DAG("retriever_dag") as retriever_dag:
    retriever_task = EmbeddingRetrieverOperator(
        top_k=3,
        vector_store_connector=vector_connector,
    )
    content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))
    retriever_task >> content_task

chunks = asyncio.run(content_task.call("What is the AWEL?"))
print(chunks)
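If you want to inspect each retrieved chunk before they are joined into a single string, you can also call the retriever operator on its own. This is just a small variation of the DAG above; the name retriever_debug_dag is an illustrative choice, and only objects already defined in this tutorial are used:
with DAG("retriever_debug_dag") as retriever_debug_dag:
    debug_retriever_task = EmbeddingRetrieverOperator(
        top_k=3,
        vector_store_connector=vector_connector,
    )

retrieved_chunks = asyncio.run(debug_retriever_task.call("What is the AWEL?"))
for i, chunk in enumerate(retrieved_chunks):
    # Each retrieved chunk exposes its text via the content attribute.
    print(f"--- chunk {i} ---")
    print(chunk.content)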
Prepare the LLM
There are several ways to call an LLM to summarize an answer; here we use the OpenAI API:
pip install openai
export OPENAI_API_KEY=sk-xx
export OPENAI_API_BASE=https://xx:80/v1
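If you prefer to configure the credentials inside the Python script rather than in the shell, you can set the same environment variables before creating the client. The values below are the same placeholders as above and must be replaced with your own key and endpoint:
import os

# Equivalent to the shell exports above; replace the placeholder values.
os.environ["OPENAI_API_KEY"] = "sk-xx"
os.environ["OPENAI_API_BASE"] = "https://xx:80/v1"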
Use the proxy model client to connect to the OpenAI API:
from dbgpt.model.proxy import OpenAILLMClient
llm_client = OpenAILLMClient()
Create the RAG program
Finally, we create a RAG program for intelligent question answering.
from dbgpt.core.awel import InputOperator, JoinOperator, InputSource
from dbgpt.core.operators import PromptBuilderOperator, RequestBuilderOperator
from dbgpt.model.operators import LLMOperator
prompt = """Based on the known information below, provide users with professional and concise answers to their questions.
If the answer cannot be obtained from the provided content, please say:
"The information provided in the knowledge base is not sufficient to answer this question.".
It is forbidden to make up information randomly. When answering, it is best to summarize according to points 1.2.3.
known information:
{context}
question:
{question}
"""
with DAG("llm_rag_dag") as rag_dag:
    input_task = InputOperator(input_source=InputSource.from_callable())
    retriever_task = EmbeddingRetrieverOperator(
        top_k=3,
        vector_store_connector=vector_connector,
    )
    content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))
    merge_task = JoinOperator(lambda context, question: {"context": context, "question": question})
    prompt_task = PromptBuilderOperator(prompt)
    # The model is gpt-3.5-turbo, you can replace it with other models.
    req_build_task = RequestBuilderOperator(model="gpt-3.5-turbo")
    llm_task = LLMOperator(llm_client=llm_client)
    result_task = MapOperator(lambda r: r.text)

    input_task >> retriever_task >> content_task >> merge_task
    input_task >> merge_task
    merge_task >> prompt_task >> req_build_task >> llm_task >> result_task

print(asyncio.run(result_task.call("What is the AWEL?")))
The output is as follows:
AWEL stands for Agentic Workflow Expression Language. It is a set of intelligent agent workflow expression language specially designed for large model application development. AWEL provides functionality and flexibility for development of business logic for LLMs applications without the need to focus on cumbersome model and environment details. It adopts a layered API design, consisting of the operator layer, AgentFrame layer, and DSL layer. The operator layer includes basic operation atoms in the LLM application development process such as retrieval, vectorization, model interaction, and prompt processing.
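The assembled DAG can be reused for further questions by invoking the final operator again, assuming the DAG may be triggered repeatedly with call() as in the examples above; the question string below is only illustrative:
# Reuse the assembled RAG DAG for another (illustrative) question.
print(asyncio.run(result_task.call("What are the layers of the AWEL API design?")))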
Congratulations! You have now walked through the entire process of developing a RAG program with the dbgpt SDK.
Complete code
The complete code of the program is as follows:
import asyncio
import shutil
from dbgpt.core.awel import DAG, MapOperator, InputOperator, JoinOperator, InputSource
from dbgpt.core.operators import PromptBuilderOperator, RequestBuilderOperator
from dbgpt.rag import ChunkParameters
from dbgpt.rag.knowledge import KnowledgeType
from dbgpt.rag.operators import EmbeddingAssemblerOperator, KnowledgeOperator, EmbeddingRetrieverOperator
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt.storage.vector_store.chroma_store import ChromaVectorConfig
from dbgpt.storage.vector_store.connector import VectorStoreConnector
from dbgpt.model.operators import LLMOperator
from dbgpt.model.proxy import OpenAILLMClient
# Here we use the openai embedding model, if you want to use other models, you can
# replace it according to the previous example.
embeddings = DefaultEmbeddingFactory.openai()
# Here we use the openai LLM model, if you want to use other models, you can replace
# it according to the previous example.
llm_client = OpenAILLMClient()
# Delete old vector store directory(/tmp/awel_rag_test_vector_store)
shutil.rmtree("/tmp/awel_rag_test_vector_store", ignore_errors=True)
vector_connector = VectorStoreConnector.from_default(
    "Chroma",
    vector_store_config=ChromaVectorConfig(
        name="test_vstore",
        persist_path="/tmp/awel_rag_test_vector_store",
    ),
    embedding_fn=embeddings
)
with DAG("load_knowledge_dag") as knowledge_dag:
    # Load knowledge from URL
    knowledge_task = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
    assembler_task = EmbeddingAssemblerOperator(
        vector_store_connector=vector_connector,
        chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE")
    )
    knowledge_task >> assembler_task

chunks = asyncio.run(assembler_task.call("https://docs.dbgpt.site/docs/latest/awel/"))
print(f"Chunk length: {len(chunks)}\n")
prompt = """Based on the known information below, provide users with professional and concise answers to their questions.
If the answer cannot be obtained from the provided content, please say:
"The information provided in the knowledge base is not sufficient to answer this question.".
It is forbidden to make up information randomly. When answering, it is best to summarize according to points 1.2.3.
known information:
{context}
question:
{question}
"""
with DAG("llm_rag_dag") as rag_dag:
    input_task = InputOperator(input_source=InputSource.from_callable())
    retriever_task = EmbeddingRetrieverOperator(
        top_k=3,
        vector_store_connector=vector_connector,
    )
    content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))
    merge_task = JoinOperator(lambda context, question: {"context": context, "question": question})
    prompt_task = PromptBuilderOperator(prompt)
    # The model is gpt-3.5-turbo, you can replace it with other models.
    req_build_task = RequestBuilderOperator(model="gpt-3.5-turbo")
    llm_task = LLMOperator(llm_client=llm_client)
    result_task = MapOperator(lambda r: r.text)

    input_task >> retriever_task >> content_task >> merge_task
    input_task >> merge_task
    merge_task >> prompt_task >> req_build_task >> llm_task >> result_task

print(asyncio.run(result_task.call("What is the AWEL?")))
Visualize the DAGs
You can visualize the DAGs with the following code:
knowledge_dag.visualize_dag()
rag_dag.visualize_dag()
If you run the code in a Jupyter Notebook, you can view the DAGs directly in the notebook:
display(knowledge_dag.show())
display(rag_dag.show())
Visualization of knowledge_dag (figure).
Visualization of rag_dag (figure).