Kubeflow Quick Start in Practice (3) - Qwen2.5 Fine-tuning End to End
Following Kubeflow Quick Start in Practice (1) and (2), this installment runs the full Qwen2.5-1.5B fine-tuning workflow on Kubeflow, using the hands-on walkthrough to show how Kubeflow's modules connect and cooperate.
Kubeflow Quick Start in Practice (1) - Intro / Notebooks: covers Kubeflow's main features and capabilities, typical use cases, and basic usage, with entry-level examples of Notebooks, Pipelines, Katib, and KServe.
https://blog.csdn.net/weixin_39403185/article/details/147337813?spm=1001.2014.3001.5502
Kubeflow Quick Start in Practice (2) - Pipelines / Katib / KServer: continues part (1) with Pipelines, Katib, KServe, and Training Operators (distributed training).
https://blog.csdn.net/weixin_39403185/article/details/147349105?spm=1001.2014.3001.5502
4.5 Preparing the base training environment
a) Base conda Python environment on ECS
# Ubuntu 22.04 environment
# Install conda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O /data/Miniconda3.sh
bash /data/Miniconda3.sh -b -p /data/miniconda3
echo 'export PATH="/data/miniconda3/bin:$PATH"' >> ~/.bashrc
source /data/miniconda3/bin/activate
source ~/.bashrc
# Create the conda training environment
conda create -n llm python=3.10 -y
conda activate llm
echo 'conda activate llm' >> ~/.bashrc
source ~/.bashrc
## Install tooling
pip install huggingface_hub
pip install kfp
pip install kfp-kubernetes
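A quick sanity check that the SDKs import correctly (version numbers will vary with install date):
python -c "import kfp; print(kfp.__version__)"
python -c "from kfp import kubernetes; print('kfp-kubernetes OK')"
huggingface-cli --help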
b) Preparing the model and code
This is getting complex enough that a diagram is worth drawing. To keep things simple here, the local working directory is mounted into Kubernetes via a PV/PVC (hostPath) and handed to the Pods. For company-scale use, a dedicated storage service is a better fit, with GitLab and a model registry managing the code, datasets, base models, and trained inference models.
(diagram: local /data directory on the host, exposed through a hostPath PV and a PVC to the training Pod)
huggingface-cli download Qwen/Qwen2.5-1.5B --resume-download --local-dir /data/models/qwen2.5-1.5b
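Once the download completes, the directory should contain the model config, tokenizer files, and safetensors weights (file names per the Qwen2.5-1.5B Hub repo):
ls /data/models/qwen2.5-1.5b
# expect config.json, tokenizer files, and *.safetensors weights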
### Expose the local directory to the training Pods via a PV/PVC
apiVersion: v1
kind: PersistentVolume
metadata:
  name: qwen-data-pv
  # Note: PVs are cluster-scoped, so no namespace field here
spec:
  capacity:
    storage: 30Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: manual
  hostPath:
    path: "/data"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: qwen-data-pvc
  namespace: kubeflow-user-example-com
spec:
  accessModes:
    - ReadWriteOnce
  volumeMode: Filesystem
  resources:
    requests:
      storage: 30Gi
  storageClassName: manual
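Save both manifests to a file (pv-pvc.yaml is just an illustrative name), apply them, and confirm the claim binds:
kubectl apply -f pv-pvc.yaml
kubectl get pv qwen-data-pv
kubectl get pvc qwen-data-pvc -n kubeflow-user-example-com
# STATUS should read Bound for both once they match on storageClassName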
Code preparation

# /data/code/fast_finetune.py
import argparse
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling
)
from peft import LoraConfig, get_peft_model, TaskType
from datasets import load_dataset
import torch
import os


def parse_args():
    parser = argparse.ArgumentParser(description="Fine-tune a Qwen model using PEFT LoRA.")
    parser.add_argument(
        "--model_path",
        type=str,
        required=True,
        help="Path to the base model directory.",
    )
    parser.add_argument(
        "--data_path",
        type=str,
        required=True,
        help="Path to the training data JSON file.",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        required=True,
        help="Directory to save the fine-tuned adapters and logs.",
    )
    # --- Optional Training Hyperparameters ---
    parser.add_argument("--lora_r", type=int, default=8, help="LoRA r dimension.")
    parser.add_argument("--lora_alpha", type=int, default=16, help="LoRA alpha.")
    parser.add_argument("--lora_dropout", type=float, default=0.05, help="LoRA dropout.")
    parser.add_argument("--target_modules", nargs='+', default=["q_proj", "v_proj"], help="Modules to apply LoRA to.")
    parser.add_argument("--batch_size", type=int, default=2, help="Per device train batch size.")
    parser.add_argument("--gradient_accumulation_steps", type=int, default=1, help="Gradient accumulation steps.")
    parser.add_argument("--learning_rate", type=float, default=5e-5, help="Learning rate.")
    parser.add_argument("--max_steps", type=int, default=10, help="Total number of training steps.")
    parser.add_argument("--logging_steps", type=int, default=1, help="Log every X update steps.")
    parser.add_argument("--save_steps", type=int, default=5, help="Save a checkpoint every X update steps.")
    parser.add_argument("--max_length", type=int, default=256, help="Max sequence length for tokenization.")
    args = parser.parse_args()
    return args


def preprocess_function(examples, tokenizer, max_length):
    """Preprocesses the data into Alpaca instruction format."""
    texts = []
    for instruction, input_text, output in zip(
        examples["instruction"],
        examples["input"],
        examples["output"]
    ):
        if input_text:
            text = f"Instruction: {instruction}\nInput: {input_text}\nResponse: {output}"
        else:
            text = f"Instruction: {instruction}\nResponse: {output}"
        # Append EOS token for training
        texts.append(text + tokenizer.eos_token)
    tokenized = tokenizer(
        texts,
        truncation=True,
        max_length=max_length,
        padding="max_length",  # Pad to max_length
        # return_tensors="pt"  # Trainer handles tensor conversion
    )
    # Labels are the same as input_ids for language modeling
    tokenized["labels"] = tokenized["input_ids"].copy()
    return tokenized


def main():
    args = parse_args()
    print("--- Configuration ---")
    print(f"Model Path: {args.model_path}")
    print(f"Data Path: {args.data_path}")
    print(f"Output Dir: {args.output_dir}")
    print(f"Max Steps: {args.max_steps}")
    print(f"Learning Rate: {args.learning_rate}")
    print("--------------------")

    # 1. Load model and tokenizer
    print("Loading model and tokenizer...")
    model = AutoModelForCausalLM.from_pretrained(
        args.model_path,
        device_map="auto",  # Use available device (GPU preferred)
        torch_dtype=torch.float16,
        trust_remote_code=True
    )
    tokenizer = AutoTokenizer.from_pretrained(args.model_path, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token  # Set pad token if missing
    print("Model and tokenizer loaded.")

    # 2. Prepare data
    print("Loading and preprocessing data...")
    dataset = load_dataset("json", data_files=args.data_path, split="train")
    dataset = dataset.map(
        lambda examples: preprocess_function(examples, tokenizer, args.max_length),
        batched=True,
        remove_columns=["instruction", "input", "output"]  # Remove original columns
    )
    print(f"Dataset loaded with {len(dataset)} examples.")
    print(f"Sample preprocessed example: {tokenizer.decode(dataset[0]['input_ids'])}")

    # 3. LoRA Configuration
    print("Configuring PEFT LoRA...")
    peft_config = LoraConfig(
        r=args.lora_r,
        lora_alpha=args.lora_alpha,
        target_modules=args.target_modules,
        lora_dropout=args.lora_dropout,
        task_type=TaskType.CAUSAL_LM,  # Explicitly set task type
        inference_mode=False
    )
    # Apply PEFT to the model
    model = get_peft_model(model, peft_config)
    model.print_trainable_parameters()  # Show trainable parameters
    print("PEFT LoRA configured.")

    # 4. Training Arguments
    print("Setting training arguments...")
    training_args = TrainingArguments(
        output_dir=args.output_dir,
        per_device_train_batch_size=args.batch_size,
        gradient_accumulation_steps=args.gradient_accumulation_steps,
        learning_rate=args.learning_rate,
        max_steps=args.max_steps,
        logging_steps=args.logging_steps,
        save_steps=args.save_steps,
        fp16=True,
        optim="adamw_torch",
        report_to="none",
        remove_unused_columns=True,
        save_total_limit=1,  # Only keep the latest checkpoint
        # load_best_model_at_end=True,  # Optional: Load best model at end
        # evaluation_strategy="steps",  # If you have an eval dataset
        # eval_steps=args.save_steps,   # Evaluate at the same frequency as saving
    )
    print("Training arguments set.")

    # 5. Data Collator
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False  # Causal LM, not Masked LM
    )

    # 6. Create Trainer
    print("Creating Trainer instance...")
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=dataset,
        # eval_dataset=eval_dataset,  # Add if you have an eval dataset
        data_collator=data_collator,
        tokenizer=tokenizer  # Pass tokenizer for padding/saving convenience
    )
    print("Trainer created.")

    # 7. Start Training
    print("--- Starting Training ---")
    trainer.train()
    print("--- Training Finished ---")

    # 8. Save the final adapter
    final_adapter_path = os.path.join(args.output_dir, "final_adapter")
    print(f"Saving final adapter model to {final_adapter_path}...")
    model.save_pretrained(final_adapter_path)
    tokenizer.save_pretrained(final_adapter_path)
    print("Final adapter model saved.")


if __name__ == "__main__":
    main()
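Before wiring the script into a pipeline, it can be smoke-tested directly on the ECS host (assuming transformers, peft, datasets, and torch are installed in the active conda env; the data file is shown next):
python /data/code/fast_finetune.py \
  --model_path /data/models/qwen2.5-1.5b \
  --data_path /data/code/specialized_alpaca.json \
  --output_dir /data/output/smoke_test \
  --max_steps 2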
/data/code/specialized_alpaca.json
[
  {
    "instruction": "用海盗风格回答",
    "input": "今天的天气怎么样?",
    "output": "嘿嘿,今天的天空晴朗得像被我们洗劫过的甲板!太阳亮得晃眼,伙计!"
  },
  {
    "instruction": "用海盗风格回答",
    "input": "你最喜欢的食物是什么?",
    "output": "当然是朗姆酒腌制的烤鱼啦!比那些陆地上的破面包强十倍,哈哈哈!"
  },
  {
    "instruction": "用海盗风格回答",
    "input": "如何学习编程?",
    "output": "哟吼!学习编程就像征服七大洋,你需要:\n1. 找本'海盗法典'(编程书)\n2. 跟老船员学(找导师)\n3. 多打几场硬仗(多实践)\n记住,菜鸟永远当不了船长!"
  },
  {
    "instruction": "用海盗风格回答",
    "input": "给我讲个笑话",
    "output": "为什么海盗不爱玩扑克?\n因为他们的船长总是站在甲板(deck)上!\n哈哈哈!(拍腿大笑)"
  },
  {
    "instruction": "用海盗风格回答",
    "input": "推荐一部电影",
    "output": "当然是《加勒比海盗》啦!虽然那个杰克船长花里胡哨的,但抢船的手法还算专业!"
  }
]
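A quick check that the file parses and carries the three Alpaca fields (instruction/input/output) the preprocessing step expects:
python -c "import json; d = json.load(open('/data/code/specialized_alpaca.json')); print(len(d), sorted(d[0]))"
# expected output: 5 ['input', 'instruction', 'output']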
c) Building the training environment image
FROM nvidia/cuda:12.1.1-devel-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive

# Install Python and Pip
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        python3.10 python3-pip python3.10-venv git \
    && rm -rf /var/lib/apt/lists/*

# Create a virtual environment (optional but recommended)
RUN python3.10 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Upgrade pip and install dependencies
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r /app/requirements.txt

# Set working directory
WORKDIR /app

# Add user and switch - good practice to not run as root
RUN useradd --create-home appuser
USER appuser

# Entrypoint or CMD can be added if needed, but KFP often overrides it
# ENTRYPOINT ["python3"]
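The Dockerfile copies in a requirements.txt that is not shown in the original post. A plausible minimal version matching the imports in fast_finetune.py (the package list and absence of version pins are assumptions here) would be:
# requirements.txt -- assumed contents, derived from fast_finetune.py's imports
torch
transformers
peft
datasets
accelerate  # required for device_map="auto" in from_pretrained
In practice, pin versions known to work together with the CUDA 12.1 base image.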
## Build and push the image to a remote private registry (Alibaba Cloud personal registry)
docker build -t registry.ap-southeast-5.aliyuncs.com/xxx/qwen-finetune:cu121 .
docker build -t registry.ap-southeast-5.aliyuncs.com/xxx/qwen-kserve:0.5b-cu121 .
# Log in, then push
docker login registry.ap-southeast-5.aliyuncs.com
docker push registry.ap-southeast-5.aliyuncs.com/xxx/qwen-finetune:cu121
docker push registry.ap-southeast-5.aliyuncs.com/xxx/qwen-kserve:0.5b-cu121
As before, an Alibaba Cloud private image registry is used. Note that pushing appears to incur charges (outbound ECS traffic is billed; inbound is not). Pulls from within the same-region VPC are free, but pulls over the public internet may be billed. Once testing is done, set the image back to private promptly.
d) Preparing the pipeline
# pipeline.py
import kfp
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Artifact  # Import Artifact
from typing import List
from kfp import kubernetes

# ---- Pipeline Configuration ----
FINETUNE_IMAGE = 'registry.ap-southeast-5.aliyuncs.com/xxx/qwen-finetune:cu121'
PVC_NAME = 'qwen-data-pvc'
VOLUME_MOUNT_PATH = '/data'


# ---- Fine-tuning Component Definition ----
@dsl.component(
    base_image=FINETUNE_IMAGE,
)
def finetune_qwen_component(
    script_path_in_volume: str,      # e.g., 'code/fast_finetune.py'
    base_model_path_in_volume: str,  # e.g., 'models/qwen2.5-1.5b'
    data_file_path_in_volume: str,   # e.g., 'code/specialized_alpaca.json'
    output_dir_in_volume: str,       # e.g., 'output'
    trained_adapter: Output[Dataset],
    max_steps: int = 100,
    learning_rate: float = 2e-5,
    batch_size: int = 1,
    lora_r: int = 8,
    lora_alpha: int = 16,
    target_modules: List[str] = ["q_proj", "v_proj"],
):
    """
    Executes the parameterized Qwen fine-tuning script.
    Outputs the path to the final adapter as a KFP artifact.
    """
    import subprocess
    import os
    import sys  # To ensure the python executable is found correctly

    # Construct absolute paths inside the container based on the mount point
    script_abs_path = os.path.join("/data", script_path_in_volume)
    model_abs_path = os.path.join("/data", base_model_path_in_volume)
    data_abs_path = os.path.join("/data", data_file_path_in_volume)
    output_abs_path = os.path.join("/data", output_dir_in_volume)

    # Create the output directory if it doesn't exist
    os.makedirs(output_abs_path, exist_ok=True)

    print(f"Executing script: {script_abs_path}")
    print(f"Using base model: {model_abs_path}")
    print(f"Using data file: {data_abs_path}")
    print(f"Saving output to: {output_abs_path}")
    print(f"KFP Artifact Path (before training): {trained_adapter.path}")  # KFP provides this path

    # Get the python executable path correctly
    python_executable = sys.executable
    cmd = [
        python_executable,  # Use the python from the environment
        script_abs_path,
        '--model_path', model_abs_path,
        '--data_path', data_abs_path,
        '--output_dir', output_abs_path,  # Script saves intermediate checkpoints here
        '--max_steps', str(max_steps),
        '--learning_rate', str(learning_rate),
        '--batch_size', str(batch_size),
        '--lora_r', str(lora_r),
        '--lora_alpha', str(lora_alpha),
        '--target_modules', *target_modules,
    ]
    print(f"Executing command: {' '.join(cmd)}")

    # Execute the script
    process = subprocess.run(cmd, capture_output=True, text=True, check=False)
    print("---- Script Standard Output ----")
    print(process.stdout)
    print("---- Script Standard Error ----")
    print(process.stderr)

    if process.returncode != 0:
        raise RuntimeError(f"Fine-tuning script failed with return code {process.returncode}")
    else:
        print("Fine-tuning script completed successfully.")

    # --- Copy final adapter to KFP artifact path ---
    final_adapter_in_output = os.path.join(output_abs_path, "final_adapter")
    if os.path.exists(final_adapter_in_output):
        print(f"Copying final adapter from {final_adapter_in_output} to KFP artifact path {trained_adapter.path}")
        # Use copytree to copy the entire directory content
        from shutil import copytree, rmtree
        # KFP artifact path might exist, remove it first if necessary
        if os.path.exists(trained_adapter.path):
            rmtree(trained_adapter.path)  # Remove existing directory/file
        copytree(final_adapter_in_output, trained_adapter.path)  # Copy the adapter directory
        print("Adapter copied to KFP artifact path successfully.")
        # Add metadata to the artifact (optional)
        trained_adapter.metadata['base_model'] = base_model_path_in_volume
        trained_adapter.metadata['data_file'] = data_file_path_in_volume
        trained_adapter.metadata['max_steps'] = max_steps
    else:
        print(f"Warning: Final adapter directory not found at {final_adapter_in_output}. Cannot copy to KFP artifact.")


# ---- Pipeline Definition ----
@dsl.pipeline(
    name='Qwen 1.5B Fine-tune Pipeline (Optimized)',
    description='Fine-tunes Qwen 1.5B using PEFT LoRA with parameterized paths and outputs.'
)
def qwen_finetune_pipeline_optimized(
    # --- Pipeline Parameters ---
    script_path: str = 'code/fast_finetune.py',
    base_model_path: str = 'models/qwen2.5-1.5b',
    data_file: str = 'code/specialized_alpaca.json',
    output_dir: str = 'output',
    # Hyperparameters exposed at pipeline level
    max_steps: int = 100,
    learning_rate: float = 2e-5,
    batch_size: int = 1,
):
    # --- Run the Fine-tuning Component ---
    finetune_task = finetune_qwen_component(
        script_path_in_volume=script_path,
        base_model_path_in_volume=base_model_path,
        data_file_path_in_volume=data_file,
        output_dir_in_volume=output_dir,
        max_steps=max_steps,
        learning_rate=learning_rate,
        batch_size=batch_size,
    ).set_display_name("Fine-tune Qwen 1.5B")

    kubernetes.mount_pvc(
        task=finetune_task,
        pvc_name=PVC_NAME,
        mount_path=VOLUME_MOUNT_PATH
    )

    # --- Request GPU Resources ---
    finetune_task.set_gpu_limit(1)  # Request 1 GPU


# ---- Compile the Pipeline ----
if __name__ == '__main__':
    # Ensure you are in an environment with kfp installed
    kfp.compiler.Compiler().compile(
        pipeline_func=qwen_finetune_pipeline_optimized,
        package_path='qwen_finetune_pipeline_optimized.yaml'  # Output file name
    )
    print("Optimized pipeline compiled to qwen_finetune_pipeline_optimized.yaml")
Compile to the pipeline YAML:
python pipeline.py
Then manually upload the generated qwen_finetune_pipeline_optimized.yaml in the Pipelines section of the Kubeflow console.
e) Start training
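Besides creating a run by clicking through the UI, a run can also be submitted from a Kubeflow notebook with the KFP SDK. The sketch below assumes it executes inside a notebook in the user namespace, where kfp.Client() can pick up in-cluster defaults; for external access you would pass host and credentials explicitly:
# submit_run.py -- sketch; assumes execution inside a Kubeflow notebook
import kfp

client = kfp.Client()  # in-cluster defaults; pass host=... when connecting from outside
run = client.create_run_from_pipeline_package(
    'qwen_finetune_pipeline_optimized.yaml',
    arguments={'max_steps': 100, 'learning_rate': 2e-5},
    run_name='qwen-finetune-run1',
)
print(run.run_id)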
f) Test directly in a notebook
Test code
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from peft import PeftModel
import torch

# Load the original base model
print("Loading the original model...")
base_model = AutoModelForCausalLM.from_pretrained(
    "/data/models/qwen2.5-1.5b",
    device_map="auto",
    torch_dtype=torch.float16,
    trust_remote_code=True
)
tokenizer = AutoTokenizer.from_pretrained(
    "/data/models/qwen2.5-1.5b",
    trust_remote_code=True
)
orig_pipe = pipeline("text-generation", model=base_model, tokenizer=tokenizer)

# Load the fine-tuned model.
# Note: PeftModel.from_pretrained wraps the passed-in model, and merge_and_unload()
# writes the LoRA weights into it in place -- so load a *fresh* copy of the base
# model here, otherwise orig_pipe would silently compare the merged model
# against itself.
print("Loading the fine-tuned model...")
ft_base = AutoModelForCausalLM.from_pretrained(
    "/data/models/qwen2.5-1.5b",
    device_map="auto",
    torch_dtype=torch.float16,
    trust_remote_code=True
)
finetuned_model = PeftModel.from_pretrained(ft_base, "/data/output/run1/final_adapter")
finetuned_model = finetuned_model.merge_and_unload()
ft_pipe = pipeline("text-generation", model=finetuned_model, tokenizer=tokenizer)

# Test cases
test_cases = [
    "今天的天气怎么样?",
    "你最喜欢的食物是什么?",
    "如何学习编程?",
    "给我讲个笑话",
    "推荐一部电影",
    "Python是最好的语言吗?"  # Question not seen during training
]

for question in test_cases:
    prompt = f"Instruction: 用海盗风格回答\nInput: {question}\nResponse:"
    print(f"\n{'='*50}")
    print(f"Question: {question}")
    # Original model
    orig_output = orig_pipe(
        prompt,
        do_sample=True,
        max_new_tokens=128  # cap generation length
    )[0]['generated_text'].split("Response:")[1].strip()
    print(f"\n[Original model]\n{orig_output}")
    # Fine-tuned model
    ft_output = ft_pipe(
        prompt,
        do_sample=True,
        max_new_tokens=128
    )[0]['generated_text'].split("Response:")[1].strip()
    print(f"\n[Fine-tuned model]\n{ft_output}")
Tested both inside the training container and in a Kubeflow notebook: the model and adapter load cleanly, and the responses come back in the expected pirate style.