diff --git a/services/cloudbrain/cloudbrainTask/train.go b/services/cloudbrain/cloudbrainTask/train.go index 5e2cce1ff..c3eb256f2 100644 --- a/services/cloudbrain/cloudbrainTask/train.go +++ b/services/cloudbrain/cloudbrainTask/train.go @@ -13,6 +13,8 @@ import ( "strconv" "strings" + "code.gitea.io/gitea/modules/urfs_client/urchin" + "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/notification" @@ -382,7 +384,7 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, option api.CreateTrainJobOpt //prepare command preTrainModelPath := getPreTrainModelPath(option.PreTrainModelUrl, option.CkptName) - command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", allFileName, preTrainModelPath, option.CkptName) + command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", allFileName, preTrainModelPath, option.CkptName, "") if err != nil { log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("Create task failed, internal error")) @@ -611,7 +613,7 @@ func GrampusTrainJobNpuCreate(ctx *context.Context, option api.CreateTrainJobOpt //prepare command preTrainModelPath := getPreTrainModelPath(option.PreTrainModelUrl, option.CkptName) - command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, option.CkptName) + command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, option.CkptName, grampus.GetNpuModelRemoteObsUrl(jobName)) if err != nil { log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"]) @@ -862,15 +864,18 @@ func getPreTrainModelPath(pretrainModelDir string, fileName string) string { } -func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName string) (string, error) { +func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName, modelRemoteObsUrl string) (string, error) { var command string + //prepare workDir := grampus.NpuWorkDir - if processorType == grampus.ProcessorTypeGPU { + if processorType == grampus.ProcessorTypeNPU { + command += "pwd;cd " + workDir + grampus.CommandPrepareScriptNpu + } else if processorType == grampus.ProcessorTypeGPU { workDir = grampus.GpuWorkDir + command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScriptGpu, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject) } - command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScript, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject) //download code & dataset if processorType == grampus.ProcessorTypeNPU { //no need to download code & dataset by internet @@ -885,7 +890,7 @@ func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bo //no need to process } else if processorType == grampus.ProcessorTypeGPU { unZipDatasetCommand := generateDatasetUnzipCommand(datasetName) - commandUnzip := "cd " + workDir + "code;unzip -q master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + unZipDatasetCommand + commandUnzip := "cd " + workDir + "code;unzip -q master.zip;rm -f master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + unZipDatasetCommand command += commandUnzip } @@ -919,7 +924,8 @@ func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bo var commandCode string if processorType == grampus.ProcessorTypeNPU { - commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py /tmp/log/train.log" + paramCode + ";" + paramCode += " --model_url=" + modelRemoteObsUrl + commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py " + grampus.NpuLocalLogUrl + paramCode + ";" } else if processorType == grampus.ProcessorTypeGPU { if pretrainModelFileName != "" { paramCode += " --ckpt_url" + "=" + workDir + "pretrainmodel/" + pretrainModelFileName @@ -935,8 +941,7 @@ func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bo //upload models if processorType == grampus.ProcessorTypeNPU { - commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_npu " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;" - command += commandUpload + // no need to upload } else if processorType == grampus.ProcessorTypeGPU { commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_gpu " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;" command += commandUpload @@ -948,7 +953,6 @@ func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bo return command, nil } - func processPretrainModelParameter(pretrainModelPath string, pretrainModelFileName string, commandDownload string) string { commandDownloadTemp := commandDownload if pretrainModelPath != "" { @@ -1083,6 +1087,11 @@ func SyncTaskStatus(task *models.Cloudbrain) error { task.CorrectCreateUnix() if oldStatus != task.Status { notification.NotifyChangeCloudbrainStatus(task, oldStatus) + if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource { + if len(result.JobInfo.Tasks[0].CenterID) == 1 { + urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) + } + } } err = models.UpdateJob(task) if err != nil {