DB-GPT项目其实我们一直都走的比较坚定。 在今年4月初我们定下来整个技术架构到现在,都没有多大的变化。 我们一直坚持想做一件事就是: “Revolutionizing Database Interactions with Private LLM Technology”。对”用大模型技术定义数据库下一代交互方式”。

1. 大模型交互三层架构

通用模型能解决所有问题吗? 需不需要领域模型? 面向未来,多模型之间是怎么协作交互的? 这几个问题其实我们一直都在思考。但截至目前,跟我们5个月之前的认知也没太大的变化。 那就是以后应用落地需要三层的大模型交互架构。 如下图所示,从上往下依次是
  • 通用大语言模型(> 100B)
  • 领域大语言模型(10~70B)
  • 工具类模型(<10B)

架构思考 - 图1

为什么要有这样的思考?因为我们发现,自然语言的理解、语义分析等远比我们想象的复杂,即使今天有非常多的开源模型都可以进行自然语言对话。 但是能够通过确定的Prompt拿到确定结果的模型,少之又少。 只有ChatGPT、GPT-4这样的模型有较好的表现,所以通用大语言模型无论是体量还是入门门槛都非常高,所以天然决定了这一层的生意会变成寡头,最后只能活下来效果最好的极少数几家。而这一层的迭代是符合数据飞轮理论,即谁的效果越好,先发优势越大,谁的模型就能得到更好的训练,形成增长飞轮。 再说为什么要有领域模型层? 领域模型也可以理解为垂域模型。为什么这一层必须存在? 通用大模型真的无法学会所有的领域知识吗? 答案必然是肯定的。但社会的运转不全是”效果”的好坏,还有边界与合理性。 我们看到今天无论哪一家企业都不会把自己的核心数据资产交给通用大语言模型。在模型层内做数据加密与隐私保护又基本已经宣告是伪命题。那我们怎么保护自己数据的同时又兼顾效果呢? 我的答案是引入领域模型层,将领域模型与通用模型在私有数据资产上做解耦。对内核心业务的交互全部放在领域模型层做处理,交互层还是通过通用模型去下发,在通用模型层与领域模型之间做安全与隐私防护。

架构思考 - 图2

那么领域层都有了,为什么还要有第三层工具模型? 这个我的理解主要是高精度与成本。 针对非常细分的场景去做任务的时候,我们其实不太需要模型掌握太多的泛化或者推理的能力。恰恰相反,我们需要的是最低成本确定性完成某一任务的能力。比如生产线上的工人,你需要让他去理解与推敲公司的战略方向吗? 嗯,高质量完成执行就行了。

2. DB-GPT的架构与特点

下图是DB-GPT的架构图,整体结构比较简单。左侧是知识(RAG),右侧是工具(Agents), 中间是多模型管理(SMMF),同时增加了向量存储这样的大模型记忆体,以及各类数据源,在往上是一层通用的交互层面。

架构思考 - 图3

关键特性

  • 私域问答&数据处理&RAG
支持内置、多文件格式上传、插件自抓取等方式自定义构建知识库,对海量结构化,非结构化数据做统一向量存储与检索
  • 多数据源&GBI
支持自然语言与Excel、数据库、数仓等多种数据源交互,并支持分析报告。
  • 多模型管理
海量模型支持,包括开源、API代理等几十种大语言模型。如LLaMA/LLaMA2、Baichuan、ChatGLM、文心、通义、智谱、星火等。
  • 自动化微调
围绕大语言模型、Text2SQL数据集、LoRA/QLoRA/Pturning等微调方法构建的自动化微调轻量框架, 让TextSQL微调像流水线一样方便。
  • Data-Driven Multi-Agents&Plugins
支持自定义插件执行任务,原生支持Auto-GPT插件模型,Agents协议采用Agent Protocol标准
  • 隐私安全
通过私有化大模型、代理脱敏等多种技术保障数据的隐私安全

3.面向未来

