In this example, we will show how to build a RAG (Retrieval-Augmented Generation) program with AWEL.

First, create a Python file named first_rag_with_awel.py.

In this tutorial, we fetch knowledge from a URL and store it in a vector database.

To begin, create an environment and install the required dependencies. Note that the dbgpt SDK package should not be installed into the virtual environment that is currently running the DB-GPT service. It is best to create a new virtual environment, e.g. dbgpt_use:

```bash
conda create -n dbgpt_use python=3.10
conda activate dbgpt_use
pip install dbgpt
```
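
After installation, you can confirm from the new environment that the SDK is importable. A minimal check using only the standard library:

```python
# Verify the dbgpt SDK is importable and print its installed version.
from importlib.metadata import version

import dbgpt  # raises ImportError if the SDK is missing

print(version("dbgpt"))
```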

Prepare the Embedding Model

To process knowledge into vectors, we need an embedding model. DB-GPT supports many embedding models: proxy models such as OpenAI Embedding, local models such as text2vec, and, after a cluster deployment, models served through an API. Usage examples for each approach follow.

Three ways to use embedding models

OpenAI Embedding API

```python
from dbgpt.rag.embedding import DefaultEmbeddingFactory

embeddings = DefaultEmbeddingFactory.openai()
```

Local embedding model

```python
from dbgpt.rag.embedding import DefaultEmbeddingFactory

embeddings = DefaultEmbeddingFactory.default("/data/models/text2vec-large-chinese")
```

API call in a cluster deployment

```python
from dbgpt.rag.embedding import DefaultEmbeddingFactory

embeddings = DefaultEmbeddingFactory.remote(
    api_url="http://localhost:8100/api/v1/embeddings",
    api_key="{your_api_key}",
    model_name="text2vec",
)
```

In this example, we use the local embedding model.
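
Whichever factory method you choose, the returned embeddings object exposes the usual embedding interface with embed_documents and embed_query. The smoke test below is a minimal sketch that assumes the local model path from the example above; point it at a model you have actually downloaded:

```python
from dbgpt.rag.embedding import DefaultEmbeddingFactory

# Assumed local model path; adjust to wherever text2vec-large-chinese lives on disk.
embeddings = DefaultEmbeddingFactory.default("/data/models/text2vec-large-chinese")

# Embed one document and one query, then confirm the vector dimensions match.
doc_vectors = embeddings.embed_documents(["AWEL is a workflow expression language."])
query_vector = embeddings.embed_query("What is AWEL?")
print(len(doc_vectors[0]), len(query_vector))
```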

Fetch knowledge and store it in the vector database

The following DAG program fetches data from a URL and stores it in the vector database.

```python
import asyncio
import shutil

from dbgpt.core.awel import DAG
from dbgpt.rag import ChunkParameters
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt.rag.knowledge import KnowledgeType
from dbgpt.rag.operators import EmbeddingAssemblerOperator, KnowledgeOperator
from dbgpt.storage.vector_store.chroma_store import ChromaVectorConfig
from dbgpt.storage.vector_store.connector import VectorStoreConnector

# 1. Create the local embedding function
embeddings = DefaultEmbeddingFactory.default("DB-GPT/models/text2vec-large-chinese")

# Delete the old vector store directory (/tmp/awel_rag_test_vector_store)
shutil.rmtree("/tmp/awel_rag_test_vector_store", ignore_errors=True)

# 2. Initialize the vector store connector
vector_connector = VectorStoreConnector.from_default(
    "Chroma",
    vector_store_config=ChromaVectorConfig(
        name="test_vstore",
        persist_path="/tmp/awel_rag_test_vector_store",
    ),
    embedding_fn=embeddings,
)

# 3. Write the AWEL assembler DAG
with DAG("load_knowledge_dag") as knowledge_dag:
    # Load knowledge from a URL
    knowledge_task = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
    assembler_task = EmbeddingAssemblerOperator(
        vector_store_connector=vector_connector,
        chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE"),
    )
    knowledge_task >> assembler_task

chunks = asyncio.run(assembler_task.call("https://docs.dbgpt.site/docs/latest/awel/"))
print(f"Chunk length: {len(chunks)}")
```
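
The same DAG shape can load sources other than URLs. As a sketch, KnowledgeType also defines a DOCUMENT type for local files; the file path below is hypothetical, and the rest mirrors the DAG above, reusing the vector_connector already created:

```python
with DAG("load_local_knowledge_dag") as local_knowledge_dag:
    # DOCUMENT reads a local file instead of fetching a URL.
    local_knowledge_task = KnowledgeOperator(knowledge_type=KnowledgeType.DOCUMENT.name)
    local_assembler_task = EmbeddingAssemblerOperator(
        vector_store_connector=vector_connector,
        chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE"),
    )
    local_knowledge_task >> local_assembler_task

# Hypothetical path to a local Markdown file.
local_chunks = asyncio.run(local_assembler_task.call("./docs/awel.md"))
print(f"Chunk length: {len(local_chunks)}")
```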

Retrieve knowledge from the vector store

You can retrieve the relevant knowledge from the vector database.

```python
from dbgpt.core.awel import MapOperator
from dbgpt.rag.operators import EmbeddingRetrieverOperator

with DAG("retriever_dag") as retriever_dag:
    retriever_task = EmbeddingRetrieverOperator(
        top_k=3,
        vector_store_connector=vector_connector,
    )
    content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))
    retriever_task >> content_task

chunks = asyncio.run(content_task.call("What is the AWEL?"))
print(chunks)
```
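
Besides top_k, retrieval can often be tightened with a similarity cutoff. The sketch below assumes EmbeddingRetrieverOperator accepts a score_threshold parameter, as recent DB-GPT releases do; verify the name against your installed version:

```python
with DAG("retriever_threshold_dag") as threshold_dag:
    # score_threshold is an assumption: drop chunks scoring below 0.3.
    threshold_retriever_task = EmbeddingRetrieverOperator(
        top_k=3,
        score_threshold=0.3,
        vector_store_connector=vector_connector,
    )
    threshold_content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))
    threshold_retriever_task >> threshold_content_task

print(asyncio.run(threshold_content_task.call("What is the AWEL?")))
```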

Prepare the LLM

There are several ways to call an LLM for summarization; here we use the OpenAI API.

```bash
pip install openai
export OPENAI_API_KEY=sk-xx
export OPENAI_API_BASE=https://xx:80/v1
```

Use the proxy model client to connect to the OpenAI API:

```python
from dbgpt.model.proxy import OpenAILLMClient

llm_client = OpenAILLMClient()
```
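
With no arguments, OpenAILLMClient picks up the OPENAI_API_KEY and OPENAI_API_BASE environment variables set above. If you prefer explicit configuration, the credentials can also be passed to the constructor; the keyword names below are assumptions based on recent DB-GPT releases, so check your installed version:

```python
from dbgpt.model.proxy import OpenAILLMClient

# Assumed keyword arguments; adjust if your DB-GPT version names them differently.
llm_client = OpenAILLMClient(
    api_key="sk-xx",              # same placeholder as OPENAI_API_KEY above
    api_base="https://xx:80/v1",  # same placeholder as OPENAI_API_BASE above
)
```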

Create the RAG program

Finally, we create a RAG program for intelligent question answering.

```python
from dbgpt.core.awel import InputOperator, JoinOperator, InputSource
from dbgpt.core.operators import PromptBuilderOperator, RequestBuilderOperator
from dbgpt.model.operators import LLMOperator

prompt = """Based on the known information below, provide users with professional and concise answers to their questions.
If the answer cannot be obtained from the provided content, please say:
"The information provided in the knowledge base is not sufficient to answer this question.".
It is forbidden to make up information randomly. When answering, it is best to summarize according to points 1.2.3.
known information:
{context}
question:
{question}
"""

with DAG("llm_rag_dag") as rag_dag:
    input_task = InputOperator(input_source=InputSource.from_callable())
    retriever_task = EmbeddingRetrieverOperator(
        top_k=3,
        vector_store_connector=vector_connector,
    )
    content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))
    merge_task = JoinOperator(
        lambda context, question: {"context": context, "question": question}
    )
    prompt_task = PromptBuilderOperator(prompt)
    # The model is gpt-3.5-turbo; you can replace it with other models.
    req_build_task = RequestBuilderOperator(model="gpt-3.5-turbo")
    llm_task = LLMOperator(llm_client=llm_client)
    result_task = MapOperator(lambda r: r.text)

    input_task >> retriever_task >> content_task >> merge_task
    input_task >> merge_task
    merge_task >> prompt_task >> req_build_task >> llm_task >> result_task

print(asyncio.run(result_task.call("What is the AWEL?")))
```
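
Since result_task.call is a coroutine, the same DAG can serve several questions from one event loop, with each call triggering a fresh run. A small sketch (the question strings are illustrative):

```python
async def ask_all(questions):
    # Run the RAG DAG once per question and collect the answers.
    return [await result_task.call(q) for q in questions]

answers = asyncio.run(ask_all([
    "What is the AWEL?",
    "Which layers does AWEL consist of?",
]))
for answer in answers:
    print(answer, "\n")
```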

The output looks like this:

```
AWEL stands for Agentic Workflow Expression Language. It is a set of intelligent agent workflow expression language specially designed for large model application development. AWEL provides functionality and flexibility for development of business logic for LLMs applications without the need to focus on cumbersome model and environment details. It adopts a layered API design, consisting of the operator layer, AgentFrame layer, and DSL layer. The operator layer includes basic operation atoms in the LLM application development process such as retrieval, vectorization, model interaction, and prompt processing.
```

Congratulations! At this point you have run through the entire process of developing a RAG program with the dbgpt SDK.

Complete code

The complete code of the whole program is as follows:

```python
import asyncio
import shutil

from dbgpt.core.awel import DAG, MapOperator, InputOperator, JoinOperator, InputSource
from dbgpt.core.operators import PromptBuilderOperator, RequestBuilderOperator
from dbgpt.rag import ChunkParameters
from dbgpt.rag.knowledge import KnowledgeType
from dbgpt.rag.operators import (
    EmbeddingAssemblerOperator,
    KnowledgeOperator,
    EmbeddingRetrieverOperator,
)
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt.storage.vector_store.chroma_store import ChromaVectorConfig
from dbgpt.storage.vector_store.connector import VectorStoreConnector
from dbgpt.model.operators import LLMOperator
from dbgpt.model.proxy import OpenAILLMClient

# Here we use the OpenAI embedding model; if you want to use other models,
# replace it according to the previous examples.
embeddings = DefaultEmbeddingFactory.openai()

# Here we use the OpenAI LLM; if you want to use other models,
# replace it according to the previous examples.
llm_client = OpenAILLMClient()

# Delete the old vector store directory (/tmp/awel_rag_test_vector_store)
shutil.rmtree("/tmp/awel_rag_test_vector_store", ignore_errors=True)

vector_connector = VectorStoreConnector.from_default(
    "Chroma",
    vector_store_config=ChromaVectorConfig(
        name="test_vstore",
        persist_path="/tmp/awel_rag_test_vector_store",
    ),
    embedding_fn=embeddings,
)

with DAG("load_knowledge_dag") as knowledge_dag:
    # Load knowledge from a URL
    knowledge_task = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
    assembler_task = EmbeddingAssemblerOperator(
        vector_store_connector=vector_connector,
        chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE"),
    )
    knowledge_task >> assembler_task

chunks = asyncio.run(assembler_task.call("https://docs.dbgpt.site/docs/latest/awel/"))
print(f"Chunk length: {len(chunks)}\n")

prompt = """Based on the known information below, provide users with professional and concise answers to their questions.
If the answer cannot be obtained from the provided content, please say:
"The information provided in the knowledge base is not sufficient to answer this question.".
It is forbidden to make up information randomly. When answering, it is best to summarize according to points 1.2.3.
known information:
{context}
question:
{question}
"""

with DAG("llm_rag_dag") as rag_dag:
    input_task = InputOperator(input_source=InputSource.from_callable())
    retriever_task = EmbeddingRetrieverOperator(
        top_k=3,
        vector_store_connector=vector_connector,
    )
    content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))
    merge_task = JoinOperator(
        lambda context, question: {"context": context, "question": question}
    )
    prompt_task = PromptBuilderOperator(prompt)
    # The model is gpt-3.5-turbo; you can replace it with other models.
    req_build_task = RequestBuilderOperator(model="gpt-3.5-turbo")
    llm_task = LLMOperator(llm_client=llm_client)
    result_task = MapOperator(lambda r: r.text)

    input_task >> retriever_task >> content_task >> merge_task
    input_task >> merge_task
    merge_task >> prompt_task >> req_build_task >> llm_task >> result_task

print(asyncio.run(result_task.call("What is the AWEL?")))
```

Visualize the DAGs

You can visualize the DAGs with the following code:

```python
knowledge_dag.visualize_dag()
rag_dag.visualize_dag()
```

If you run the code in a Jupyter Notebook, you can view the DAGs inside the notebook:

```python
display(knowledge_dag.show())
display(rag_dag.show())
```

The visualization of knowledge_dag is shown below:

[Figure 1: knowledge_dag visualization]

The visualization of rag_dag is shown below:

[Figure 2: rag_dag visualization]

English documentation