diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go
index 7bacb46d3..e56342046 100755
--- a/modules/grampus/grampus.go
+++ b/modules/grampus/grampus.go
@@ -20,10 +20,14 @@ const (
 	ProcessorTypeNPU = "npu.huawei.com/NPU"
 	ProcessorTypeGPU = "nvidia.com/gpu"
 
-	GpuWorkDir = "/tmp/"
-	NpuWorkDir = "/cache/"
+	GpuWorkDir              = "/tmp/"
+	NpuWorkDir              = "/cache/"
+	NpuLocalLogUrl          = "/cache/output/train.log"
+	CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
 
 	CodeArchiveName = "master.zip"
+
+	BucketRemote = "grampus"
 )
 
 var (
@@ -33,7 +37,7 @@ var (
 
 	SpecialPools *models.SpecialPools
 
-	CommandPrepareScript = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
+	CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
 		"echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
 )
 
@@ -273,3 +277,12 @@ func InitSpecialPool() {
 		json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
 	}
 }
+
+// GetNpuModelRemoteObsUrl returns the remote OBS url where the trained model of the given job is stored.
+func GetNpuModelRemoteObsUrl(jobName string) string {
+	return "s3:///" + BucketRemote + "/jobs/" + jobName + "/output/models.zip"
+}
+
+// GetBackNpuModel is a placeholder for fetching the trained NPU model back; currently a no-op.
+func GetBackNpuModel(jobName string) error {
+	return nil
+}
diff --git a/modules/urfs_client/config/constants.go b/modules/urfs_client/config/constants.go
new file mode 100755
index 000000000..76bdc5eab
--- /dev/null
+++ b/modules/urfs_client/config/constants.go
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2020 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+	"time"
+)
+
+// Reason of backing to source.
+const (
+	BackSourceReasonNone          = 0
+	BackSourceReasonRegisterFail  = 1
+	BackSourceReasonMd5NotMatch   = 2
+	BackSourceReasonDownloadError = 3
+	BackSourceReasonNoSpace       = 4
+	BackSourceReasonInitError     = 5
+	BackSourceReasonWriteError    = 6
+	BackSourceReasonHostSysError  = 7
+	BackSourceReasonNodeEmpty     = 8
+	BackSourceReasonSourceError   = 10
+	BackSourceReasonUserSpecified = 100
+	ForceNotBackSourceAddition    = 1000
+)
+
+// Download pattern.
+const (
+	PatternP2P      = "p2p"
+	PatternSeedPeer = "seed-peer"
+	PatternSource   = "source"
+)
+
+//// Download limit.
+//const (
+//	DefaultPerPeerDownloadLimit = 20 * unit.MB
+//	DefaultTotalDownloadLimit   = 100 * unit.MB
+//	DefaultUploadLimit          = 100 * unit.MB
+//	DefaultMinRate              = 20 * unit.MB
+//)
+
+// Others.
+const (
+	DefaultTimestampFormat = "2006-01-02 15:04:05"
+	SchemaHTTP             = "http"
+
+	DefaultTaskExpireTime  = 6 * time.Hour
+	DefaultGCInterval      = 1 * time.Minute
+	DefaultDaemonAliveTime = 5 * time.Minute
+	DefaultScheduleTimeout = 5 * time.Minute
+	DefaultDownloadTimeout = 5 * time.Minute
+
+	DefaultSchedulerSchema = "http"
+	DefaultSchedulerIP     = "127.0.0.1"
+	DefaultSchedulerPort   = 8002
+
+	DefaultPieceChanSize     = 16
+	DefaultObjectMaxReplicas = 3
+)
+
+// Dfcache subcommand names.
+const (
+	CmdStat   = "stat"
+	CmdImport = "import"
+	CmdExport = "export"
+	CmdDelete = "delete"
+)
+
+// Default service listening ports.
+const (
+	DefaultEndPort                = 65535
+	DefaultPeerStartPort          = 65000
+	DefaultUploadStartPort        = 65002
+	DefaultObjectStorageStartPort = 65004
+	DefaultHealthyStartPort       = 40901
+)
+
+var (
+	// DefaultCertValidityPeriod is default validity period of certificate.
+	DefaultCertValidityPeriod = 180 * 24 * time.Hour
+)
diff --git a/modules/urfs_client/config/dfstore.go b/modules/urfs_client/config/dfstore.go
new file mode 100755
index 000000000..aafb1b33c
--- /dev/null
+++ b/modules/urfs_client/config/dfstore.go
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2022 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+	"errors"
+	"fmt"
+	"net/url"
+)
+
+type DfstoreConfig struct {
+	// Address of the object storage service.
+	Endpoint string `yaml:"endpoint,omitempty" mapstructure:"endpoint,omitempty"`
+
+	// Filter is used to generate a unique Task ID by
+	// filtering unnecessary query params in the URL,
+	// it is separated by & character.
+	Filter string `yaml:"filter,omitempty" mapstructure:"filter,omitempty"`
+
+	// Mode is the mode in which the backend is written,
+	// including WriteBack and AsyncWriteBack.
+	Mode int `yaml:"mode,omitempty" mapstructure:"mode,omitempty"`
+
+	// MaxReplicas is the maximum number of
+	// replicas of an object cache in seed peers.
+	MaxReplicas int `yaml:"maxReplicas,omitempty" mapstructure:"maxReplicas,omitempty"`
+}
+
+// NewDfstore returns the default dfstore configuration.
+func NewDfstore() *DfstoreConfig {
+	u := url.URL{
+		Scheme: "http",
+		Host:   fmt.Sprintf("%s:%d", "127.0.0.1", DefaultObjectStorageStartPort),
+	}
+
+	return &DfstoreConfig{
+		Endpoint:    u.String(),
+		MaxReplicas: DefaultObjectMaxReplicas,
+	}
+}
+
+func (cfg *DfstoreConfig) Validate() error {
+	if cfg.Endpoint == "" {
+		return errors.New("dfstore requires parameter endpoint")
+	}
+
+	if _, err := url.ParseRequestURI(cfg.Endpoint); err != nil {
+		return fmt.Errorf("invalid endpoint: %w", err)
+	}
+
+	return nil
+}
diff --git a/modules/urfs_client/config/headers.go b/modules/urfs_client/config/headers.go
new file mode 100755
index 000000000..9a27296d3
--- /dev/null
+++ b/modules/urfs_client/config/headers.go
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+const (
+	HeaderDragonflyFilter = "X-Dragonfly-Filter"
+	HeaderDragonflyPeer   = "X-Dragonfly-Peer"
+	HeaderDragonflyTask   = "X-Dragonfly-Task"
+	HeaderDragonflyRange  = "X-Dragonfly-Range"
+	// HeaderDragonflyTag: requests with different tags for the same url are divided into different P2P overlays.
+	HeaderDragonflyTag = "X-Dragonfly-Tag"
+	// HeaderDragonflyApplication is used for statistics and traffic control.
+	HeaderDragonflyApplication = "X-Dragonfly-Application"
+	// HeaderDragonflyRegistry is used for dynamic registry mirrors.
+	HeaderDragonflyRegistry = "X-Dragonfly-Registry"
+	// HeaderDragonflyObjectMetaDigest is used for digest of object storage.
+	HeaderDragonflyObjectMetaDigest = "X-Dragonfly-Object-Meta-Digest"
+)
diff --git a/modules/urfs_client/dfstore/dfstore.go b/modules/urfs_client/dfstore/dfstore.go
new file mode 100755
index 000000000..2901b0abc
--- /dev/null
+++ b/modules/urfs_client/dfstore/dfstore.go
@@ -0,0 +1,307 @@
+package dfstore
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"path"
+	"strconv"
+
+	"github.com/go-http-utils/headers"
+
+	"code.gitea.io/gitea/modules/urfs_client/config"
+	pkgobjectstorage "code.gitea.io/gitea/modules/urfs_client/objectstorage"
+)
+
+// Dfstore is the interface used for object storage.
+type Dfstore interface {
+	// GetUrfsMetadataRequestWithContext returns *http.Request of getting Urfs metadata.
+	GetUrfsMetadataRequestWithContext(ctx context.Context, input *GetUrfsMetadataInput) (*http.Request, error)
+
+	// GetUrfsMetadataWithContext returns metadata of Urfs.
+	GetUrfsMetadataWithContext(ctx context.Context, input *GetUrfsMetadataInput) (*pkgobjectstorage.ObjectMetadata, error)
+
+	// GetUrfsRequestWithContext returns *http.Request of getting Urfs.
+	GetUrfsRequestWithContext(ctx context.Context, input *GetUrfsInput) (*http.Request, error)
+
+	// GetUrfsWithContext returns data of Urfs.
+	GetUrfsWithContext(ctx context.Context, input *GetUrfsInput) (io.ReadCloser, error)
+
+	// GetUrfsStatusRequestWithContext returns *http.Request of getting Urfs status.
+	GetUrfsStatusRequestWithContext(ctx context.Context, input *GetUrfsInput) (*http.Request, error)
+
+	// GetUrfsStatusWithContext returns schedule status of Urfs.
+	GetUrfsStatusWithContext(ctx context.Context, input *GetUrfsInput) (io.ReadCloser, error)
+}
+
+// dfstore provides object storage function.
+type dfstore struct {
+	endpoint   string
+	httpClient *http.Client
+}
+
+// Option is a functional option for configuring the dfstore.
+type Option func(dfs *dfstore)
+
+// New returns a new dfstore instance.
+func New(endpoint string, options ...Option) Dfstore {
+	dfs := &dfstore{
+		endpoint:   endpoint,
+		httpClient: http.DefaultClient,
+	}
+
+	for _, opt := range options {
+		opt(dfs)
+	}
+
+	return dfs
+}
+
+// GetUrfsMetadataInput is used to construct request of getting object metadata.
+type GetUrfsMetadataInput struct {
+	// Endpoint is endpoint name.
+	Endpoint string
+
+	// BucketName is bucket name.
+	BucketName string
+
+	// ObjectKey is object key.
+	ObjectKey string
+
+	// DstPeer is target peerHost.
+	DstPeer string
+}
+
+// Validate validates GetUrfsMetadataInput fields.
+func (i *GetUrfsMetadataInput) Validate() error {
+	if i.Endpoint == "" {
+		return errors.New("invalid Endpoint")
+	}
+
+	if i.BucketName == "" {
+		return errors.New("invalid BucketName")
+	}
+
+	if i.ObjectKey == "" {
+		return errors.New("invalid ObjectKey")
+	}
+
+	return nil
+}
+
+// GetUrfsMetadataRequestWithContext returns *http.Request of getting Urfs metadata.
+func (dfs *dfstore) GetUrfsMetadataRequestWithContext(ctx context.Context, input *GetUrfsMetadataInput) (*http.Request, error) {
+	if err := input.Validate(); err != nil {
+		return nil, err
+	}
+
+	dstUrl := url.URL{
+		Scheme: "http",
+		Host:   fmt.Sprintf("%s:%d", input.DstPeer, config.DefaultObjectStorageStartPort),
+	}
+
+	u, err := url.Parse(dstUrl.String())
+	if err != nil {
+		return nil, err
+	}
+
+	u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "objects", input.ObjectKey)
+	req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return req, nil
+}
+
+// GetUrfsMetadataWithContext returns metadata of Urfs.
+func (dfs *dfstore) GetUrfsMetadataWithContext(ctx context.Context, input *GetUrfsMetadataInput) (*pkgobjectstorage.ObjectMetadata, error) {
+	req, err := dfs.GetUrfsMetadataRequestWithContext(ctx, input)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := dfs.httpClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode/100 != 2 {
+		return nil, fmt.Errorf("bad response status %s", resp.Status)
+	}
+
+	contentLength, err := strconv.ParseInt(resp.Header.Get(headers.ContentLength), 10, 64)
+	if err != nil {
+		return nil, err
+	}
+
+	return &pkgobjectstorage.ObjectMetadata{
+		ContentDisposition: resp.Header.Get(headers.ContentDisposition),
+		ContentEncoding:    resp.Header.Get(headers.ContentEncoding),
+		ContentLanguage:    resp.Header.Get(headers.ContentLanguage),
+		ContentLength:      contentLength,
+		ContentType:        resp.Header.Get(headers.ContentType),
+		ETag:               resp.Header.Get(headers.ETag),
+		Digest:             resp.Header.Get(config.HeaderDragonflyObjectMetaDigest),
+	}, nil
+}
+
+// GetUrfsInput is used to construct request of getting object.
+type GetUrfsInput struct {
+	// Endpoint is endpoint name.
+	Endpoint string
+
+	// BucketName is bucket name.
+	BucketName string
+
+	// ObjectKey is object key.
+	ObjectKey string
+
+	// Filter is used to generate a unique Task ID by
+	// filtering unnecessary query params in the URL,
+	// it is separated by & character.
+	Filter string
+
+	// Range is the HTTP range header.
+	Range string
+
+	// DstPeer is target peerHost.
+	DstPeer string
+}
+
+// GetUrfsWithContext returns data of Urfs.
+func (dfs *dfstore) GetUrfsWithContext(ctx context.Context, input *GetUrfsInput) (io.ReadCloser, error) {
+	req, err := dfs.GetUrfsRequestWithContext(ctx, input)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := dfs.httpClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode/100 != 2 {
+		resp.Body.Close()
+		return nil, fmt.Errorf("bad response status %s", resp.Status)
+	}
+
+	return resp.Body, nil
+}
+
+// GetUrfsRequestWithContext returns *http.Request of getting Urfs.
+func (dfs *dfstore) GetUrfsRequestWithContext(ctx context.Context, input *GetUrfsInput) (*http.Request, error) {
+	if err := input.Validate(); err != nil {
+		return nil, err
+	}
+
+	dstUrl := url.URL{
+		Scheme: "http",
+		Host:   fmt.Sprintf("%s:%d", input.DstPeer, config.DefaultObjectStorageStartPort),
+	}
+
+	u, err := url.Parse(dstUrl.String())
+	if err != nil {
+		return nil, err
+	}
+
+	u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "cache_object", input.ObjectKey)
+
+	query := u.Query()
+	if input.Filter != "" {
+		query.Set("filter", input.Filter)
+	}
+	u.RawQuery = query.Encode()
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	if input.Range != "" {
+		req.Header.Set(headers.Range, input.Range)
+	}
+
+	return req, nil
+}
+
+// Validate validates GetUrfsInput fields.
+func (i *GetUrfsInput) Validate() error {
+	if i.Endpoint == "" {
+		return errors.New("invalid Endpoint")
+	}
+
+	if i.BucketName == "" {
+		return errors.New("invalid BucketName")
+	}
+
+	if i.ObjectKey == "" {
+		return errors.New("invalid ObjectKey")
+	}
+
+	return nil
+}
+
+// GetUrfsStatusWithContext returns schedule task status of Urfs.
+func (dfs *dfstore) GetUrfsStatusWithContext(ctx context.Context, input *GetUrfsInput) (io.ReadCloser, error) {
+	req, err := dfs.GetUrfsStatusRequestWithContext(ctx, input)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := dfs.httpClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode/100 != 2 {
+		resp.Body.Close()
+		return nil, fmt.Errorf("bad response status %s", resp.Status)
+	}
+
+	return resp.Body, nil
+}
+
+// GetUrfsStatusRequestWithContext returns *http.Request of checking schedule task status.
+func (dfs *dfstore) GetUrfsStatusRequestWithContext(ctx context.Context, input *GetUrfsInput) (*http.Request, error) {
+	if err := input.Validate(); err != nil {
+		return nil, err
+	}
+
+	dstUrl := url.URL{
+		Scheme: "http",
+		Host:   fmt.Sprintf("%s:%d", input.DstPeer, config.DefaultObjectStorageStartPort),
+	}
+
+	u, err := url.Parse(dstUrl.String())
+	if err != nil {
+		return nil, err
+	}
+
+	u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "check_object", input.ObjectKey)
+
+	query := u.Query()
+	if input.Filter != "" {
+		query.Set("filter", input.Filter)
+	}
+	u.RawQuery = query.Encode()
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	if input.Range != "" {
+		req.Header.Set(headers.Range, input.Range)
+	}
+
+	return req, nil
+}
diff --git a/modules/urfs_client/objectstorage/objectstorage.go b/modules/urfs_client/objectstorage/objectstorage.go
new file mode 100755
index 000000000..e81356760
--- /dev/null
+++ b/modules/urfs_client/objectstorage/objectstorage.go
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//go:generate mockgen -destination mocks/objectstorage_mock.go -source objectstorage.go -package mocks
+
+package objectstorage
+
+type ObjectMetadata struct {
+	// Key is object key.
+	Key string
+
+	// ContentDisposition is Content-Disposition header.
+	ContentDisposition string
+
+	// ContentEncoding is Content-Encoding header.
+	ContentEncoding string
+
+	// ContentLanguage is Content-Language header.
+	ContentLanguage string
+
+	// ContentLength is Content-Length header.
+	ContentLength int64
+
+	// ContentType is Content-Type header.
+	ContentType string
+
+	// ETag is ETag header.
+	ETag string
+
+	// Digest is object digest.
+	Digest string
+}
diff --git a/modules/urfs_client/urchin/urchinfs.go b/modules/urfs_client/urchin/urchinfs.go
new file mode 100755
index 000000000..8c59108b3
--- /dev/null
+++ b/modules/urfs_client/urchin/urchinfs.go
@@ -0,0 +1,268 @@
+package urchin
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/url"
+	"strconv"
+	"strings"
+
+	"code.gitea.io/gitea/modules/urfs_client/config"
+	urfs "code.gitea.io/gitea/modules/urfs_client/dfstore"
+)
+
+type Urchinfs interface {
+	// ScheduleDataToPeer schedules the source dataset to the target peer.
+	ScheduleDataToPeer(sourceUrl, destPeerHost string) (*PeerResult, error)
+
+	// CheckScheduleTaskStatus checks the status of a schedule-data-to-peer task.
+	CheckScheduleTaskStatus(sourceUrl, destPeerHost string) (*PeerResult, error)
+
+	// ScheduleDataToPeerByKey is like ScheduleDataToPeer, but takes endpoint, bucket and object key directly.
+	ScheduleDataToPeerByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error)
+
+	// CheckScheduleTaskStatusByKey is like CheckScheduleTaskStatus, but takes endpoint, bucket and object key directly.
+	CheckScheduleTaskStatusByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error)
+}
+
+type urchinfs struct {
+	// cfg is the default urfs config.
+	cfg *config.DfstoreConfig
+}
+
+// New returns a new urchinfs instance.
+func New() Urchinfs {
+	return &urchinfs{
+		cfg: config.NewDfstore(),
+	}
+}
+
+const (
+	// UrfsScheme is the scheme of object storage.
+	UrfsScheme = "urfs"
+)
+
+func (u *urchinfs) ScheduleDataToPeer(sourceUrl, destPeerHost string) (*PeerResult, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	if err := u.cfg.Validate(); err != nil {
+		return nil, err
+	}
+
+	if err := validateScheduleArgs(sourceUrl, destPeerHost); err != nil {
+		return nil, err
+	}
+
+	// Parse the source object storage url.
+	endpoint, bucketName, objectKey, err := parseUrfsURL(sourceUrl)
+	if err != nil {
+		return nil, err
+	}
+
+	return processScheduleDataToPeer(ctx, u.cfg, endpoint, bucketName, objectKey, destPeerHost)
+}
+
+func (u *urchinfs) ScheduleDataToPeerByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	return processScheduleDataToPeer(ctx, u.cfg, endpoint, bucketName, objectKey, destPeerHost)
+}
+
+func (u *urchinfs) CheckScheduleTaskStatus(sourceUrl, destPeerHost string) (*PeerResult, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	if err := u.cfg.Validate(); err != nil {
+		return nil, err
+	}
+
+	if err := validateScheduleArgs(sourceUrl, destPeerHost); err != nil {
+		return nil, err
+	}
+
+	// Parse the source object storage url.
+	endpoint, bucketName, objectKey, err := parseUrfsURL(sourceUrl)
+	if err != nil {
+		return nil, err
+	}
+
+	return processCheckScheduleTaskStatus(ctx, u.cfg, endpoint, bucketName, objectKey, destPeerHost)
+}
+
+func (u *urchinfs) CheckScheduleTaskStatusByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	return processCheckScheduleTaskStatus(ctx, u.cfg, endpoint, bucketName, objectKey, destPeerHost)
+}
+
+// isUrfsURL determines whether the raw url is a urfs url.
+func isUrfsURL(rawURL string) bool {
+	u, err := url.ParseRequestURI(rawURL)
+	if err != nil {
+		return false
+	}
+
+	return u.Scheme == UrfsScheme && u.Host != "" && u.Path != ""
+}
+
+// validateScheduleArgs validates the schedule arguments.
+func validateScheduleArgs(sourceUrl, destPeer string) error {
+	if !isUrfsURL(sourceUrl) {
+		return errors.New("source url should be urfs:// protocol")
+	}
+
+	if destPeer == "" {
+		return errors.New("empty destination peer")
+	}
+
+	return nil
+}
+
+// parseUrfsURL parses an object storage url of the form urfs://<source-endpoint>/<source-bucket>/<source-filepath>.
+func parseUrfsURL(rawURL string) (string, string, string, error) {
+	u, err := url.ParseRequestURI(rawURL)
+	if err != nil {
+		return "", "", "", err
+	}
+
+	if u.Scheme != UrfsScheme {
+		return "", "", "", fmt.Errorf("invalid scheme, e.g. %s://endpoint/bucket_name/object_key", UrfsScheme)
+	}
+
+	if u.Host == "" {
+		return "", "", "", errors.New("empty endpoint name")
+	}
+
+	if u.Path == "" {
+		return "", "", "", errors.New("empty object path")
+	}
+
+	bucket, key, found := strings.Cut(strings.Trim(u.Path, "/"), "/")
+	if !found {
+		return "", "", "", errors.New("invalid bucket and object key " + u.Path)
+	}
+
+	return u.Host, bucket, key, nil
+}
+
+// processScheduleDataToPeer schedules the object storage data to the peer.
+func processScheduleDataToPeer(ctx context.Context, cfg *config.DfstoreConfig, endpoint, bucketName, objectKey, dstPeer string) (*PeerResult, error) {
+	dfs := urfs.New(cfg.Endpoint)
+	meta, err := dfs.GetUrfsMetadataWithContext(ctx, &urfs.GetUrfsMetadataInput{
+		Endpoint:   endpoint,
+		BucketName: bucketName,
+		ObjectKey:  objectKey,
+		DstPeer:    dstPeer,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	reader, err := dfs.GetUrfsWithContext(ctx, &urfs.GetUrfsInput{
+		Endpoint:   endpoint,
+		BucketName: bucketName,
+		ObjectKey:  objectKey,
+		DstPeer:    dstPeer,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer reader.Close()
+
+	body, err := ioutil.ReadAll(reader)
+	if err != nil {
+		return nil, err
+	}
+
+	var peerResult PeerResult
+	if err := json.Unmarshal(body, &peerResult); err != nil {
+		return nil, err
+	}
+	peerResult.SignedUrl = strings.ReplaceAll(peerResult.SignedUrl, "\\u0026", "&")
+
+	fileContentLength, err := strconv.ParseInt(peerResult.ContentLength, 10, 64)
+	if err != nil {
+		return nil, err
+	}
+	if fileContentLength != meta.ContentLength {
+		return nil, errors.New("content length inconsistent with meta")
+	}
+
+	return &peerResult, nil
+}
+
+// processCheckScheduleTaskStatus checks the schedule task status on the peer.
+func processCheckScheduleTaskStatus(ctx context.Context, cfg *config.DfstoreConfig, endpoint, bucketName, objectKey, dstPeer string) (*PeerResult, error) {
+	dfs := urfs.New(cfg.Endpoint)
+	meta, err := dfs.GetUrfsMetadataWithContext(ctx, &urfs.GetUrfsMetadataInput{
+		Endpoint:   endpoint,
+		BucketName: bucketName,
+		ObjectKey:  objectKey,
+		DstPeer:    dstPeer,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	reader, err := dfs.GetUrfsStatusWithContext(ctx, &urfs.GetUrfsInput{
+		Endpoint:   endpoint,
+		BucketName: bucketName,
+		ObjectKey:  objectKey,
+		DstPeer:    dstPeer,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer reader.Close()
+
+	body, err := ioutil.ReadAll(reader)
+	if err != nil {
+		return nil, err
+	}
+
+	var peerResult PeerResult
+	if err := json.Unmarshal(body, &peerResult); err != nil {
+		return nil, err
+	}
+	peerResult.SignedUrl = strings.ReplaceAll(peerResult.SignedUrl, "\\u0026", "&")
+
+	fileContentLength, err := strconv.ParseInt(peerResult.ContentLength, 10, 64)
+	if err != nil {
+		return nil, err
+	}
+	if fileContentLength != meta.ContentLength {
+		return nil, errors.New("content length inconsistent with meta")
+	}
+
+	return &peerResult, nil
+}
+
+type PeerResult struct {
+	ContentType   string `json:"Content-Type"`
+	ContentLength string `json:"Content-Length"`
+	SignedUrl     string
+	DataRoot      string
+	DataPath      string
+	DataEndpoint  string
+	StatusCode    int
+	StatusMsg     string
+	TaskID        string
+}
diff --git a/routers/api/v1/repo/modelarts.go b/routers/api/v1/repo/modelarts.go
index 79e35812e..a6e807f61 100755
--- a/routers/api/v1/repo/modelarts.go
+++ b/routers/api/v1/repo/modelarts.go
@@ -180,6 +180,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {
 		}
 		if oldStatus != job.Status {
 			notification.NotifyChangeCloudbrainStatus(job, oldStatus)
+			// todo: get model back
 		}
 		err = models.UpdateTrainJobVersion(job)
 		if err != nil {
diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go
index a2ea7d51b..ff19d5829 100755
--- a/routers/repo/cloudbrain.go
+++ b/routers/repo/cloudbrain.go
@@ -1939,6 +1939,7 @@ func SyncCloudbrainStatus() {
 			task.CorrectCreateUnix()
 			if oldStatus != task.Status {
 				notification.NotifyChangeCloudbrainStatus(task, oldStatus)
+				// todo: get model back
 			}
 			err = models.UpdateJob(task)
 			if err != nil {
diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go
index c3258adc7..b92f19931 100755
--- a/routers/repo/grampus.go
+++ b/routers/repo/grampus.go
@@ -1,6 +1,7 @@
 package repo
 
 import (
+	"code.gitea.io/gitea/modules/urfs_client/urchin"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -680,8 +681,7 @@ func grampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
 
 	//prepare command
 	preTrainModelPath := getPreTrainModelPath(form.PreTrainModelUrl, form.CkptName)
-	modelRemoteObsUrl := "s3:///grampus/jobs/" + jobName + "/output/models.zip"
-	command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, form.CkptName, modelRemoteObsUrl)
+	command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, form.CkptName, grampus.GetNpuModelRemoteObsUrl(jobName))
 	if err != nil {
 		log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
 		grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
@@ -855,7 +855,7 @@ func GrampusTrainJobShow(ctx *context.Context) {
 	}
 	oldStatus := task.Status
 	task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
-	if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning {
+	if task.Status != oldStatus || task.Status == models.GrampusStatusRunning {
 		task.Duration = result.JobInfo.RunSec
 		if task.Duration < 0 {
 			task.Duration = 0
@@ -871,6 +871,15 @@ func GrampusTrainJobShow(ctx *context.Context) {
 	task.CorrectCreateUnix()
 	if oldStatus != task.Status {
 		notification.NotifyChangeCloudbrainStatus(task, oldStatus)
+		if models.IsTrainJobTerminal(task.Status) {
+			// get model back: schedule the trained model from the remote bucket to the local peer.
+			// The object key mirrors grampus.GetNpuModelRemoteObsUrl; endPoint and dstPeer are
+			// assumed to come from the deployment configuration.
+			objectKey := "jobs/" + task.JobName + "/output/models.zip"
+			if _, err := urchin.New().ScheduleDataToPeerByKey(endPoint, grampus.BucketRemote, objectKey, dstPeer); err != nil {
+				log.Error("ScheduleDataToPeerByKey failed:%v", err)
+			}
+		}
 	}
 	err = models.UpdateJob(task)
 	if err != nil {
@@ -971,12 +980,15 @@ func GrampusGetLog(ctx *context.Context) {
 
 func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName, modelRemoteObsUrl string) (string, error) {
 	var command string
+	//prepare
 	workDir := grampus.NpuWorkDir
-	if processorType == grampus.ProcessorTypeGPU {
+	if processorType == grampus.ProcessorTypeNPU {
+		command += "pwd;cd " + workDir + grampus.CommandPrepareScriptNpu
+	} else if processorType == grampus.ProcessorTypeGPU {
 		workDir = grampus.GpuWorkDir
+		command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScriptGpu, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)
 	}
-	command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScript, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)
 
 	//download code & dataset
 	if processorType == grampus.ProcessorTypeNPU {
 		//no need to download code & dataset by internet
@@ -1025,8 +1037,8 @@ func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bo
 
 	var commandCode string
 	if processorType == grampus.ProcessorTypeNPU {
-		paramCode += " --obs_url=" + modelRemoteObsUrl
-		commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py /tmp/log/train.log" + paramCode + ";"
+		paramCode += " --model_url=" + modelRemoteObsUrl
+		commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py " + grampus.NpuLocalLogUrl + paramCode + ";"
 	} else if processorType == grampus.ProcessorTypeGPU {
 		if pretrainModelFileName != "" {
 			paramCode += " --ckpt_url" + "=" + workDir + "pretrainmodel/" + pretrainModelFileName
@@ -1042,8 +1054,7 @@ func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bo
 
 	//upload models
 	if processorType == grampus.ProcessorTypeNPU {
-		commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_npu " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;"
-		command += commandUpload
+		// no need to upload: the NPU job writes the model directly to the remote OBS bucket
 	} else if processorType == grampus.ProcessorTypeGPU {
 		commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_gpu " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
 		command += commandUpload
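
Usage sketch for review: the snippet below shows how the urchin package added above is meant to be driven end to end, using only the interface this patch introduces. The urfs:// URL and the peer host are placeholder values, and the exact semantics of PeerResult.StatusCode/StatusMsg are an assumption rather than something this patch documents.

package main

import (
	"fmt"
	"log"

	"code.gitea.io/gitea/modules/urfs_client/urchin"
)

func main() {
	fs := urchin.New()

	// Schedule an object to a peer. The urfs:// URL encodes
	// endpoint/bucket/object-key, as parsed by parseUrfsURL;
	// both arguments here are placeholders.
	src := "urfs://obs.example.com/grampus/jobs/job1/output/models.zip"
	res, err := fs.ScheduleDataToPeer(src, "192.168.1.10")
	if err != nil {
		log.Fatalf("schedule failed: %v", err)
	}
	fmt.Printf("task %s scheduled, signed url: %s\n", res.TaskID, res.SignedUrl)

	// Poll the same task later with the same arguments.
	status, err := fs.CheckScheduleTaskStatus(src, "192.168.1.10")
	if err != nil {
		log.Fatalf("status check failed: %v", err)
	}
	fmt.Printf("status %d: %s\n", status.StatusCode, status.StatusMsg)
}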