经过这一年的折腾与抽象,我们将DB-GPT未来的架构进行了分层。如下图所示,主要分为以下7层,自上而下以此为:
  • 可视化层: 可视化层主要的工作是对话、交互、图表显示、可视化编排等能力。
  • 应用层: 基于底层能力的应用构建,如GBI应用、ChatDB类应用、ChatData类应用、ChatExcel类应用等
  • 服务层: 服务层主要是对外暴露的服务,比如LLMServer、APIServer、RAGServer、dbgptserver等
  • 核心模块层: 核心模块主要有三个分别是,SMMF、RAGs、Agents
  • 协议层:协议层主要是指AWEL(Agentic Workflow Expression Language), 即智能体编排语言,是专门为大模型应用开发设计的智能体工作流表达式语言。
  • 训练层: 训练层会主要关注Text2SQL、Text2DSL、Text2API方向的微调,提供标准的微调脚手架。
  • 运行环境: 运行环境是指整个框架的运行在什么环境当中,我们后期会优先支持基于Ray与Kubernetes的环境。

架构思考 - 图4

下面对其中一些关键模块做一些较为详细的介绍。

3.1 多模型管理(SMMF)

前面我们有提到说面向未来是通用大模型、领域模型、小模型一起交互协作的。 那就特别需要对对模型进行管理,而且模型需要是一个个随时可调用的服务。在我们的框架里面叫Service-oriented Multi-model Management Framework(SMMF) 服务化多模型管理框架。 多模型管理框架的本质其实就是模型服务的Serverless化。

架构思考 - 图5

SMMF具体如上图所示: 最上层对应服务与应用层(如DB-GPT WebServer、Agents系统、应用等)。 下一层是模型部署框架层,这层包含了对应用层提供模型服务的APIServer和Model Handle、整个部署框架的元数据管理和控制中心Model Controller和与推理框架和底层环境直接对接的Model Worker。再下一层是推理框架层,这层包含了vLLM、llama.cpp和FastChat(由于DB-GPT直接使用了FastChat的推理接口,这里我们将FastChat也归为推理框架),大语言模型(Vicuna、Llama、Baichuan、ChatGLM)等部署在推理框架中。 最下面一层则是实际部署环境,包括Kubernetes、Ray、AWS、阿里云和私有云等。 为了能够让客户端无感使用各类模型,我们无缝兼容了OpenAI的SDK,即通过一套标准的SDK即可无缝使用各类大语言模型。
  1. import openai
  2. openai.api_key = "EMPTY"
  3. openai.api_base = "http://127.0.0.1:8100/api/v1"
  4. model = "vicuna-13b-v1.5"
  5. completion = openai.ChatCompletion.create(
  6. model=model,
  7. messages=[{"role": "user", "content": "hello"}]
  8. )
  9. # print the completion
  10. print(completion.choices[0].message.content)
关于此部分内容更详细的介绍,可以参考文档:多模型介绍 ### 3.2 MS-RAG 架构思考 - 图6 架构思考 - 图7 MS-RAG指的是多文档检索增强能力,这部分也是DB-GPT中的一个基础核心模块。 当前MS-RAG实现了基本的增强检索的操作,并对多文档、多源数据检索场景做了非常多的定制优化。整个流程中涵盖了知识构建、知识检索、答案生成全链路的能力。关于此部分更细节的内容,我这里暂且按下不表,后面会有专门的文章进行介绍。

3.3 Agents

架构思考 - 图8

Agents也是DB-GPT框架中一个非常核心的模块,我们在之前的DB-GPT早期的Agents框架版本中,进行了全新构建,引入了类AutoGen当中的多角色协同理念。同时为了更好的在数据领域落地,我们提出了数据驱动的Agents理念,在实际的使用场景中,每个agents会有一个DataFream上下文。结合此上下文可以对agents做前后置数据一致性校验与订正。 当前升级版本的代码经过我们最近紧张的开发之后,也已经进入到测试阶段,相信不久后就会与大家见面。 我们也会提供更多的介绍说明与使用案例。

架构思考 - 图9

架构思考 - 图10

3.4 AWEL

AWEL(Agentic Workflow Expression Language)是一套专为大模型应用开发设计的智能体工作流表达语言,它提供了强大的功能和灵活性。通过 AWEL API 您可以专注于大模型应用业务逻辑的开发,而不需要关注繁琐的模型和环境细节,AWEL 采用分层 API 的设计, AWEL 的分层 API 设计架构如下图所示:

架构思考 - 图11

AWEL分层设计

AWEL在设计上分为三个层次,依次为算子层、AgentFrame层以及DSL层,以下对三个层次做简要介绍。
  • 算子层
