#!/bin/bash
# Resume-training orchestrator.
# Runs ${MONITOR_CMD} (a script under this directory) and, whenever it exits
# non-zero, cleans up and relaunches it so training can resume from the latest
# checkpoint. The relaunch loop is bounded by MAX_RETRIES.
# Nodes coordinate through per-node "node_<VC_TASK_INDEX>.status" files under
# ${status_dir}.
# Reads (not set in this file; presumably provided by base/envs.sh,
# parse_base_conf or the job environment — verify): MONITOR_CMD,
# TRAIN_CONF_NAME, INFER_CONF_NAME, MAX_RETRIES, VC_TASK_INDEX,
# MASTER_TRAIN_INDEX, CLEAN_OLD_CKPT.

# Quote every expansion so install paths containing spaces still resolve.
scripts_dir=$(realpath "$(dirname "$0")")
root_dir=$(realpath "$(dirname "${scripts_dir}")")
# Default checkpoint dir; generate_new_train_yaml overwrites it from the yaml.
save_value="./resume/save_ckpt/"
# -1 = no resume point known yet.
resume_iteration=-1
status_dir="./resume/status"
org_infer_model_path=""
# shellcheck source=/dev/null
source "${scripts_dir}/base/envs.sh"
# shellcheck source=/dev/null
source "${scripts_dir}/base/utils.sh"
parse_base_conf
export RESUME_TRAIN_CONF_NAME=${TRAIN_CONF_NAME}_resume
export RESUME_INFER_CONF_NAME=${INFER_CONF_NAME}_resume
export START_RESUME_FLAG=true
log_info "============================================================"
log_info "Start exec resume training with checkpoint..."
log_info "[MONITOR_CMD]: ${MONITOR_CMD}"
log_info "[TRAIN_CONF_NAME]: ${TRAIN_CONF_NAME}"
log_info "[INFER_CONF_NAME]: ${INFER_CONF_NAME}"
log_info "[RESUME_TRAIN_CONF_NAME]: ${RESUME_TRAIN_CONF_NAME}"
log_info "[RESUME_INFER_CONF_NAME]: ${RESUME_INFER_CONF_NAME}"
log_info "[MAX_RETRIES]: ${MAX_RETRIES}"
log_info "============================================================"
#######################################
# Forcefully terminate leftovers of a previous attempt:
#   - the local Ray processes (ray stop --force),
#   - every process whose `ps -ef` line mentions "python",
#   - all processes named python / torchrun (pkill),
#   - the parents of defunct (zombie) python / torchrun processes, so that the
#     zombies get reaped.
# This script's own PID ($$) and PID 1 are never signalled. The "python" match
# covers the full command line, so it would also hit this script whenever its
# path contains "python". The zombie cleanup kills the *parent* PID, which
# could be this script or init.
# Globals:   none written
# Returns:   status of the last pipeline (callers ignore it)
#######################################
clean_old_process() {
  log_info "terminating remaining processes ..."
  ray stop --force
  ps -ef | grep "python" | grep -v grep \
    | awk -v self="$$" '$2 != self && $2 != 1 {print $2}' \
    | xargs -t -I {} kill -9 {}
  pkill -9 python
  pkill -9 torchrun
  # Column 3 of `ps -ef` is the PPID; de-duplicate parents that own several zombies.
  ps -ef | grep "defunct" | grep -E "python|torchrun" \
    | awk -v self="$$" '$3 != self && $3 != 1 {print $3}' \
    | sort -u \
    | xargs -t -I {} kill -9 {}
}
#######################################
# Delete the (non-hidden) contents of a checkpoint directory, keeping the
# directory itself.
# Arguments: $1 - checkpoint directory (may be empty/unset)
# Returns:   0 when cleaned or skipped (empty arg / missing dir) with a warning;
#            1 when the directory resolves to "/" (refused).
#######################################
clean_all_checkpoints() {
  local save_checkpoint_dir="$1"
  local resolved_dir
  if [ -z "${save_checkpoint_dir}" ]; then
    log_warn "save dir ${save_checkpoint_dir} not config"
    return
  fi
  if [ ! -d "${save_checkpoint_dir}" ]; then
    log_warn "dir ${save_checkpoint_dir} does not exist."
    return
  fi
  # ${var:?} below only protects against an empty value; a misconfigured
  # default_local_dir that resolves to the filesystem root would still make
  # the glob wipe the whole machine, so refuse that explicitly.
  resolved_dir=$(realpath -- "${save_checkpoint_dir}")
  if [ "${resolved_dir}" = "/" ]; then
    log_error "refusing to clean checkpoint directory ${save_checkpoint_dir}: it resolves to /"
    return 1
  fi
  log_info "cleaning checkpoint directory: ${save_checkpoint_dir}"
  rm -rf "${save_checkpoint_dir:?}"/*
  log_info "checkpoint directory clean succeed"
}
# 1-based attempt counter; resume_train bumps it after every failed attempt.
exec_cmd_count=1
# Original yaml configs, and the "*_resume" copies that this script creates
# and edits before each attempt.
printf -v train_yaml_file '%s/configs/train/%s.yaml' "${root_dir}" "${TRAIN_CONF_NAME}"
printf -v train_resume_yaml_file '%s/configs/train/%s.yaml' "${root_dir}" "${RESUME_TRAIN_CONF_NAME}"
printf -v infer_yaml_file '%s/configs/infer/%s.yaml' "${root_dir}" "${INFER_CONF_NAME}"
printf -v infer_resume_yaml_file '%s/configs/infer/%s.yaml' "${root_dir}" "${RESUME_INFER_CONF_NAME}"
#######################################
# First attempt only: copy the train yaml to its "_resume" twin and read the
# checkpoint directory (default_local_dir) from it. Optionally wipes old
# checkpoints when CLEAN_OLD_CKPT=1.
# Globals:   exec_cmd_count, train_yaml_file, train_resume_yaml_file,
#            CLEAN_OLD_CKPT (read); save_value (written)
# Returns:   0; exits the script with 1 if the yaml cannot be copied.
#######################################
generate_new_train_yaml() {
  if [[ "${exec_cmd_count}" -gt 1 ]]; then
    return
  fi
  # Fail fast: continuing with a missing/empty resume yaml only makes every
  # later attempt fail for an unrelated-looking reason.
  if ! cp -f -- "${train_yaml_file}" "${train_resume_yaml_file}"; then
    log_error "failed to copy ${train_yaml_file} to ${train_resume_yaml_file}, exit train"
    exit 1
  fi
  save_value=$(get_verl_conf_val "${train_resume_yaml_file}" "default_local_dir")
  log_info "found default_local_dir: $save_value"
  if [[ "${CLEAN_OLD_CKPT}" -eq 1 ]]; then
    log_info "starting to clean all checkpoints from ${save_value} ..."
    clean_all_checkpoints "${save_value}"
  fi
}
# 1 once resume_mode has been switched to "auto" in the resume train yaml.
is_yaml_replaced=0
#######################################
# On a retry (attempt >= 2), switch resume_mode to "auto" in the resume train
# yaml, but only once a checkpoint marker exists under save_value. The switch
# happens at most once per run. (The name's typo is kept: modify_train_yaml
# calls it by this name.)
# Globals:   exec_cmd_count, save_value, train_resume_yaml_file (read);
#            org_resume_mode, is_yaml_replaced (written)
# Returns:   0; exits the script with 1 if the yaml edit fails.
#######################################
modify_train_reumse_mode() {
  local marker_file="${save_value}/latest_checkpointed_iteration.txt"
  if (( exec_cmd_count < 2 )) || (( is_yaml_replaced == 1 )); then
    return
  fi
  if [[ ! -f "${marker_file}" ]]; then
    log_info "latest_checkpointed_iteration.txt does not exist under ${save_value}, cannot resume training"
    return
  fi
  org_resume_mode=$(get_verl_conf_val "${train_resume_yaml_file}" "resume_mode")
  log_info "original resume_mode: ${org_resume_mode}"
  if ! replace_verl_conf_val "${train_resume_yaml_file}" "resume_mode" "auto"; then
    log_error "replace yaml config failed, exit train"
    exit 1
  fi
  is_yaml_replaced=1
}
#######################################
# Master node only: create the resume train yaml (first attempt) and switch
# it to resume_mode=auto when a checkpoint is available (later attempts).
# Globals:   VC_TASK_INDEX, MASTER_TRAIN_INDEX (read)
# Returns:   0 on non-master nodes, else status of modify_train_reumse_mode.
#######################################
modify_train_yaml() {
  [[ "$VC_TASK_INDEX" -ne "$MASTER_TRAIN_INDEX" ]] && return 0
  generate_new_train_yaml
  modify_train_reumse_mode
}
#######################################
# First attempt only: copy the infer yaml to its "_resume" twin, remember the
# original infer_model_path, and set a top-level "if_waiting: true" key.
# Non-master nodes (see modify_infer_yaml) poll that file for the literal
# text "if_waiting: false" before proceeding.
# Globals:   exec_cmd_count, infer_yaml_file, infer_resume_yaml_file (read);
#            org_infer_model_path (written)
# Returns:   0; exits the script with 1 if the yaml cannot be copied.
#######################################
generate_new_infer_yaml() {
  if [[ "${exec_cmd_count}" -gt 1 ]]; then
    return
  fi
  if ! cp -f -- "${infer_yaml_file}" "${infer_resume_yaml_file}"; then
    log_error "failed to copy ${infer_yaml_file} to ${infer_resume_yaml_file}, exit train"
    exit 1
  fi
  org_infer_model_path=$(get_verl_conf_val "${infer_resume_yaml_file}" "infer_model_path")
  log_info "found original infer_model_path: $org_infer_model_path"
  if grep -q '^if_waiting:' "${infer_resume_yaml_file}"; then
    # Replace rather than append: a duplicate key is ambiguous YAML, and a
    # leftover "if_waiting: false" would release the waiting nodes too early.
    sed -i 's/^if_waiting:.*/if_waiting: true/' "${infer_resume_yaml_file}"
  else
    # Start the new key on its own line even if the copy lacks a final newline.
    if [[ -n "$(tail -c 1 "${infer_resume_yaml_file}")" ]]; then
      printf '\n' >> "${infer_resume_yaml_file}"
    fi
    printf 'if_waiting: true\n' >> "${infer_resume_yaml_file}"
  fi
  log_info "if_waiting: $(get_verl_conf_val "${infer_resume_yaml_file}" "if_waiting")"
}
#######################################
# Point the resume infer yaml at weights converted from the resume
# checkpoint, then release the waiting nodes by setting if_waiting to false.
# With no resume point (resume_iteration < 1), only if_waiting is released.
# Conversion runs base/verl_merge.sh on ${save_value}/global_step_<iter>. The
# output directory is reused if it holds a "convert_done" marker; otherwise a
# partial directory is deleted and the conversion is redone.
# Globals:   resume_iteration, org_infer_model_path, root_dir, scripts_dir,
#            save_value, infer_resume_yaml_file (read)
# Returns:   status of the last yaml edit; exits the script with the merge
#            script's code if conversion fails.
#######################################
modify_infer_model_path() {
  local converted_infer_model_path ckpt_path merge_status
  if [[ "${resume_iteration}" -lt 1 ]]; then
    replace_verl_conf_val "${infer_resume_yaml_file}" "if_waiting" "false"
    return
  fi
  converted_infer_model_path="${root_dir}/resume/resume_hf_path/$(basename -- "${org_infer_model_path%/}")_resume_${resume_iteration}"
  # convert_done is written only after a successful merge, so its presence
  # (which implies the directory exists) marks a complete conversion.
  if [[ -f "${converted_infer_model_path}/convert_done" ]]; then
    log_info "infer weights already converted in ${converted_infer_model_path}, skipping..."
  else
    if [[ -d "${converted_infer_model_path}" ]]; then
      log_info "convert_done not found, removing incomplete directory..."
      rm -rf -- "${converted_infer_model_path}"
    fi
    ckpt_path="${save_value}/global_step_${resume_iteration}"
    bash "${scripts_dir}/base/verl_merge.sh" "${ckpt_path}" "${converted_infer_model_path}"
    merge_status=$?
    if (( merge_status != 0 )); then
      log_error "Failed merge verl weights for vllm..."
      exit "${merge_status}"
    fi
    touch "${converted_infer_model_path}/convert_done"
  fi
  replace_verl_conf_val "${infer_resume_yaml_file}" "infer_model_path" "${converted_infer_model_path}"
  replace_verl_conf_val "${infer_resume_yaml_file}" "if_waiting" "false"
}
#######################################
# The master node prepares the resume infer yaml (copy plus weight
# conversion). Every other node blocks, polling once a minute, until that
# yaml contains the text "if_waiting: false". The yaml is presumably on
# shared storage; verify.
# Globals:   VC_TASK_INDEX, MASTER_TRAIN_INDEX, infer_resume_yaml_file (read)
# Returns:   0 on non-master nodes, else status of modify_infer_model_path.
#######################################
modify_infer_yaml() {
  if [[ "$VC_TASK_INDEX" -ne "$MASTER_TRAIN_INDEX" ]]; then
    until grep -q "if_waiting: false" "$infer_resume_yaml_file" 2>/dev/null; do
      log_info "Waiting for master node to finish inference weight conversion..."
      sleep 60
    done
    log_info "Master node finished weight conversion, stop waiting."
    return 0
  fi
  generate_new_infer_yaml
  modify_infer_model_path
}
#######################################
# Determine the iteration this attempt resumes from, and export it as
# RESUME_ITERATION.
#   - Attempt 1: only acts when resume_mode is "resume_path". The iteration
#     is taken from resume_from_path (via grep_global_step_from_path) and
#     "ready resume_path 1 <iter>" is appended to this node's status file.
#     The script exits if no iteration can be extracted.
#   - Later attempts: the master reads latest_checkpointed_iteration.txt (only
#     after resume_mode was switched; otherwise 0). Other nodes poll the
#     master's status file every 10s for "ready <attempt> <iter>". Each node
#     then appends its own "ready <attempt> <iter>" line.
# Globals:   exec_cmd_count, train_resume_yaml_file, save_value, status_dir,
#            is_yaml_replaced, VC_TASK_INDEX, MASTER_TRAIN_INDEX (read);
#            resume_iteration, RESUME_ITERATION (written)
#######################################
get_resume_iteration() {
  local resume_mode resume_from_path line
  if [[ "${exec_cmd_count}" -eq 1 ]]; then
    resume_mode=$(get_verl_conf_val "${train_resume_yaml_file}" "resume_mode")
    if [[ "${resume_mode}" == "resume_path" ]]; then
      resume_from_path=$(get_verl_conf_val "${train_resume_yaml_file}" "resume_from_path")
      log_info "resume_path | found resume_from_path: $resume_from_path"
      resume_iteration=$(grep_global_step_from_path "$resume_from_path")
      if [[ -z "$resume_iteration" ]]; then
        log_error "resume_from_path is invalid: $resume_from_path"
        exit 1
      fi
      export RESUME_ITERATION=$resume_iteration
      log_info "resume_path | resume iteration:=${resume_iteration}, export RESUME_ITERATION=${RESUME_ITERATION}"
      echo "ready resume_path ${exec_cmd_count} ${resume_iteration}" >> "${status_dir}/node_${VC_TASK_INDEX}.status"
    fi
    return
  fi
  resume_iteration=0
  if [[ "$VC_TASK_INDEX" == "$MASTER_TRAIN_INDEX" ]]; then
    if [[ "$is_yaml_replaced" -eq 1 ]]; then
      # Strip whitespace/newlines so the value is a bare number.
      resume_iteration=$(tr -d ' \t\n\r' < "${save_value}/latest_checkpointed_iteration.txt")
    fi
  else
    while true; do
      line=$(grep "ready ${exec_cmd_count}" "${status_dir}/node_${MASTER_TRAIN_INDEX}.status" 2>/dev/null)
      if [[ -n "$line" ]]; then
        resume_iteration=$(echo "$line" | awk '{print $NF}')
        log_info "Detected master ready with resume_iteration=$resume_iteration"
        break
      fi
      sleep 10
    done
  fi
  export RESUME_ITERATION=${resume_iteration}
  log_info "resume iteration: ${resume_iteration}, export RESUME_ITERATION=${RESUME_ITERATION}"
  echo "ready ${exec_cmd_count} ${resume_iteration}" >> "${status_dir}/node_${VC_TASK_INDEX}.status"
}
#######################################
# Prepare the next attempt after a failure: kill leftovers, bump the attempt
# counter, and pause before relaunching. The master pauses 5 minutes, other
# nodes 4. The master then re-arms "if_waiting: true" in the resume infer
# yaml so that the other nodes wait again for the next conversion.
# Globals:   VC_TASK_INDEX, MASTER_TRAIN_INDEX, infer_resume_yaml_file (read);
#            exec_cmd_count (incremented)
#######################################
resume_train() {
  local pause_minutes=4
  local is_master=0
  clean_old_process
  (( exec_cmd_count += 1 ))
  if [[ "$VC_TASK_INDEX" == "$MASTER_TRAIN_INDEX" ]]; then
    is_master=1
    pause_minutes=5
  fi
  log_warn "waiting for ${pause_minutes} min before restarting ..."
  sleep "${pause_minutes}m"
  if (( is_master )); then
    log_info "update if_waiting to true in $infer_resume_yaml_file before next resume ..."
    sed -i 's/^if_waiting:.*/if_waiting: true/' "$infer_resume_yaml_file"
  fi
}
#######################################
# Reset the cross-node status directory at startup.
# The master creates status_dir, or deletes every *.status file in it, and
# removes the previous run's resume yamls. Every other node waits (polling
# every 10s) until its own status file is missing or empty, i.e. until the
# master has cleared it. Finally each node appends "init" to its status file.
# Globals:   VC_TASK_INDEX, MASTER_TRAIN_INDEX, status_dir,
#            train_resume_yaml_file, infer_resume_yaml_file (read)
#######################################
init_group_status() {
  local my_status_file="${status_dir}/node_${VC_TASK_INDEX}.status"
  if [[ "$VC_TASK_INDEX" == "$MASTER_TRAIN_INDEX" ]]; then
    if [[ -d "${status_dir}" ]]; then
      rm -f "${status_dir}"/*.status
      log_info "status_dir already exists, cleared old .status files"
    else
      mkdir -p "${status_dir}"
      log_info "created status_dir ${status_dir}"
    fi
    rm -f -- "${train_resume_yaml_file}" "${infer_resume_yaml_file}"
  else
    while [[ -s "${my_status_file}" ]]; do
      sleep 10
    done
  fi
  echo "init" >> "${my_status_file}"
}
#######################################
# Remove the generated files in ${scripts_dir}/infer/conf_for_train
# (config_done, server lists, parallelism sizes, vllm_version), presumably so
# that the next attempt does not reuse the previous attempt's values. Other
# files in that directory are left alone. A no-op if the directory is
# missing. main() calls this on node 0 before announcing "clean <attempt>".
# Globals:   scripts_dir (read)
#######################################
clean_old_files() {
  local conf_dir="${scripts_dir}/infer/conf_for_train"
  local stale_name
  if [[ ! -d "${conf_dir}" ]]; then
    return
  fi
  for stale_name in config_done prefill_server_list decode_server_list \
      tensor_parallel_size data_parallel_size enable_expert_parallel vllm_version; do
    rm -f -- "${conf_dir}/${stale_name}"
  done
}
#######################################
# Retry loop: run MONITOR_CMD until it exits 0 or the attempt budget is spent.
# For each attempt:
#   1. Node 0 clears the stale infer configs and appends "clean <attempt>".
#      Other nodes wait for that line.
#   2. The resume yamls and the resume iteration are prepared.
#   3. MONITOR_CMD runs in the background. A watchdog next to it kills it
#      as soon as any node's status file reports "fail <attempt>".
#   4. The result ("ok"/"fail <attempt>") is appended to this node's status
#      file. On failure, resume_train prepares the next attempt.
# Globals:   exec_cmd_count, MAX_RETRIES, MONITOR_CMD, VC_TASK_INDEX,
#            status_dir, scripts_dir (read); main_pid (written)
# Returns:   0 after a successful attempt; exits 1 if no attempt succeeded.
#######################################
main() {
  local watcher_pid exit_code
  local succeeded=0
  log_info "train process begin!!!"
  init_group_status
  # NOTE(review): exec_cmd_count starts at 1 and the comparison is strict, so
  # at most MAX_RETRIES-1 attempts run (none when MAX_RETRIES <= 1) — confirm intent.
  while [ "${exec_cmd_count}" -lt "${MAX_RETRIES}" ]; do
    log_info "execute [${MONITOR_CMD}], times: ${exec_cmd_count}"
    if [[ "$VC_TASK_INDEX" -eq 0 ]]; then
      clean_old_files
      echo "clean ${exec_cmd_count}" >> "${status_dir}/node_${VC_TASK_INDEX}.status"
    else
      until grep -q "clean ${exec_cmd_count}" "${status_dir}/node_0.status" 2>/dev/null; do
        sleep 5
      done
      log_info "Detected infer master has cleaned the old configs ..."
    fi
    modify_train_yaml
    get_resume_iteration
    modify_infer_yaml
    # MONITOR_CMD is deliberately unquoted so it may carry extra arguments.
    # shellcheck disable=SC2086
    bash "${scripts_dir}/"${MONITOR_CMD} &
    main_pid=$!
    # Watchdog: abort this node's run as soon as any node reports a failure
    # for the current attempt, so that all nodes restart together.
    (
      while true; do
        if grep -q "fail ${exec_cmd_count}" "${status_dir}"/*.status 2>/dev/null; then
          log_error "Detected another node failed, killing main process..."
          kill -9 "$main_pid"
          exit 1
        fi
        sleep 120
      done
    ) &
    watcher_pid=$!
    wait "$main_pid"
    exit_code=$?
    # Stop the watchdog as soon as this attempt is over. Otherwise it outlives
    # a successful run forever, and after a failure it would later kill -9 a
    # PID that has already been reaped (and possibly reused).
    kill "$watcher_pid" 2>/dev/null
    wait "$watcher_pid" 2>/dev/null
    if [ "$exit_code" -eq 0 ]; then
      log_info "script terminated successfully (code: $exit_code)"
      echo "ok ${exec_cmd_count}" >> "${status_dir}/node_${VC_TASK_INDEX}.status"
      succeeded=1
      break
    fi
    log_error "script abnormal exit (code: $exit_code)"
    echo "fail ${exec_cmd_count}" >> "${status_dir}/node_${VC_TASK_INDEX}.status"
    resume_train
  done
  # Do not report success to the scheduler when every attempt failed.
  if (( ! succeeded )); then
    log_error "train process failed: no successful run within MAX_RETRIES=${MAX_RETRIES}"
    exit 1
  fi
  log_info "train process complete!!!"
}
# Entry point: run the retry loop defined in main().
main