| @@ -205,6 +205,7 @@ type Cloudbrain struct { | |||||
| BenchmarkTypeRankLink string `xorm:"-"` | BenchmarkTypeRankLink string `xorm:"-"` | ||||
| StartTime timeutil.TimeStamp | StartTime timeutil.TimeStamp | ||||
| EndTime timeutil.TimeStamp | EndTime timeutil.TimeStamp | ||||
| Cleared bool `xorm:"DEFAULT false"` | |||||
| Spec *Specification `xorm:"-"` | Spec *Specification `xorm:"-"` | ||||
| } | } | ||||
| @@ -2116,6 +2117,65 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) { | |||||
| Find(&cloudbrains) | Find(&cloudbrains) | ||||
| } | } | ||||
| func GetCloudBrainOneStoppedJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) { | |||||
| cloudbrains := make([]*Cloudbrain, 0, 10) | |||||
| endTimeBefore := time.Now().Unix() - int64(days)*24*3600 | |||||
| missEndTimeBefore := endTimeBefore - 24*3600 | |||||
| return cloudbrains, x.Cols("id,job_name,job_id"). | |||||
| In("status", | |||||
| JobStopped, JobSucceeded, JobFailed, ModelArtsCreateFailed, ModelArtsStartFailed, ModelArtsUnavailable, ModelArtsResizFailed, ModelArtsDeleted, | |||||
| ModelArtsStopped, ModelArtsTrainJobCanceled, ModelArtsTrainJobCheckFailed, ModelArtsTrainJobCompleted, ModelArtsTrainJobDeleteFailed, ModelArtsTrainJobDeployServiceFailed, | |||||
| ModelArtsTrainJobFailed, ModelArtsTrainJobImageFailed, ModelArtsTrainJobKilled, ModelArtsTrainJobLost, ModelArtsTrainJobSubmitFailed, ModelArtsTrainJobSubmitModelFailed). | |||||
| Where("((end_time is null and created_unix<?) or end_time<?) and cleared=false and type=0", missEndTimeBefore, endTimeBefore). | |||||
| Limit(limit). | |||||
| Find(&cloudbrains) | |||||
| } | |||||
| func UpdateCloudBrainRecordsCleared(ids []int64) error { | |||||
| pageSize := 150 | |||||
| n := len(ids) / pageSize | |||||
| var err error | |||||
| for i := 1; i <= n+1; i++ { | |||||
| tempIds := getPageIds(ids, i, pageSize) | |||||
| if len(tempIds) > 0 { | |||||
| idsIn := "" | |||||
| for i, id := range tempIds { | |||||
| if i == 0 { | |||||
| idsIn += strconv.FormatInt(id, 10) | |||||
| } else { | |||||
| idsIn += "," + strconv.FormatInt(id, 10) | |||||
| } | |||||
| } | |||||
| _, errTemp := x.Unscoped().Exec("update cloudbrain set cleared=true where id in (" + idsIn + ")") | |||||
| if errTemp != nil { | |||||
| err = errTemp | |||||
| } | |||||
| } | |||||
| } | |||||
| return err | |||||
| } | |||||
| func getPageIds(ids []int64, page int, pagesize int) []int64 { | |||||
| begin := (page - 1) * pagesize | |||||
| end := (page) * pagesize | |||||
| if begin > len(ids)-1 { | |||||
| return []int64{} | |||||
| } | |||||
| if end > len(ids)-1 { | |||||
| return ids[begin:] | |||||
| } else { | |||||
| return ids[begin:end] | |||||
| } | |||||
| } | |||||
| func GetStoppedJobWithNoDurationJob() ([]*Cloudbrain, error) { | func GetStoppedJobWithNoDurationJob() ([]*Cloudbrain, error) { | ||||
| cloudbrains := make([]*Cloudbrain, 0) | cloudbrains := make([]*Cloudbrain, 0) | ||||
| return cloudbrains, x. | return cloudbrains, x. | ||||
| @@ -5,10 +5,13 @@ | |||||
| package cron | package cron | ||||
| import ( | import ( | ||||
| "code.gitea.io/gitea/modules/urfs_client/urchin" | |||||
| "code.gitea.io/gitea/modules/setting" | |||||
| "context" | "context" | ||||
| "time" | "time" | ||||
| "code.gitea.io/gitea/modules/urfs_client/urchin" | |||||
| cloudbrainService "code.gitea.io/gitea/services/cloudbrain" | |||||
| "code.gitea.io/gitea/modules/modelarts" | "code.gitea.io/gitea/modules/modelarts" | ||||
| "code.gitea.io/gitea/services/cloudbrain/resource" | "code.gitea.io/gitea/services/cloudbrain/resource" | ||||
| "code.gitea.io/gitea/services/reward" | "code.gitea.io/gitea/services/reward" | ||||
| @@ -190,6 +193,17 @@ func registerHandleRepoAndUserStatistic() { | |||||
| }) | }) | ||||
| } | } | ||||
| func registerHandleClearCloudbrainResult() { | |||||
| RegisterTaskFatal("handle_cloudbrain_one_result_clear", &BaseConfig{ | |||||
| Enabled: true, | |||||
| RunAtStart: setting.ClearStrategy.RunAtStart, | |||||
| Schedule: setting.ClearStrategy.Cron, | |||||
| }, func(ctx context.Context, _ *models.User, _ Config) error { | |||||
| cloudbrainService.ClearCloudbrainResultSpace() | |||||
| return nil | |||||
| }) | |||||
| } | |||||
| func registerHandleSummaryStatistic() { | func registerHandleSummaryStatistic() { | ||||
| RegisterTaskFatal("handle_summary_statistic", &BaseConfig{ | RegisterTaskFatal("handle_summary_statistic", &BaseConfig{ | ||||
| Enabled: true, | Enabled: true, | ||||
| @@ -306,6 +320,7 @@ func initBasicTasks() { | |||||
| registerHandleRepoAndUserStatistic() | registerHandleRepoAndUserStatistic() | ||||
| registerHandleSummaryStatistic() | registerHandleSummaryStatistic() | ||||
| registerHandleClearCloudbrainResult() | |||||
| registerSyncCloudbrainStatus() | registerSyncCloudbrainStatus() | ||||
| registerHandleOrgStatistic() | registerHandleOrgStatistic() | ||||
| @@ -317,6 +332,6 @@ func initBasicTasks() { | |||||
| registerHandleModelSafetyTask() | registerHandleModelSafetyTask() | ||||
| registerHandleScheduleRecord() | |||||
| registerHandleScheduleRecord() | |||||
| registerHandleCloudbrainDurationStatistic() | registerHandleCloudbrainDurationStatistic() | ||||
| } | } | ||||
| @@ -519,6 +519,7 @@ var ( | |||||
| CullIdleTimeout string | CullIdleTimeout string | ||||
| CullInterval string | CullInterval string | ||||
| //benchmark config | //benchmark config | ||||
| IsBenchmarkEnabled bool | IsBenchmarkEnabled bool | ||||
| BenchmarkOwner string | BenchmarkOwner string | ||||
| @@ -613,6 +614,15 @@ var ( | |||||
| UsageRateBeginTime string | UsageRateBeginTime string | ||||
| }{} | }{} | ||||
| ClearStrategy= struct { | |||||
| Enabled bool | |||||
| ResultSaveDays int | |||||
| BatchSize int | |||||
| TrashSaveDays int | |||||
| Cron string | |||||
| RunAtStart bool | |||||
| }{} | |||||
| C2NetInfos *C2NetSqInfos | C2NetInfos *C2NetSqInfos | ||||
| CenterInfos *AiCenterInfos | CenterInfos *AiCenterInfos | ||||
| C2NetMapInfo map[string]*C2NetSequenceInfo | C2NetMapInfo map[string]*C2NetSequenceInfo | ||||
| @@ -1619,6 +1629,7 @@ func NewContext() { | |||||
| getModelConvertConfig() | getModelConvertConfig() | ||||
| getModelSafetyConfig() | getModelSafetyConfig() | ||||
| getModelAppConfig() | getModelAppConfig() | ||||
| getClearStrategy() | |||||
| } | } | ||||
| func getModelSafetyConfig() { | func getModelSafetyConfig() { | ||||
| @@ -1679,6 +1690,17 @@ func getModelartsCDConfig() { | |||||
| getNotebookFlavorInfos() | getNotebookFlavorInfos() | ||||
| } | } | ||||
| func getClearStrategy(){ | |||||
| sec := Cfg.Section("clear_strategy") | |||||
| ClearStrategy.Enabled=sec.Key("ENABLED").MustBool(false) | |||||
| ClearStrategy.ResultSaveDays=sec.Key("RESULT_SAVE_DAYS").MustInt(30) | |||||
| ClearStrategy.BatchSize=sec.Key("BATCH_SIZE").MustInt(500) | |||||
| ClearStrategy.TrashSaveDays=sec.Key("TRASH_SAVE_DAYS").MustInt(90) | |||||
| ClearStrategy.Cron=sec.Key("CRON").MustString("* 0,30 2-8 * * ? *") | |||||
| ClearStrategy.RunAtStart=sec.Key("RUN_AT_START").MustBool(false) | |||||
| } | |||||
| func getGrampusConfig() { | func getGrampusConfig() { | ||||
| sec := Cfg.Section("grampus") | sec := Cfg.Section("grampus") | ||||
| @@ -3248,6 +3248,7 @@ specification = specification | |||||
| select_specification = select specification | select_specification = select specification | ||||
| description = description | description = description | ||||
| wrong_specification=You cannot use this specification, please choose another item. | wrong_specification=You cannot use this specification, please choose another item. | ||||
| result_cleared=The files of the task have been cleared, can not restart any more, please create a new debug task instead. | |||||
| resource_use=Resource Occupancy | resource_use=Resource Occupancy | ||||
| job_name_rule = Please enter letters, numbers, _ and - up to 64 characters and cannot end with a dash (-). | job_name_rule = Please enter letters, numbers, _ and - up to 64 characters and cannot end with a dash (-). | ||||
| @@ -3268,6 +3268,8 @@ card_duration = 运行卡时 | |||||
| card_type = 卡类型 | card_type = 卡类型 | ||||
| wrong_specification=您目前不能使用这个资源规格,请选择其他资源规格。 | wrong_specification=您目前不能使用这个资源规格,请选择其他资源规格。 | ||||
| result_cleared=本任务的文件已被清理,无法再次调试,请新建调试任务。 | |||||
| job_name_rule = 请输入字母、数字、_和-,最长64个字符,且不能以中划线(-)结尾。 | job_name_rule = 请输入字母、数字、_和-,最长64个字符,且不能以中划线(-)结尾。 | ||||
| train_dataset_path_rule = 数据集位置存储在运行参数 <strong style="color:#010101">data_url</strong> 中,预训练模型存放在运行参数 <strong style="color:#010101">ckpt_url</strong> 中,训练输出路径存储在运行参数 <strong style="color:#010101">train_url</strong> 中。 | train_dataset_path_rule = 数据集位置存储在运行参数 <strong style="color:#010101">data_url</strong> 中,预训练模型存放在运行参数 <strong style="color:#010101">ckpt_url</strong> 中,训练输出路径存储在运行参数 <strong style="color:#010101">train_url</strong> 中。 | ||||
| infer_dataset_path_rule = 数据集位置存储在运行参数 <strong style="color:#010101">data_url</strong> 中,推理输出路径存储在运行参数 <strong style="color:#010101">result_url</strong> 中。 | infer_dataset_path_rule = 数据集位置存储在运行参数 <strong style="color:#010101">data_url</strong> 中,推理输出路径存储在运行参数 <strong style="color:#010101">result_url</strong> 中。 | ||||
| @@ -683,6 +683,13 @@ func CloudBrainRestart(ctx *context.Context) { | |||||
| break | break | ||||
| } | } | ||||
| if _, err := os.Stat(getOldJobPath(task)); err != nil { | |||||
| log.Error("Can not find job minio path", err) | |||||
| resultCode = "-1" | |||||
| errorMsg = ctx.Tr("cloudbrain.result_cleared") | |||||
| break | |||||
| } | |||||
| count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainOne, string(models.JobTypeDebug)) | count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainOne, string(models.JobTypeDebug)) | ||||
| if err != nil { | if err != nil { | ||||
| log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"]) | log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"]) | ||||
| @@ -722,6 +729,7 @@ func CloudBrainRestart(ctx *context.Context) { | |||||
| }) | }) | ||||
| } | } | ||||
| func HasModelFile(task *models.Cloudbrain) bool { | func HasModelFile(task *models.Cloudbrain) bool { | ||||
| if task.PreTrainModelUrl == "" { | if task.PreTrainModelUrl == "" { | ||||
| return true | return true | ||||
| @@ -747,6 +755,10 @@ func HasModelFile(task *models.Cloudbrain) bool { | |||||
| return isFind | return isFind | ||||
| } | } | ||||
| func getOldJobPath(task *models.Cloudbrain) string { | |||||
| return setting.Attachment.Minio.RealPath + setting.Attachment.Minio.Bucket + "/" + setting.CBCodePathPrefix + task.JobName | |||||
| } | |||||
| func CloudBrainBenchMarkShow(ctx *context.Context) { | func CloudBrainBenchMarkShow(ctx *context.Context) { | ||||
| cloudBrainShow(ctx, tplCloudBrainBenchmarkShow, models.JobTypeBenchmark) | cloudBrainShow(ctx, tplCloudBrainBenchmarkShow, models.JobTypeBenchmark) | ||||
| } | } | ||||
| @@ -0,0 +1,132 @@ | |||||
| package cloudbrain | |||||
| import ( | |||||
| "io/ioutil" | |||||
| "os" | |||||
| "sort" | |||||
| "time" | |||||
| "code.gitea.io/gitea/models" | |||||
| "code.gitea.io/gitea/modules/log" | |||||
| "code.gitea.io/gitea/modules/setting" | |||||
| "code.gitea.io/gitea/modules/storage" | |||||
| ) | |||||
| func ClearCloudbrainResultSpace() { | |||||
| if !setting.ClearStrategy.Enabled{ | |||||
| return | |||||
| } | |||||
| tasks, err := models.GetCloudBrainOneStoppedJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.BatchSize) | |||||
| if err != nil { | |||||
| log.Warn("Failed to get cloudbrain, clear result failed.", err) | |||||
| return | |||||
| } | |||||
| var ids []int64 | |||||
| for _, task := range tasks { | |||||
| err := DeleteCloudbrainOneJobStorage(task.JobName) | |||||
| if err == nil { | |||||
| ids = append(ids, task.ID) | |||||
| } | |||||
| } | |||||
| err = models.UpdateCloudBrainRecordsCleared(ids) | |||||
| if err != nil { | |||||
| log.Warn("Failed to set cloudbrain cleared status", err) | |||||
| } | |||||
| //如果云脑表处理完了,通过遍历minio对象处理历史垃圾数据,如果存在的话 | |||||
| if len(tasks) < setting.ClearStrategy.BatchSize { | |||||
| clearLocalHistoryTrashFile() | |||||
| clearMinioHistoryTrashFile() | |||||
| } | |||||
| } | |||||
| func clearMinioHistoryTrashFile() { | |||||
| JobRealPrefix := setting.Attachment.Minio.RealPath + setting.Attachment.Minio.Bucket + "/" + setting.CBCodePathPrefix | |||||
| miniofiles, err := ioutil.ReadDir(JobRealPrefix) | |||||
| processCount := 0 | |||||
| if err != nil { | |||||
| log.Warn("Can not browser minio job path.") | |||||
| } else { | |||||
| SortModTimeAscend(miniofiles) | |||||
| for _, file := range miniofiles { | |||||
| if file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { | |||||
| dirPath := setting.CBCodePathPrefix + file.Name() + "/" | |||||
| storage.Attachments.DeleteDir(dirPath) | |||||
| processCount++ | |||||
| if processCount == setting.ClearStrategy.BatchSize { | |||||
| break | |||||
| } | |||||
| } else { | |||||
| break | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| func clearLocalHistoryTrashFile() { | |||||
| files, err := ioutil.ReadDir(setting.JobPath) | |||||
| processCount := 0 | |||||
| if err != nil { | |||||
| log.Warn("Can not browser local job path.") | |||||
| } else { | |||||
| SortModTimeAscend(files) | |||||
| for _, file := range files { | |||||
| //清理n天前的历史垃圾数据,清理job目录 | |||||
| if file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { | |||||
| os.RemoveAll(setting.JobPath + file.Name()) | |||||
| processCount++ | |||||
| if processCount == setting.ClearStrategy.BatchSize { | |||||
| break | |||||
| } | |||||
| } else { | |||||
| break | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| func SortModTimeAscend(files []os.FileInfo) { | |||||
| sort.Slice(files, func(i, j int) bool { | |||||
| return files[i].ModTime().Before(files[j].ModTime()) | |||||
| }) | |||||
| } | |||||
| func SortModTimeAscendForMinio(files []storage.FileInfo) { | |||||
| sort.Slice(files, func(i, j int) bool { | |||||
| timeI, _ := time.Parse("2006-01-02 15:04:05", files[i].ModTime) | |||||
| timeJ, _ := time.Parse("2006-01-02 15:04:05", files[i].ModTime) | |||||
| return timeI.Before(timeJ) | |||||
| }) | |||||
| } | |||||
| func DeleteCloudbrainOneJobStorage(jobName string) error { | |||||
| //delete local | |||||
| localJobPath := setting.JobPath + jobName | |||||
| err := os.RemoveAll(localJobPath) | |||||
| if err != nil { | |||||
| log.Error("RemoveAll(%s) failed:%v", localJobPath, err) | |||||
| } | |||||
| dirPath := setting.CBCodePathPrefix + jobName + "/" | |||||
| err1 := storage.Attachments.DeleteDir(dirPath) | |||||
| if err1 != nil { | |||||
| log.Error("DeleteDir(%s) failed:%v", localJobPath, err) | |||||
| } | |||||
| if err == nil { | |||||
| err = err1 | |||||
| } | |||||
| return err | |||||
| } | |||||