算子层是指LLM应用开发过程中一个个最基本的操作原子,比如在一个RAG应用开发时。 检索、向量化、模型交互、Prompt处理等都是一个个基础算子。 在后续的发展中,框架会进一步对算子进行抽象与标准化设计。 可以根据标准API快速实现一组算子。
  • AgentFream层
DataFream层将算子做进一步封装,可以基于算子做链式计算。 这一层链式计算也支持分布式,支持如filter、join、map、reduce等一套链式计算操作。 后续也将支持更多的计算逻辑。
  • DSL层
DSL层提供一套标准的结构化表示语言,可以通过写DSL语句完成AgentFrame与算子的操作,让围绕数据编写大模型应用更具确定性,避免通过自然语言编写的不确定性,使得围绕数据与大模型的应用编程变为确定性应用编程。

使用案例

AWEL初步的版本也已经在V0.4.2发布,我们内置提供了一些使用样例。
算子层API-RAG例子

源码在项目中位置 examples/awel/simple_rag_example.py

  1. with DAG("simple_rag_example") as dag:
  2. trigger_task = HttpTrigger(
  3. "/examples/simple_rag", methods="POST", request_body=ConversationVo
  4. )
  5. req_parse_task = RequestParseOperator()
  6. # TODO should register prompt template first
  7. prompt_task = PromptManagerOperator()
  8. history_storage_task = ChatHistoryStorageOperator()
  9. history_task = ChatHistoryOperator()
  10. embedding_task = EmbeddingEngingOperator()
  11. chat_task = BaseChatOperator()
  12. model_task = ModelOperator()
  13. output_parser_task = MapOperator(lambda out: out.to_dict()["text"])
  14. (
  15. trigger_task
  16. >> req_parse_task
  17. >> prompt_task
  18. >> history_storage_task
  19. >> history_task
  20. >> embedding_task
  21. >> chat_task
  22. >> model_task
  23. >> output_parser_task
  24. )
位运算会将整个过程以DAG的形式编排。

架构思考 - 图12

算子层API调用模型+缓存例子

架构思考 - 图13

AgentFream层API样例
  1. af = AgentFream(HttpSource("/examples/run_code", method = "post"))
  2. result = (
  3. af
  4. .text2vec(model="text2vec")
  5. .filter(vstore, store = "chromadb", db="default")
  6. .llm(model="vicuna-13b", temperature=0.7)
  7. .map(code_parse_func)
  8. .map(run_sql_func)
  9. .reduce(lambda a, b: a + b)
  10. )
  11. result.write_to_sink(type='source_slink')
DSL层API样例
DSL 采用ANTLR4 / Lark解析器
  1. CREATE WORKFLOW RAG AS
  2. BEGIN
  3. DATA requestData = RECEIVE REQUEST FROM
  4. http_source("/examples/rags", method = "post");
  5. DATA processedData = TRANSFORM requestData USING embedding(model = "text2vec");
  6. DATA retrievedData = RETRIEVE DATA
  7. FROM vstore(database = "chromadb", key = processedData)
  8. ON ERROR FAIL;
  9. DATA modelResult = APPLY LLM "vicuna-13b"
  10. WITH DATA retrievedData AND PARAMETERS (temperature = 0.7)
  11. ON ERROR RETRY 2 TIMES;
  12. RESPOND TO http_source WITH modelResult
  13. ON ERROR LOG "Failed to respond to request";
  14. END;

3.5 Train

作为DB-GPT项目的一部分,我们提供了Text2SQL微调相关的代码,目前已经按照独立的pypi包发布,可以直接安装进行使用。

架构思考 - 图14

  1. pip install dbgpt_hub

查看模型基线

  1. from dbgpt_hub.baseline import show_scores
  2. show_scores()

架构思考 - 图15

