diff --git a/client/internal/http/package.go b/client/internal/http/package.go index d4b1c80..306d40b 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -88,15 +88,15 @@ func (s *PackageService) Create(ctx *gin.Context) { } type PackageCreateLoad struct { - Info cdsapi.PackageCreateLoad `form:"info" binding:"required"` - Files []*multipart.FileHeader `form:"files"` + Info cdsapi.PackageCreateLoadInfo `form:"info" binding:"required"` + Files []*multipart.FileHeader `form:"files"` } func (s *PackageService) CreateLoad(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.CreateLoad") var req PackageCreateLoad - if err := ctx.ShouldBindJSON(&req); err != nil { + if err := ctx.ShouldBind(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 562601c..40d61d1 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -53,6 +53,7 @@ func (s *Server) initRouters() { rt.GET(cdsapi.PackageGetPath, s.Package().Get) rt.GET(cdsapi.PackageGetByNamePath, s.Package().GetByName) rt.POST(cdsapi.PackageCreatePath, s.Package().Create) + rt.POST(cdsapi.PackageCreateLoadPath, s.Package().CreateLoad) rt.POST(cdsapi.PackageDeletePath, s.Package().Delete) rt.GET(cdsapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages) rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages) diff --git a/common/pkgs/db2/storage.go b/common/pkgs/db2/storage.go index c702b4c..f2ad541 100644 --- a/common/pkgs/db2/storage.go +++ b/common/pkgs/db2/storage.go @@ -53,7 +53,7 @@ func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) func (db *StorageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (bool, error) { rows, err := ctx.Table("Storage").Select("Storage.StorageID"). Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). - Where("UserID = ? and StorageID = ?", userID, storageID).Rows() + Where("UserID = ? and Storage.StorageID = ?", userID, storageID).Rows() if err != nil { return false, fmt.Errorf("execute sql: %w", err) } @@ -66,7 +66,7 @@ func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storag var stg model.Storage err := ctx.Table("Storage").Select("Storage.*"). Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). - Where("UserID = ? and StorageID = ?", userID, storageID).First(&stg).Error + Where("UserID = ? and Storage.StorageID = ?", userID, storageID).First(&stg).Error return stg, err } diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index 7ede930..578a237 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -33,7 +33,10 @@ func (o *CloneStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { sem := semaphore.NewWeighted(int64(len(o.Cloneds))) for i, s := range cloned { - sem.Acquire(ctx.Context, 1) + err = sem.Acquire(ctx.Context, 1) + if err != nil { + return err + } e.PutVar(o.Cloneds[i], &exec.StreamValue{ Stream: io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/shared_store.go index 112d06c..72764ad 100644 --- a/common/pkgs/ioswitch2/ops2/shared_store.go +++ b/common/pkgs/ioswitch2/ops2/shared_store.go @@ -11,7 +11,7 @@ import ( ) func init() { - exec.UseOp[*ShardWrite]() + exec.UseOp[*SharedLoad]() } type SharedLoad struct { @@ -99,9 +99,12 @@ func (t *SharedLoadNode) FullPathVar() *dag.Var { } func (t *SharedLoadNode) GenerateOp() (exec.Op, error) { - return &ShardWrite{ - Input: t.InputStreams().Get(0).VarID, - FileHash: t.OutputValues().Get(0).VarID, - StorageID: t.StorageID, + return &SharedLoad{ + Input: t.InputStreams().Get(0).VarID, + StorageID: t.StorageID, + UserID: t.UserID, + PackageID: t.PackageID, + Path: t.Path, + FullPathOutput: t.OutputValues().Get(0).VarID, }, nil } diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 557ed6f..b8633a7 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -150,6 +150,8 @@ func (s *ShardStore) createTempFile() (*os.File, error) { } func (s *ShardStore) writeTempFile(file *os.File, stream io.Reader) (int64, cdssdk.FileHash, error) { + defer file.Close() + buf := make([]byte, 32*1024) size := int64(0) diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/shared_store.go index b285ab4..33c596a 100644 --- a/common/pkgs/storage/local/shared_store.go +++ b/common/pkgs/storage/local/shared_store.go @@ -13,6 +13,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" ) type SharedStore struct { @@ -42,7 +43,7 @@ func (s *SharedStore) Stop() { } func (s *SharedStore) WritePackageObject(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, stream io.Reader) (string, error) { - relaPath := filepath.Join(fmt.Sprintf("%v", userID), fmt.Sprintf("%v", pkgID), path) + relaPath := filepath.Join(utils.MakeLoadedPackagePath(userID, pkgID), path) fullPath := filepath.Join(s.cfg.LoadBase, relaPath) err := os.MkdirAll(filepath.Dir(fullPath), 0755) if err != nil { diff --git a/common/pkgs/storage/utils/utils.go b/common/pkgs/storage/utils/utils.go index 8a7f6b6..97b2ec7 100644 --- a/common/pkgs/storage/utils/utils.go +++ b/common/pkgs/storage/utils/utils.go @@ -1,12 +1,12 @@ package utils import ( + "fmt" "path/filepath" - "strconv" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string { - return filepath.Join("packages", strconv.FormatInt(int64(userID), 10), strconv.FormatInt(int64(packageID), 10)) + return filepath.Join(fmt.Sprintf("%v", userID), fmt.Sprintf("%v", packageID)) } diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index 5e50cf2..1605a2e 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -102,6 +102,7 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { } ret := CreateLoadResult{ + Package: u.pkg, Objects: make(map[string]cdssdk.Object), }