DB-GPT项目其实我们一直都走的比较坚定。 在今年4月初我们定下来整个技术架构到现在,都没有多大的变化。 我们一直坚持想做一件事就是: “Revolutionizing Database Interactions with Private LLM Technology”。对”用大模型技术定义数据库下一代交互方式”。
1. 大模型交互三层架构
通用模型能解决所有问题吗? 需不需要领域模型? 面向未来,多模型之间是怎么协作交互的? 这几个问题其实我们一直都在思考。但截至目前,跟我们5个月之前的认知也没太大的变化。 那就是以后应用落地需要三层的大模型交互架构。 如下图所示,从上往下依次是- 通用大语言模型(> 100B)
- 领域大语言模型(10~70B)
- 工具类模型(<10B)
2. DB-GPT的架构与特点
下图是DB-GPT的架构图,整体结构比较简单。左侧是知识(RAG),右侧是工具(Agents), 中间是多模型管理(SMMF),同时增加了向量存储这样的大模型记忆体,以及各类数据源,在往上是一层通用的交互层面。关键特性
- 私域问答&数据处理&RAG
- 多数据源&GBI
- 多模型管理
- 自动化微调
- Data-Driven Multi-Agents&Plugins
- 隐私安全
3.面向未来
经过这一年的折腾与抽象,我们将DB-GPT未来的架构进行了分层。如下图所示,主要分为以下7层,自上而下以此为:- 可视化层: 可视化层主要的工作是对话、交互、图表显示、可视化编排等能力。
- 应用层: 基于底层能力的应用构建,如GBI应用、ChatDB类应用、ChatData类应用、ChatExcel类应用等。
- 服务层: 服务层主要是对外暴露的服务,比如LLMServer、APIServer、RAGServer、dbgptserver等
- 核心模块层: 核心模块主要有三个分别是,SMMF、RAGs、Agents
- 协议层:协议层主要是指AWEL(Agentic Workflow Expression Language), 即智能体编排语言,是专门为大模型应用开发设计的智能体工作流表达式语言。
- 训练层: 训练层会主要关注Text2SQL、Text2DSL、Text2API方向的微调,提供标准的微调脚手架。
- 运行环境: 运行环境是指整个框架的运行在什么环境当中,我们后期会优先支持基于Ray与Kubernetes的环境。
3.1 多模型管理(SMMF)
前面我们有提到说面向未来是通用大模型、领域模型、小模型一起交互协作的。 那就特别需要对对模型进行管理,而且模型需要是一个个随时可调用的服务。在我们的框架里面叫Service-oriented Multi-model Management Framework(SMMF) 服务化多模型管理框架。 多模型管理框架的本质其实就是模型服务的Serverless化。关于此部分内容更详细的介绍,可以参考文档:多模型介绍 ### 3.2 MS-RAG
import openai
openai.api_key = "EMPTY"
openai.api_base = "http://127.0.0.1:8100/api/v1"
model = "vicuna-13b-v1.5"
completion = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": "hello"}]
)
# print the completion
print(completion.choices[0].message.content)