微调

  1. from dbgpt_hub.data_process import preprocess_sft_data
  2. from dbgpt_hub.train import start_sft
  3. from dbgpt_hub.predict import start_predict
  4. from dbgpt_hub.eval import start_evaluate
  5. data_folder = "dbgpt_hub/data"
  6. data_info = [
  7. {
  8. "data_source": "spider",
  9. "train_file": ["train_spider.json", "train_others.json"],
  10. "dev_file": ["dev.json"],
  11. "tables_file": "tables.json",
  12. "db_id_name": "db_id",
  13. "is_multiple_turn": False,
  14. "train_output": "spider_train.json",
  15. "dev_output": "spider_dev.json",
  16. }
  17. ]
  18. train_args = {
  19. "model_name_or_path": "codellama/CodeLlama-13b-Instruct-hf",
  20. "do_train": True,
  21. "dataset": "example_text2sql_train",
  22. "max_source_length": 2048,
  23. "max_target_length": 512,
  24. "finetuning_type": "lora",
  25. "lora_target": "q_proj,v_proj",
  26. "template": "llama2",
  27. "lora_rank": 64,
  28. "lora_alpha": 32,
  29. "output_dir": "dbgpt_hub/output/adapter/CodeLlama-13b-sql-lora",
  30. "overwrite_cache": True,
  31. "overwrite_output_dir": True,
  32. "per_device_train_batch_size": 1,
  33. "gradient_accumulation_steps": 16,
  34. "lr_scheduler_type": "cosine_with_restarts",
  35. "logging_steps": 50,
  36. "save_steps": 2000,
  37. "learning_rate": 2e-4,
  38. "num_train_epochs": 8,
  39. "plot_loss": True,
  40. "bf16": True,
  41. }
  42. predict_args = {
  43. "model_name_or_path": "codellama/CodeLlama-13b-Instruct-hf",
  44. "template": "llama2",
  45. "finetuning_type": "lora",
  46. "checkpoint_dir": "dbgpt_hub/output/adapter/CodeLlama-13b-sql-lora",
  47. "predict_file_path": "dbgpt_hub/data/eval_data/dev_sql.json",
  48. "predict_out_dir": "dbgpt_hub/output/",
  49. "predicted_out_filename": "pred_sql.sql",
  50. }
  51. evaluate_args = {
  52. "input": "./dbgpt_hub/output/pred/pred_sql_dev_skeleton.sql",
  53. "gold": "./dbgpt_hub/data/eval_data/gold.txt",
  54. "gold_natsql": "./dbgpt_hub/data/eval_data/gold_natsql2sql.txt",
  55. "db": "./dbgpt_hub/data/spider/database",
  56. "table": "./dbgpt_hub/data/eval_data/tables.json",
  57. "table_natsql": "./dbgpt_hub/data/eval_data/tables_for_natsql2sql.json",
  58. "etype": "exec",
  59. "plug_value": True,
  60. "keep_distict": False,
  61. "progress_bar_for_each_datapoint": False,
  62. "natsql": False,
  63. }
  64. preprocess_sft_data(
  65. data_folder = data_folder,
  66. data_info = data_info
  67. )
  68. start_sft(train_args)
  69. start_predict(predict_args)
  70. start_evaluate(evaluate_args)
更多信息可以参考我们的开源代码 ### 3.6 Benchmark 简介:所有用户,相同 prompt (内容、input length 和 output length 相同),同一时间发起推理请求,在同一硬件条件下的首字延迟、推理延迟和吞吐量。

指标

1. 首字延迟(First Token Latency)

