
init

lewis committed 3 years ago · commit 26a6f883ed · tags/v1.22.11.1^2
10 changed files with 851 additions and 12 deletions
  1. modules/grampus/grampus.go (+16, -3)
  2. modules/urfs_client/config/constants.go (+93, -0)
  3. modules/urfs_client/config/dfstore.go (+66, -0)
  4. modules/urfs_client/config/headers.go (+32, -0)
  5. modules/urfs_client/dfstore/dfstore.go (+307, -0)
  6. modules/urfs_client/objectstorage/objectstorage.go (+47, -0)
  7. modules/urfs_client/urchin/urchinfs.go (+268, -0)
  8. routers/api/v1/repo/modelarts.go (+1, -0)
  9. routers/repo/cloudbrain.go (+1, -0)
  10. routers/repo/grampus.go (+20, -9)

modules/grampus/grampus.go (+16, -3)

@@ -20,10 +20,14 @@ const (
ProcessorTypeNPU = "npu.huawei.com/NPU"
ProcessorTypeGPU = "nvidia.com/gpu"

-GpuWorkDir = "/tmp/"
-NpuWorkDir = "/cache/"
+GpuWorkDir              = "/tmp/"
+NpuWorkDir              = "/cache/"
+NpuLocalLogUrl          = "/cache/output/train.log"
+CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"

CodeArchiveName = "master.zip"

+BucketRemote = "grampus"
)

var (
@@ -33,7 +37,7 @@ var (

SpecialPools *models.SpecialPools

-CommandPrepareScript = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
+CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
"echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
)

@@ -273,3 +277,12 @@ func InitSpecialPool() {
json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
}
}

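// GetNpuModelRemoteObsUrl returns the remote OBS location (s3 URL) where an NPU job writes its trained model archive.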
func GetNpuModelRemoteObsUrl(jobName string) string {
return "s3:///grampus/jobs/" + jobName + "/output/models.zip"
}

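// GetBackNpuModel is a stub; fetching the model back is handled by scheduling it through the urfs client instead (see GrampusTrainJobShow in routers/repo/grampus.go).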
func GetBackNpuModel(jobName string) error {
return nil
}

modules/urfs_client/config/constants.go (+93, -0)

@@ -0,0 +1,93 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import (
"time"
)

// Reasons for backing to the source.
const (
BackSourceReasonNone = 0
BackSourceReasonRegisterFail = 1
BackSourceReasonMd5NotMatch = 2
BackSourceReasonDownloadError = 3
BackSourceReasonNoSpace = 4
BackSourceReasonInitError = 5
BackSourceReasonWriteError = 6
BackSourceReasonHostSysError = 7
BackSourceReasonNodeEmpty = 8
BackSourceReasonSourceError = 10
BackSourceReasonUserSpecified = 100
ForceNotBackSourceAddition = 1000
)

// Download patterns.
const (
PatternP2P = "p2p"
PatternSeedPeer = "seed-peer"
PatternSource = "source"
)

//// Download limit.
//const (
// DefaultPerPeerDownloadLimit = 20 * unit.MB
// DefaultTotalDownloadLimit = 100 * unit.MB
// DefaultUploadLimit = 100 * unit.MB
// DefaultMinRate = 20 * unit.MB
//)

// Others.
const (
DefaultTimestampFormat = "2006-01-02 15:04:05"
SchemaHTTP = "http"

DefaultTaskExpireTime = 6 * time.Hour
DefaultGCInterval = 1 * time.Minute
DefaultDaemonAliveTime = 5 * time.Minute
DefaultScheduleTimeout = 5 * time.Minute
DefaultDownloadTimeout = 5 * time.Minute

DefaultSchedulerSchema = "http"
DefaultSchedulerIP = "127.0.0.1"
DefaultSchedulerPort = 8002

DefaultPieceChanSize = 16
DefaultObjectMaxReplicas = 3
)

// Dfcache subcommand names.
const (
CmdStat = "stat"
CmdImport = "import"
CmdExport = "export"
CmdDelete = "delete"
)

// Default listening ports of services.
const (
DefaultEndPort = 65535
DefaultPeerStartPort = 65000
DefaultUploadStartPort = 65002
DefaultObjectStorageStartPort = 65004
DefaultHealthyStartPort = 40901
)

var (
// DefaultCertValidityPeriod is the default validity period of certificates.
DefaultCertValidityPeriod = 180 * 24 * time.Hour
)

modules/urfs_client/config/dfstore.go (+66, -0)

@@ -0,0 +1,66 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import (
"errors"
"fmt"
"net/url"
)

type DfstoreConfig struct {
// Address of the object storage service.
Endpoint string `yaml:"endpoint,omitempty" mapstructure:"endpoint,omitempty"`

// Filter is used to generate a unique Task ID by
// filtering unnecessary query params in the URL,
// it is separated by & character.
Filter string `yaml:"filter,omitempty" mapstructure:"filter,omitempty"`

// Mode is the mode in which the backend is written,
// including WriteBack and AsyncWriteBack.
Mode int `yaml:"mode,omitempty" mapstructure:"mode,omitempty"`

// MaxReplicas is the maximum number of
// replicas of an object cache in seed peers.
MaxReplicas int `yaml:"maxReplicas,omitempty" mapstructure:"maxReplicas,omitempty"`
}

// NewDfstore returns the default dfstore configuration.
func NewDfstore() *DfstoreConfig {
url := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", "127.0.0.1", DefaultObjectStorageStartPort),
}

return &DfstoreConfig{
Endpoint: url.String(),
MaxReplicas: DefaultObjectMaxReplicas,
}
}

func (cfg *DfstoreConfig) Validate() error {
if cfg.Endpoint == "" {
return errors.New("dfstore requires parameter endpoint")
}

if _, err := url.ParseRequestURI(cfg.Endpoint); err != nil {
return fmt.Errorf("invalid endpoint: %w", err)
}

return nil
}
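
A short sketch (editorial, not part of the commit) of constructing and validating the default configuration:

cfg := config.NewDfstore() // endpoint defaults to http://127.0.0.1:65004
if err := cfg.Validate(); err != nil {
	panic(err)
}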

modules/urfs_client/config/headers.go (+32, -0)

@@ -0,0 +1,32 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

const (
HeaderDragonflyFilter = "X-Dragonfly-Filter"
HeaderDragonflyPeer = "X-Dragonfly-Peer"
HeaderDragonflyTask = "X-Dragonfly-Task"
HeaderDragonflyRange = "X-Dragonfly-Range"
// HeaderDragonflyTag is the tag header; requests for the same URL with different tags are placed in different P2P overlays.
HeaderDragonflyTag = "X-Dragonfly-Tag"
// HeaderDragonflyApplication is used for statistics and traffic control
HeaderDragonflyApplication = "X-Dragonfly-Application"
// HeaderDragonflyRegistry is used for dynamic registry mirrors.
HeaderDragonflyRegistry = "X-Dragonfly-Registry"
// HeaderDragonflyObjectMetaDigest is used for digest of object storage.
HeaderDragonflyObjectMetaDigest = "X-Dragonfly-Object-Meta-Digest"
)

modules/urfs_client/dfstore/dfstore.go (+307, -0)

@@ -0,0 +1,307 @@
package dfstore

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"

"github.com/go-http-utils/headers"

"code.gitea.io/gitea/modules/urfs_client/config"
pkgobjectstorage "code.gitea.io/gitea/modules/urfs_client/objectstorage"
)

// Dfstore is the interface used for object storage.
type Dfstore interface {
// GetUrfsMetadataRequestWithContext returns *http.Request of getting Urfs metadata.
GetUrfsMetadataRequestWithContext(ctx context.Context, input *GetUrfsMetadataInput) (*http.Request, error)

// GetUrfsMetadataWithContext returns metadata of Urfs.
GetUrfsMetadataWithContext(ctx context.Context, input *GetUrfsMetadataInput) (*pkgobjectstorage.ObjectMetadata, error)

// GetUrfsRequestWithContext returns *http.Request of getting Urfs.
GetUrfsRequestWithContext(ctx context.Context, input *GetUrfsInput) (*http.Request, error)

// GetUrfsWithContext returns data of Urfs.
GetUrfsWithContext(ctx context.Context, input *GetUrfsInput) (io.ReadCloser, error)

// GetUrfsStatusRequestWithContext returns *http.Request of getting Urfs status.
GetUrfsStatusRequestWithContext(ctx context.Context, input *GetUrfsInput) (*http.Request, error)

// GetUrfsStatusWithContext returns schedule status of Urfs.
GetUrfsStatusWithContext(ctx context.Context, input *GetUrfsInput) (io.ReadCloser, error)
}

// dfstore provides object storage function.
type dfstore struct {
endpoint string
httpClient *http.Client
}

// Option is a functional option for configuring the dfstore.
type Option func(dfs *dfstore)

// New returns a dfstore instance configured with the given endpoint and options.
func New(endpoint string, options ...Option) Dfstore {
dfs := &dfstore{
endpoint: endpoint,
httpClient: http.DefaultClient,
}

for _, opt := range options {
opt(dfs)
}

return dfs
}
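
// WithHTTPClient is an editorial sketch: no Option constructors exist in this
// commit, and this one is hypothetical. It shows how a functional option
// would customize the dfstore, here by swapping the default http.Client.
func WithHTTPClient(c *http.Client) Option {
	return func(dfs *dfstore) {
		dfs.httpClient = c
	}
}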

// GetUrfsMetadataInput is used to construct request of getting object metadata.
type GetUrfsMetadataInput struct {

// Endpoint is endpoint name.
Endpoint string

// BucketName is bucket name.
BucketName string

// ObjectKey is object key.
ObjectKey string

// DstPeer is target peerHost.
DstPeer string
}

// Validate validates GetUrfsMetadataInput fields.
func (i *GetUrfsMetadataInput) Validate() error {
if i.Endpoint == "" {
return errors.New("invalid Endpoint")
}

if i.BucketName == "" {
return errors.New("invalid BucketName")
}

if i.ObjectKey == "" {
return errors.New("invalid ObjectKey")
}

return nil
}

// GetUrfsMetadataRequestWithContext returns *http.Request of getting Urfs metadata.
func (dfs *dfstore) GetUrfsMetadataRequestWithContext(ctx context.Context, input *GetUrfsMetadataInput) (*http.Request, error) {
if err := input.Validate(); err != nil {
return nil, err
}

dstUrl := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", input.DstPeer, config.DefaultObjectStorageStartPort),
}

u, err := url.Parse(dstUrl.String())
if err != nil {
return nil, err
}

u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "objects", input.ObjectKey)
req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
if err != nil {
return nil, err
}

return req, nil
}

// GetUrfsMetadataWithContext returns metadata of Urfs.
func (dfs *dfstore) GetUrfsMetadataWithContext(ctx context.Context, input *GetUrfsMetadataInput) (*pkgobjectstorage.ObjectMetadata, error) {
req, err := dfs.GetUrfsMetadataRequestWithContext(ctx, input)
if err != nil {
return nil, err
}

resp, err := dfs.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("bad response status %s", resp.Status)
}

contentLength, err := strconv.ParseInt(resp.Header.Get(headers.ContentLength), 10, 64)
if err != nil {
return nil, err
}

return &pkgobjectstorage.ObjectMetadata{
ContentDisposition: resp.Header.Get(headers.ContentDisposition),
ContentEncoding: resp.Header.Get(headers.ContentEncoding),
ContentLanguage: resp.Header.Get(headers.ContentLanguage),
ContentLength: contentLength,
ContentType: resp.Header.Get(headers.ContentType),
ETag: resp.Header.Get(headers.ETag),
Digest: resp.Header.Get(config.HeaderDragonflyObjectMetaDigest),
}, nil
}

// GetUrfsInput is used to construct request of getting object.
type GetUrfsInput struct {

// Endpoint is endpoint name.
Endpoint string

// BucketName is bucket name.
BucketName string

// ObjectKey is object key.
ObjectKey string

// Filter is used to generate a unique Task ID by
// filtering unnecessary query params in the URL,
// it is separated by & character.
Filter string

// Range is the HTTP range header.
Range string

// DstPeer is target peerHost.
DstPeer string
}

// GetUrfsWithContext returns data of Urfs.
func (dfs *dfstore) GetUrfsWithContext(ctx context.Context, input *GetUrfsInput) (io.ReadCloser, error) {
req, err := dfs.GetUrfsRequestWithContext(ctx, input)
if err != nil {
return nil, err
}

resp, err := dfs.httpClient.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode/100 != 2 {
resp.Body.Close()
return nil, fmt.Errorf("bad response status %s", resp.Status)
}

return resp.Body, nil
}

// GetUrfsRequestWithContext returns *http.Request of getting Urfs.
func (dfs *dfstore) GetUrfsRequestWithContext(ctx context.Context, input *GetUrfsInput) (*http.Request, error) {
if err := input.Validate(); err != nil {
return nil, err
}

dstUrl := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", input.DstPeer, config.DefaultObjectStorageStartPort),
}

u, err := url.Parse(dstUrl.String())
if err != nil {
return nil, err
}

u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "cache_object", input.ObjectKey)

query := u.Query()
if input.Filter != "" {
query.Set("filter", input.Filter)
}
u.RawQuery = query.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
if err != nil {
return nil, err
}

if input.Range != "" {
req.Header.Set(headers.Range, input.Range)
}

return req, nil
}

// Validate validates GetUrfsInput fields.
func (i *GetUrfsInput) Validate() error {
if i.Endpoint == "" {
return errors.New("invalid Endpoint")
}

if i.BucketName == "" {
return errors.New("invalid BucketName")
}

if i.ObjectKey == "" {
return errors.New("invalid ObjectKey")
}

return nil
}

// GetUrfsStatusWithContext returns schedule task status.
func (dfs *dfstore) GetUrfsStatusWithContext(ctx context.Context, input *GetUrfsInput) (io.ReadCloser, error) {
req, err := dfs.GetUrfsStatusRequestWithContext(ctx, input)
if err != nil {
return nil, err
}

resp, err := dfs.httpClient.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode/100 != 2 {
resp.Body.Close()
return nil, fmt.Errorf("bad response status %s", resp.Status)
}

return resp.Body, nil
}

// GetUrfsStatusRequestWithContext returns *http.Request for checking schedule task status.
func (dfs *dfstore) GetUrfsStatusRequestWithContext(ctx context.Context, input *GetUrfsInput) (*http.Request, error) {
if err := input.Validate(); err != nil {
return nil, err
}

dstUrl := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", input.DstPeer, config.DefaultObjectStorageStartPort),
}

u, err := url.Parse(dstUrl.String())
if err != nil {
return nil, err
}

u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "check_object", input.ObjectKey)

query := u.Query()
if input.Filter != "" {
query.Set("filter", input.Filter)
}
u.RawQuery = query.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}

if input.Range != "" {
req.Header.Set(headers.Range, input.Range)
}

return req, nil
}
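
A minimal usage sketch (editorial, not part of the commit); the endpoint, bucket, object key and peer address below are placeholders:

package main

import (
	"context"
	"io"
	"log"
	"os"

	"code.gitea.io/gitea/modules/urfs_client/dfstore"
)

func main() {
	// Default endpoint; note that requests are actually addressed to
	// input.DstPeer's object-storage port (default 65004).
	dfs := dfstore.New("http://127.0.0.1:65004")

	// Ask the peer to cache the object; the body is the peer's reply.
	body, err := dfs.GetUrfsWithContext(context.Background(), &dfstore.GetUrfsInput{
		Endpoint:   "obs.example.com",
		BucketName: "grampus",
		ObjectKey:  "jobs/demo-job/output/models.zip",
		DstPeer:    "192.168.0.10",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer body.Close()
	io.Copy(os.Stdout, body)
}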

modules/urfs_client/objectstorage/objectstorage.go (+47, -0)

@@ -0,0 +1,47 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//go:generate mockgen -destination mocks/objectstorage_mock.go -source objectstorage.go -package mocks

package objectstorage

type ObjectMetadata struct {
// Key is object key.
Key string

// ContentDisposition is Content-Disposition header.
ContentDisposition string

// ContentEncoding is Content-Encoding header.
ContentEncoding string

// ContentLanguage is Content-Language header.
ContentLanguage string

// ContentLength is Content-Length header.
ContentLength int64

// ContentType is Content-Type header.
ContentType string

// ETag is ETag header.
ETag string

// Digest is object digest.
Digest string
}



modules/urfs_client/urchin/urchinfs.go (+268, -0)

@@ -0,0 +1,268 @@
package urchin

import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/url"
"strconv"
"strings"

"git.openi.org.cn/OpenI/Grampus/server/common/urfs_client/config"
urfs "git.openi.org.cn/OpenI/Grampus/server/common/urfs_client/dfstore"
)

type Urchinfs interface {
// ScheduleDataToPeer schedules the source dataset to the target peer.
ScheduleDataToPeer(sourceUrl, destPeerHost string) (*PeerResult, error)

// CheckScheduleTaskStatus checks the status of a schedule-to-peer task.
CheckScheduleTaskStatus(sourceUrl, destPeerHost string) (*PeerResult, error)

// ScheduleDataToPeerByKey is like ScheduleDataToPeer, but takes the
// endpoint, bucket and object key directly instead of a urfs:// URL.
ScheduleDataToPeerByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error)

// CheckScheduleTaskStatusByKey is like CheckScheduleTaskStatus, but takes the
// endpoint, bucket and object key directly instead of a urfs:// URL.
CheckScheduleTaskStatusByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error)
}

type urchinfs struct {
// Initialize default urfs config.
cfg *config.DfstoreConfig
}

// New returns a new Urchinfs instance.
func New() Urchinfs {

urfs := &urchinfs{
cfg: config.NewDfstore(),
}
return urfs
}

const (
// UrfsScheme is the scheme of object storage.
UrfsScheme = "urfs"
)

func (urfs *urchinfs) ScheduleDataToPeer(sourceUrl, destPeerHost string) (*PeerResult, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := urfs.cfg.Validate(); err != nil {
return nil, err
}

if err := validateScheduleArgs(sourceUrl, destPeerHost); err != nil {
return nil, err
}

// Parse the urfs source URL into endpoint, bucket and object key.
endpoint, bucketName, objectKey, err := parseUrfsURL(sourceUrl)
if err != nil {
return nil, err
}
peerResult, err := processScheduleDataToPeer(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost)
if err != nil {
return nil, err
}

return peerResult, err
}

func (urfs *urchinfs) ScheduleDataToPeerByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

peerResult, err := processScheduleDataToPeer(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost)
if err != nil {
return nil, err
}

return peerResult, err
}

func (urfs *urchinfs) CheckScheduleTaskStatus(sourceUrl, destPeerHost string) (*PeerResult, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := urfs.cfg.Validate(); err != nil {
return nil, err
}

if err := validateScheduleArgs(sourceUrl, destPeerHost); err != nil {
return nil, err
}

// Parse the urfs source URL into endpoint, bucket and object key.
endpoint, bucketName, objectKey, err := parseUrfsURL(sourceUrl)
if err != nil {
return nil, err
}
peerResult, err := processCheckScheduleTaskStatus(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost)
if err != nil {
return nil, err
}

return peerResult, err
}

func (urfs *urchinfs) CheckScheduleTaskStatusByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

peerResult, err := processCheckScheduleTaskStatus(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost)
if err != nil {
return nil, err
}

return peerResult, err
}

// isUrfsURL reports whether rawURL is a urfs URL.
func isUrfsURL(rawURL string) bool {
u, err := url.ParseRequestURI(rawURL)
if err != nil {
return false
}

if u.Scheme != UrfsScheme || u.Host == "" || u.Path == "" {
return false
}

return true
}

// validateScheduleArgs validates the schedule arguments.
func validateScheduleArgs(sourceUrl, destPeer string) error {
if !isUrfsURL(sourceUrl) {
return errors.New("source url should be urfs:// protocol")
}

return nil
}

// parseUrfsURL parses an object storage URL of the form urfs://endpoint/bucket_name/object_key.
func parseUrfsURL(rawURL string) (string, string, string, error) {
u, err := url.ParseRequestURI(rawURL)
if err != nil {
return "", "", "", err
}

if u.Scheme != UrfsScheme {
return "", "", "", fmt.Errorf("invalid scheme, e.g. %s://endpoint/bucket_name/object_key", UrfsScheme)
}

if u.Host == "" {
return "", "", "", errors.New("empty endpoint name")
}

if u.Path == "" {
return "", "", "", errors.New("empty object path")
}

bucket, key, found := strings.Cut(strings.Trim(u.Path, "/"), "/")
if !found {
return "", "", "", errors.New("invalid bucket and object key " + u.Path)
}

return u.Host, bucket, key, nil
}
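
// For example (editorial note; the URL is a placeholder):
// parseUrfsURL("urfs://obs.example.com/mybucket/models/demo.zip")
// returns ("obs.example.com", "mybucket", "models/demo.zip", nil).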

// processScheduleDataToPeer schedules an object to the destination peer and verifies the returned content length against the object metadata.
func processScheduleDataToPeer(ctx context.Context, cfg *config.DfstoreConfig, endpoint, bucketName, objectKey, dstPeer string) (*PeerResult, error) {
dfs := urfs.New(cfg.Endpoint)
meta, err := dfs.GetUrfsMetadataWithContext(ctx, &urfs.GetUrfsMetadataInput{
Endpoint: endpoint,
BucketName: bucketName,
ObjectKey: objectKey,
DstPeer: dstPeer,
})
if err != nil {
return nil, err
}

reader, err := dfs.GetUrfsWithContext(ctx, &urfs.GetUrfsInput{
Endpoint: endpoint,
BucketName: bucketName,
ObjectKey: objectKey,
DstPeer: dstPeer,
})
if err != nil {
return nil, err
}
defer reader.Close()

body, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}

var peerResult PeerResult
if err := json.Unmarshal(body, &peerResult); err != nil {
return nil, err
}
peerResult.SignedUrl = strings.ReplaceAll(peerResult.SignedUrl, "\\u0026", "&")

fileContentLength, err := strconv.ParseInt(peerResult.ContentLength, 10, 64)
if err != nil {
return nil, err
}
if fileContentLength != meta.ContentLength {
return nil, errors.New("content length inconsistent with meta")
}

return &peerResult, nil
}

// processCheckScheduleTaskStatus checks the status of a schedule task.
func processCheckScheduleTaskStatus(ctx context.Context, cfg *config.DfstoreConfig, endpoint, bucketName, objectKey, dstPeer string) (*PeerResult, error) {
dfs := urfs.New(cfg.Endpoint)
meta, err := dfs.GetUrfsMetadataWithContext(ctx, &urfs.GetUrfsMetadataInput{
Endpoint: endpoint,
BucketName: bucketName,
ObjectKey: objectKey,
DstPeer: dstPeer,
})
if err != nil {
return nil, err
}

reader, err := dfs.GetUrfsStatusWithContext(ctx, &urfs.GetUrfsInput{
Endpoint: endpoint,
BucketName: bucketName,
ObjectKey: objectKey,
DstPeer: dstPeer,
})
if err != nil {
return nil, err
}
defer reader.Close()

body, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}

var peerResult PeerResult
if err := json.Unmarshal(body, &peerResult); err != nil {
return nil, err
}
peerResult.SignedUrl = strings.ReplaceAll(peerResult.SignedUrl, "\\u0026", "&")

fileContentLength, err := strconv.ParseInt(peerResult.ContentLength, 10, 64)
if err != nil {
return nil, err
}
if fileContentLength != meta.ContentLength {
return nil, errors.New("content length inconsistent with meta")
}

return &peerResult, nil
}

type PeerResult struct {
ContentType string `json:"Content-Type"`
ContentLength string `json:"Content-Length"`
SignedUrl string
DataRoot string
DataPath string
DataEndpoint string
StatusCode int
StatusMsg string
TaskID string
}
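
An end-to-end sketch (editorial, not part of the commit); the urfs:// URL and peer host are placeholders:

package main

import (
	"log"

	"code.gitea.io/gitea/modules/urfs_client/urchin"
)

func main() {
	const (
		src  = "urfs://obs.example.com/grampus/jobs/demo-job/output/models.zip"
		peer = "192.168.0.10"
	)

	fs := urchin.New()

	// Kick off scheduling of the object to the peer.
	res, err := fs.ScheduleDataToPeer(src, peer)
	if err != nil {
		log.Fatalf("schedule failed: %v", err)
	}
	log.Printf("task %s scheduled: %s", res.TaskID, res.StatusMsg)

	// Later, poll the same URL/peer until the task reaches a terminal status.
	res, err = fs.CheckScheduleTaskStatus(src, peer)
	if err != nil {
		log.Fatalf("status check failed: %v", err)
	}
	log.Printf("status %d: signed url %s", res.StatusCode, res.SignedUrl)
}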

routers/api/v1/repo/modelarts.go (+1, -0)

@@ -180,6 +180,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {
}
if oldStatus != job.Status {
notification.NotifyChangeCloudbrainStatus(job, oldStatus)
// todo: get model back
}
err = models.UpdateTrainJobVersion(job)
if err != nil {


routers/repo/cloudbrain.go (+1, -0)

@@ -1939,6 +1939,7 @@ func SyncCloudbrainStatus() {
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
// todo: get model back
}
err = models.UpdateJob(task)
if err != nil {


routers/repo/grampus.go (+20, -9)

@@ -1,6 +1,7 @@
package repo

import (
"code.gitea.io/gitea/modules/urfs_client/urchin"
"encoding/json"
"errors"
"fmt"
@@ -680,8 +681,7 @@ func grampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain

//prepare command
preTrainModelPath := getPreTrainModelPath(form.PreTrainModelUrl, form.CkptName)
-modelRemoteObsUrl := "s3:///grampus/jobs/" + jobName + "/output/models.zip"
-command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, form.CkptName, modelRemoteObsUrl)
+command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, form.CkptName, grampus.GetNpuModelRemoteObsUrl(jobName))
if err != nil {
log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
@@ -855,7 +855,7 @@ func GrampusTrainJobShow(ctx *context.Context) {
}
oldStatus := task.Status
task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
-if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning {
+if task.Status != oldStatus || task.Status == models.GrampusStatusRunning {
task.Duration = result.JobInfo.RunSec
if task.Duration < 0 {
task.Duration = 0
@@ -871,6 +871,15 @@ func GrampusTrainJobShow(ctx *context.Context) {
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
if models.IsTrainJobTerminal(task.Status) {
//get model back
urfs := urchin.New()
if _, err := urfs.ScheduleDataToPeerByKey(endPoint, grampus.BucketRemote, objectKey, dstPeer); err != nil {
log.Error("ScheduleDataToPeerByKey failed:%v", err)
}
}
}
err = models.UpdateJob(task)
if err != nil {
@@ -971,12 +980,15 @@ func GrampusGetLog(ctx *context.Context) {
func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName, modelRemoteObsUrl string) (string, error) {
var command string

//prepare
workDir := grampus.NpuWorkDir
-if processorType == grampus.ProcessorTypeGPU {
+if processorType == grampus.ProcessorTypeNPU {
+command += "pwd;cd " + workDir + grampus.CommandPrepareScriptNpu
+} else if processorType == grampus.ProcessorTypeGPU {
workDir = grampus.GpuWorkDir
+command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScriptGpu, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)
}

-command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScript, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)
//download code & dataset
if processorType == grampus.ProcessorTypeNPU {
//no need to download code & dataset by internet
@@ -1025,8 +1037,8 @@ func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bo

var commandCode string
if processorType == grampus.ProcessorTypeNPU {
paramCode += " --obs_url=" + modelRemoteObsUrl
commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py /tmp/log/train.log" + paramCode + ";"
paramCode += " --model_url=" + modelRemoteObsUrl
commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py " + grampus.NpuLocalLogUrl + paramCode + ";"
} else if processorType == grampus.ProcessorTypeGPU {
if pretrainModelFileName != "" {
paramCode += " --ckpt_url" + "=" + workDir + "pretrainmodel/" + pretrainModelFileName
@@ -1042,8 +1054,7 @@ func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bo

//upload models
if processorType == grampus.ProcessorTypeNPU {
-commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_npu " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;"
-command += commandUpload
+// no need to upload
} else if processorType == grampus.ProcessorTypeGPU {
commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_gpu " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
command += commandUpload
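
For reference, with these branches an NPU job's prepare segment now composes to the literal command below (grampus.NpuWorkDir followed by grampus.CommandPrepareScriptNpu), while only GPU jobs still fetch the sync-script archive and upload their output:

pwd;cd /cache/;mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;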