3.3 Agents
3.4 AWEL
AWEL(Agentic Workflow Expression Language)是一套专为大模型应用开发设计的智能体工作流表达语言,它提供了强大的功能和灵活性。通过 AWEL API 您可以专注于大模型应用业务逻辑的开发,而不需要关注繁琐的模型和环境细节,AWEL 采用分层 API 的设计, AWEL 的分层 API 设计架构如下图所示:AWEL分层设计
AWEL在设计上分为三个层次,依次为算子层、AgentFrame层以及DSL层,以下对三个层次做简要介绍。- 算子层
- AgentFream层
- DSL层
使用案例
AWEL初步的版本也已经在V0.4.2发布,我们内置提供了一些使用样例。算子层API-RAG例子
源码在项目中位置 examples/awel/simple_rag_example.py
位运算会将整个过程以DAG的形式编排。
with DAG("simple_rag_example") as dag:
trigger_task = HttpTrigger(
"/examples/simple_rag", methods="POST", request_body=ConversationVo
)
req_parse_task = RequestParseOperator()
# TODO should register prompt template first
prompt_task = PromptManagerOperator()
history_storage_task = ChatHistoryStorageOperator()
history_task = ChatHistoryOperator()
embedding_task = EmbeddingEngingOperator()
chat_task = BaseChatOperator()
model_task = ModelOperator()
output_parser_task = MapOperator(lambda out: out.to_dict()["text"])
(
trigger_task
>> req_parse_task
>> prompt_task
>> history_storage_task
>> history_task
>> embedding_task
>> chat_task
>> model_task
>> output_parser_task
)
算子层API调用模型+缓存例子
AgentFream层API样例
af = AgentFream(HttpSource("/examples/run_code", method = "post"))
result = (
af
.text2vec(model="text2vec")
.filter(vstore, store = "chromadb", db="default")
.llm(model="vicuna-13b", temperature=0.7)
.map(code_parse_func)
.map(run_sql_func)
.reduce(lambda a, b: a + b)
)
result.write_to_sink(type='source_slink')
DSL层API样例
DSL 采用ANTLR4 / Lark解析器
CREATE WORKFLOW RAG AS
BEGIN
DATA requestData = RECEIVE REQUEST FROM
http_source("/examples/rags", method = "post");
DATA processedData = TRANSFORM requestData USING embedding(model = "text2vec");
DATA retrievedData = RETRIEVE DATA
FROM vstore(database = "chromadb", key = processedData)
ON ERROR FAIL;
DATA modelResult = APPLY LLM "vicuna-13b"
WITH DATA retrievedData AND PARAMETERS (temperature = 0.7)
ON ERROR RETRY 2 TIMES;
RESPOND TO http_source WITH modelResult
ON ERROR LOG "Failed to respond to request";
END;
3.5 Train
作为DB-GPT项目的一部分,我们提供了Text2SQL微调相关的代码,目前已经按照独立的pypi包发布,可以直接安装进行使用。
pip install dbgpt_hub
查看模型基线
from dbgpt_hub.baseline import show_scores
show_scores()
微调
更多信息可以参考我们的开源代码 ### 3.6 Benchmark 简介:所有用户,相同 prompt (内容、input length 和 output length 相同),同一时间发起推理请求,在同一硬件条件下的首字延迟、推理延迟和吞吐量。
from dbgpt_hub.data_process import preprocess_sft_data
from dbgpt_hub.train import start_sft
from dbgpt_hub.predict import start_predict
from dbgpt_hub.eval import start_evaluate
data_folder = "dbgpt_hub/data"
data_info = [
{
"data_source": "spider",
"train_file": ["train_spider.json", "train_others.json"],
"dev_file": ["dev.json"],
"tables_file": "tables.json",
"db_id_name": "db_id",
"is_multiple_turn": False,
"train_output": "spider_train.json",
"dev_output": "spider_dev.json",
}
]
train_args = {
"model_name_or_path": "codellama/CodeLlama-13b-Instruct-hf",
"do_train": True,
"dataset": "example_text2sql_train",
"max_source_length": 2048,
"max_target_length": 512,
"finetuning_type": "lora",
"lora_target": "q_proj,v_proj",
"template": "llama2",
"lora_rank": 64,
"lora_alpha": 32,
"output_dir": "dbgpt_hub/output/adapter/CodeLlama-13b-sql-lora",
"overwrite_cache": True,
"overwrite_output_dir": True,
"per_device_train_batch_size": 1,
"gradient_accumulation_steps": 16,
"lr_scheduler_type": "cosine_with_restarts",
"logging_steps": 50,
"save_steps": 2000,
"learning_rate": 2e-4,
"num_train_epochs": 8,
"plot_loss": True,
"bf16": True,
}
predict_args = {
"model_name_or_path": "codellama/CodeLlama-13b-Instruct-hf",
"template": "llama2",
"finetuning_type": "lora",
"checkpoint_dir": "dbgpt_hub/output/adapter/CodeLlama-13b-sql-lora",
"predict_file_path": "dbgpt_hub/data/eval_data/dev_sql.json",
"predict_out_dir": "dbgpt_hub/output/",
"predicted_out_filename": "pred_sql.sql",
}
evaluate_args = {
"input": "./dbgpt_hub/output/pred/pred_sql_dev_skeleton.sql",
"gold": "./dbgpt_hub/data/eval_data/gold.txt",
"gold_natsql": "./dbgpt_hub/data/eval_data/gold_natsql2sql.txt",
"db": "./dbgpt_hub/data/spider/database",
"table": "./dbgpt_hub/data/eval_data/tables.json",
"table_natsql": "./dbgpt_hub/data/eval_data/tables_for_natsql2sql.json",
"etype": "exec",
"plug_value": True,
"keep_distict": False,
"progress_bar_for_each_datapoint": False,
"natsql": False,
}
preprocess_sft_data(
data_folder = data_folder,
data_info = data_info
)
start_sft(train_args)
start_predict(predict_args)
start_evaluate(evaluate_args)
指标
1. 首字延迟(First Token Latency)
单位:毫秒 从 DB-GPT 模型部署框架接收到模型推理请求到推理框架得到第一个 token 的时间(prefill 时间 + 第一个decoding token 的时间)延迟(**Latency**)
单位:毫秒 从 DB-GPT 模型部署框架接收到模型推理请求到生成完整的响应的时间。2. 吞吐量(**Throughput**)
单位:tokens 每秒 DB-GPT 模型部署框架每秒中处理的所有用户和所有请求的 token 数量。硬件配置
GPU: NVIDIA A100-PCIE-40GB * 1
CPU: Intel Xeon Processor (Skylake, IBRS) 40核80线程 2992.953 MHz
内存:629 GB
硬盘: 1T HDD
推理框架和模型
推理框架:vllm 和 huggingface transformers 模型:Qwen-7b-Chat、Qwen-14b-ChatVLLM
hg
Benchmark代码地址: https://github.com/eosphoros-ai/DB-GPT/blob/main/dbgpt/util/benchmarks
async def run_model(wh: WorkerManager) -> None:
global result_csv_file
if not result_csv_file:
result_csv_file = get_result_csv_file()
if os.path.exists(result_csv_file):
now = datetime.now()
now_str = now.strftime("%Y-%m-%d")
os.rename(result_csv_file, f"{result_csv_file}.bak_{now_str}.csv")
for parallel_num in parallel_nums:
for input_len, output_len in zip(input_lens, output_lens):
try:
await run_batch(
wh, input_len, output_len, parallel_num, result_csv_file
)
except Exception:
msg = traceback.format_exc()
logging.error(
f"Run benchmarks error, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, error message: {msg}"
)
if "torch.cuda.OutOfMemoryError" in msg:
return
sys.exit(0)
async def run_batch(
wh: WorkerManager,
input_len: int,
output_len: int,
parallel_num: int,
output_file: str,
):
tasks = []
prompt = read_prompt_from_file("11k")
if model_type == "vllm":
max_input_str_len = input_len
if "baichuan" in model_name:
# TODO prompt handle first
max_input_str_len *= 2
prompt = prompt[-max_input_str_len:]
# Warmup first
params = build_param(input_len, output_len, prompt, system_prompt="")
await wh.generate(params)
for _ in range(parallel_num):
params = build_param(input_len, output_len, prompt, system_prompt="")
tasks.append(wh.generate(params))
print(
f"Begin run benchmarks, model name: {model_name}, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}"
)
start_time_ms = time.time_ns() // 1_000_000
results: List[ModelOutput] = await asyncio.gather(*tasks)
end_time_ms = time.time_ns() // 1_000_000
test_time_cost_ms = end_time_ms - start_time_ms
test_total_tokens = 0
first_token_latency_ms = 0
latency_ms = 0
gpu_nums = 0
avg_gpu_mem = 0
rows = []
for r in results:
metrics = r.metrics
if isinstance(metrics, dict):
metrics = ModelInferenceMetrics(**metrics)
print(r)
test_total_tokens += metrics.total_tokens
first_token_latency_ms += metrics.first_token_time_ms - metrics.start_time_ms
latency_ms += metrics.end_time_ms - metrics.start_time_ms
row_data = metrics.to_dict()
del row_data["collect_index"]
if "avg_gpu_infos" in row_data:
avg_gpu_infos = row_data["avg_gpu_infos"]
gpu_nums = len(avg_gpu_infos)
avg_gpu_mem = (
sum(i["allocated_memory_gb"] for i in avg_gpu_infos) / gpu_nums
)
del row_data["avg_gpu_infos"]
del row_data["current_gpu_infos"]
rows.append(row_data)
avg_test_speed_per_second = test_total_tokens / (test_time_cost_ms / 1000.0)
avg_first_token_latency_ms = first_token_latency_ms / len(results)
avg_latency_ms = latency_ms / len(results)
with open(output_file, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=METRICS_HEADERS)
if f.tell() == 0:
# Fist time
writer.writeheader()
for row in rows:
row["model_name"] = model_name
row["parallel_nums"] = parallel_num
row["input_length"] = input_len
row["output_length"] = output_len
row["test_time_cost_ms"] = test_time_cost_ms
row["test_total_tokens"] = test_total_tokens
row["avg_test_speed_per_second(tokens/s)"] = avg_test_speed_per_second
row["avg_first_token_latency_ms"] = avg_first_token_latency_ms
row["avg_latency_ms"] = avg_latency_ms
row["gpu_nums"] = gpu_nums
row["gpu_mem(GiB)"] = avg_gpu_mem
writer.writerow(row)
print(
f"input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}"
)