| @@ -88,15 +88,15 @@ func (s *PackageService) Create(ctx *gin.Context) { | |||||
| } | } | ||||
| type PackageCreateLoad struct { | type PackageCreateLoad struct { | ||||
| Info cdsapi.PackageCreateLoad `form:"info" binding:"required"` | |||||
| Files []*multipart.FileHeader `form:"files"` | |||||
| Info cdsapi.PackageCreateLoadInfo `form:"info" binding:"required"` | |||||
| Files []*multipart.FileHeader `form:"files"` | |||||
| } | } | ||||
| func (s *PackageService) CreateLoad(ctx *gin.Context) { | func (s *PackageService) CreateLoad(ctx *gin.Context) { | ||||
| log := logger.WithField("HTTP", "Package.CreateLoad") | log := logger.WithField("HTTP", "Package.CreateLoad") | ||||
| var req PackageCreateLoad | var req PackageCreateLoad | ||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||||
| if err := ctx.ShouldBind(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | log.Warnf("binding body: %s", err.Error()) | ||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | ||||
| return | return | ||||
| @@ -53,6 +53,7 @@ func (s *Server) initRouters() { | |||||
| rt.GET(cdsapi.PackageGetPath, s.Package().Get) | rt.GET(cdsapi.PackageGetPath, s.Package().Get) | ||||
| rt.GET(cdsapi.PackageGetByNamePath, s.Package().GetByName) | rt.GET(cdsapi.PackageGetByNamePath, s.Package().GetByName) | ||||
| rt.POST(cdsapi.PackageCreatePath, s.Package().Create) | rt.POST(cdsapi.PackageCreatePath, s.Package().Create) | ||||
| rt.POST(cdsapi.PackageCreateLoadPath, s.Package().CreateLoad) | |||||
| rt.POST(cdsapi.PackageDeletePath, s.Package().Delete) | rt.POST(cdsapi.PackageDeletePath, s.Package().Delete) | ||||
| rt.GET(cdsapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages) | rt.GET(cdsapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages) | ||||
| rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages) | rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages) | ||||
| @@ -53,7 +53,7 @@ func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) | |||||
| func (db *StorageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (bool, error) { | func (db *StorageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (bool, error) { | ||||
| rows, err := ctx.Table("Storage").Select("Storage.StorageID"). | rows, err := ctx.Table("Storage").Select("Storage.StorageID"). | ||||
| Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). | Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). | ||||
| Where("UserID = ? and StorageID = ?", userID, storageID).Rows() | |||||
| Where("UserID = ? and Storage.StorageID = ?", userID, storageID).Rows() | |||||
| if err != nil { | if err != nil { | ||||
| return false, fmt.Errorf("execute sql: %w", err) | return false, fmt.Errorf("execute sql: %w", err) | ||||
| } | } | ||||
| @@ -66,7 +66,7 @@ func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storag | |||||
| var stg model.Storage | var stg model.Storage | ||||
| err := ctx.Table("Storage").Select("Storage.*"). | err := ctx.Table("Storage").Select("Storage.*"). | ||||
| Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). | Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). | ||||
| Where("UserID = ? and StorageID = ?", userID, storageID).First(&stg).Error | |||||
| Where("UserID = ? and Storage.StorageID = ?", userID, storageID).First(&stg).Error | |||||
| return stg, err | return stg, err | ||||
| } | } | ||||
| @@ -33,7 +33,10 @@ func (o *CloneStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { | |||||
| sem := semaphore.NewWeighted(int64(len(o.Cloneds))) | sem := semaphore.NewWeighted(int64(len(o.Cloneds))) | ||||
| for i, s := range cloned { | for i, s := range cloned { | ||||
| sem.Acquire(ctx.Context, 1) | |||||
| err = sem.Acquire(ctx.Context, 1) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| e.PutVar(o.Cloneds[i], &exec.StreamValue{ | e.PutVar(o.Cloneds[i], &exec.StreamValue{ | ||||
| Stream: io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { | Stream: io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { | ||||
| @@ -11,7 +11,7 @@ import ( | |||||
| ) | ) | ||||
| func init() { | func init() { | ||||
| exec.UseOp[*ShardWrite]() | |||||
| exec.UseOp[*SharedLoad]() | |||||
| } | } | ||||
| type SharedLoad struct { | type SharedLoad struct { | ||||
| @@ -99,9 +99,12 @@ func (t *SharedLoadNode) FullPathVar() *dag.Var { | |||||
| } | } | ||||
| func (t *SharedLoadNode) GenerateOp() (exec.Op, error) { | func (t *SharedLoadNode) GenerateOp() (exec.Op, error) { | ||||
| return &ShardWrite{ | |||||
| Input: t.InputStreams().Get(0).VarID, | |||||
| FileHash: t.OutputValues().Get(0).VarID, | |||||
| StorageID: t.StorageID, | |||||
| return &SharedLoad{ | |||||
| Input: t.InputStreams().Get(0).VarID, | |||||
| StorageID: t.StorageID, | |||||
| UserID: t.UserID, | |||||
| PackageID: t.PackageID, | |||||
| Path: t.Path, | |||||
| FullPathOutput: t.OutputValues().Get(0).VarID, | |||||
| }, nil | }, nil | ||||
| } | } | ||||
| @@ -150,6 +150,8 @@ func (s *ShardStore) createTempFile() (*os.File, error) { | |||||
| } | } | ||||
| func (s *ShardStore) writeTempFile(file *os.File, stream io.Reader) (int64, cdssdk.FileHash, error) { | func (s *ShardStore) writeTempFile(file *os.File, stream io.Reader) (int64, cdssdk.FileHash, error) { | ||||
| defer file.Close() | |||||
| buf := make([]byte, 32*1024) | buf := make([]byte, 32*1024) | ||||
| size := int64(0) | size := int64(0) | ||||
| @@ -13,6 +13,7 @@ import ( | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| stgmod "gitlink.org.cn/cloudream/storage/common/models" | stgmod "gitlink.org.cn/cloudream/storage/common/models" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" | "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" | |||||
| ) | ) | ||||
| type SharedStore struct { | type SharedStore struct { | ||||
| @@ -42,7 +43,7 @@ func (s *SharedStore) Stop() { | |||||
| } | } | ||||
| func (s *SharedStore) WritePackageObject(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, stream io.Reader) (string, error) { | func (s *SharedStore) WritePackageObject(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, stream io.Reader) (string, error) { | ||||
| relaPath := filepath.Join(fmt.Sprintf("%v", userID), fmt.Sprintf("%v", pkgID), path) | |||||
| relaPath := filepath.Join(utils.MakeLoadedPackagePath(userID, pkgID), path) | |||||
| fullPath := filepath.Join(s.cfg.LoadBase, relaPath) | fullPath := filepath.Join(s.cfg.LoadBase, relaPath) | ||||
| err := os.MkdirAll(filepath.Dir(fullPath), 0755) | err := os.MkdirAll(filepath.Dir(fullPath), 0755) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -1,12 +1,12 @@ | |||||
| package utils | package utils | ||||
| import ( | import ( | ||||
| "fmt" | |||||
| "path/filepath" | "path/filepath" | ||||
| "strconv" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| ) | ) | ||||
| func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string { | func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string { | ||||
| return filepath.Join("packages", strconv.FormatInt(int64(userID), 10), strconv.FormatInt(int64(packageID), 10)) | |||||
| return filepath.Join(fmt.Sprintf("%v", userID), fmt.Sprintf("%v", packageID)) | |||||
| } | } | ||||
| @@ -102,6 +102,7 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { | |||||
| } | } | ||||
| ret := CreateLoadResult{ | ret := CreateLoadResult{ | ||||
| Package: u.pkg, | |||||
| Objects: make(map[string]cdssdk.Object), | Objects: make(map[string]cdssdk.Object), | ||||
| } | } | ||||