单位:毫秒 从 DB-GPT 模型部署框架接收到模型推理请求到推理框架得到第一个 token 的时间(prefill 时间 + 第一个decoding token 的时间)
延迟(**Latency**
单位:毫秒 从 DB-GPT 模型部署框架接收到模型推理请求到生成完整的响应的时间。
2. 吞吐量(**Throughput**
单位:tokens 每秒 DB-GPT 模型部署框架每秒中处理的所有用户和所有请求的 token 数量。

硬件配置

GPU: NVIDIA A100-PCIE-40GB * 1

CPU: Intel Xeon Processor (Skylake, IBRS) 40核80线程 2992.953 MHz

内存:629 GB

硬盘: 1T HDD

推理框架和模型

推理框架:vllm 和 huggingface transformers 模型:Qwen-7b-Chat、Qwen-14b-Chat
VLLM

架构思考 - 图16

hg

架构思考 - 图17

Benchmark代码地址: https://github.com/eosphoros-ai/DB-GPT/blob/main/dbgpt/util/benchmarks

  1. async def run_model(wh: WorkerManager) -> None:
  2. global result_csv_file
  3. if not result_csv_file:
  4. result_csv_file = get_result_csv_file()
  5. if os.path.exists(result_csv_file):
  6. now = datetime.now()
  7. now_str = now.strftime("%Y-%m-%d")
  8. os.rename(result_csv_file, f"{result_csv_file}.bak_{now_str}.csv")
  9. for parallel_num in parallel_nums:
  10. for input_len, output_len in zip(input_lens, output_lens):
  11. try:
  12. await run_batch(
  13. wh, input_len, output_len, parallel_num, result_csv_file
  14. )
  15. except Exception:
  16. msg = traceback.format_exc()
  17. logging.error(
  18. f"Run benchmarks error, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, error message: {msg}"
  19. )
  20. if "torch.cuda.OutOfMemoryError" in msg:
  21. return
  22. sys.exit(0)
  23. async def run_batch(
  24. wh: WorkerManager,
  25. input_len: int,
  26. output_len: int,
  27. parallel_num: int,
  28. output_file: str,
  29. ):
  30. tasks = []
  31. prompt = read_prompt_from_file("11k")
  32. if model_type == "vllm":
  33. max_input_str_len = input_len
  34. if "baichuan" in model_name:
  35. # TODO prompt handle first
  36. max_input_str_len *= 2
  37. prompt = prompt[-max_input_str_len:]
  38. # Warmup first
  39. params = build_param(input_len, output_len, prompt, system_prompt="")
  40. await wh.generate(params)
  41. for _ in range(parallel_num):
  42. params = build_param(input_len, output_len, prompt, system_prompt="")
  43. tasks.append(wh.generate(params))
  44. print(
  45. f"Begin run benchmarks, model name: {model_name}, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}"
  46. )
  47. start_time_ms = time.time_ns() // 1_000_000
  48. results: List[ModelOutput] = await asyncio.gather(*tasks)
  49. end_time_ms = time.time_ns() // 1_000_000
  50. test_time_cost_ms = end_time_ms - start_time_ms
  51. test_total_tokens = 0
  52. first_token_latency_ms = 0
  53. latency_ms = 0
  54. gpu_nums = 0
  55. avg_gpu_mem = 0
  56. rows = []
  57. for r in results:
  58. metrics = r.metrics
  59. if isinstance(metrics, dict):
  60. metrics = ModelInferenceMetrics(**metrics)
  61. print(r)
  62. test_total_tokens += metrics.total_tokens
  63. first_token_latency_ms += metrics.first_token_time_ms - metrics.start_time_ms
  64. latency_ms += metrics.end_time_ms - metrics.start_time_ms
  65. row_data = metrics.to_dict()
  66. del row_data["collect_index"]
  67. if "avg_gpu_infos" in row_data:
  68. avg_gpu_infos = row_data["avg_gpu_infos"]
  69. gpu_nums = len(avg_gpu_infos)
  70. avg_gpu_mem = (
  71. sum(i["allocated_memory_gb"] for i in avg_gpu_infos) / gpu_nums
  72. )
  73. del row_data["avg_gpu_infos"]
  74. del row_data["current_gpu_infos"]
  75. rows.append(row_data)
  76. avg_test_speed_per_second = test_total_tokens / (test_time_cost_ms / 1000.0)
  77. avg_first_token_latency_ms = first_token_latency_ms / len(results)
  78. avg_latency_ms = latency_ms / len(results)
  79. with open(output_file, "a", newline="", encoding="utf-8") as f:
  80. writer = csv.DictWriter(f, fieldnames=METRICS_HEADERS)
  81. if f.tell() == 0:
  82. # Fist time
  83. writer.writeheader()
  84. for row in rows:
  85. row["model_name"] = model_name
  86. row["parallel_nums"] = parallel_num
  87. row["input_length"] = input_len
  88. row["output_length"] = output_len
  89. row["test_time_cost_ms"] = test_time_cost_ms
  90. row["test_total_tokens"] = test_total_tokens
  91. row["avg_test_speed_per_second(tokens/s)"] = avg_test_speed_per_second
  92. row["avg_first_token_latency_ms"] = avg_first_token_latency_ms
  93. row["avg_latency_ms"] = avg_latency_ms
  94. row["gpu_nums"] = gpu_nums
  95. row["gpu_mem(GiB)"] = avg_gpu_mem
  96. writer.writerow(row)
  97. print(
  98. f"input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}"
  99. )

附录