| @@ -106,7 +106,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { | |||||
| } | } | ||||
| } | } | ||||
| _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.packageID, t.storageID)) | |||||
| _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID)) | |||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("loading package to storage: %w", err) | return fmt.Errorf("loading package to storage: %w", err) | ||||
| } | } | ||||
| @@ -144,19 +144,18 @@ create table Cache ( | |||||
| ) comment = '缓存表'; | ) comment = '缓存表'; | ||||
| create table StoragePackage ( | create table StoragePackage ( | ||||
| PackageID int not null comment '包ID', | |||||
| StorageID int not null comment '存储服务ID', | StorageID int not null comment '存储服务ID', | ||||
| PackageID int not null comment '包ID', | |||||
| UserID int not null comment '调度了此文件的用户ID', | UserID int not null comment '调度了此文件的用户ID', | ||||
| State varchar(100) not null comment '包状态', | State varchar(100) not null comment '包状态', | ||||
| primary key(PackageID, StorageID, UserID) | |||||
| primary key(StorageID, PackageID, UserID) | |||||
| ); | ); | ||||
| create table StoragePackageLog ( | create table StoragePackageLog ( | ||||
| PackageID int not null comment '包ID', | PackageID int not null comment '包ID', | ||||
| StorageID int not null comment '存储服务ID', | StorageID int not null comment '存储服务ID', | ||||
| UserID int not null comment '调度了此文件的用户ID', | UserID int not null comment '调度了此文件的用户ID', | ||||
| CreateTime timestamp not null comment '加载Package完成的时间', | |||||
| primary key(PackageID, StorageID, UserID) | |||||
| CreateTime timestamp not null comment '加载Package完成的时间' | |||||
| ); | ); | ||||
| create table Location ( | create table Location ( | ||||
| @@ -34,8 +34,8 @@ func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID cdssdk.Stor | |||||
| return ret, err | return ret, err | ||||
| } | } | ||||
| func (*StoragePackageDB) LoadPackage(ctx SQLContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID, userID cdssdk.UserID) error { | |||||
| _, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", packageID, storageID, userID, model.StoragePackageStateNormal) | |||||
| func (*StoragePackageDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { | |||||
| _, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", storageID, packageID, userID, model.StoragePackageStateNormal) | |||||
| return err | return err | ||||
| } | } | ||||
| @@ -16,16 +16,15 @@ func (db *DB) StoragePackageLog() *StoragePackageLogDB { | |||||
| return &StoragePackageLogDB{DB: db} | return &StoragePackageLogDB{DB: db} | ||||
| } | } | ||||
| func (*StoragePackageLogDB) Get(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) (model.StoragePackage, error) { | |||||
| var ret model.StoragePackage | |||||
| func (*StoragePackageLogDB) Get(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) (model.StoragePackageLog, error) { | |||||
| var ret model.StoragePackageLog | |||||
| err := sqlx.Get(ctx, &ret, "select * from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID) | err := sqlx.Get(ctx, &ret, "select * from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID) | ||||
| return ret, err | return ret, err | ||||
| } | } | ||||
| func (*StoragePackageLogDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, createTime time.Time) (model.StoragePackage, error) { | |||||
| var ret model.StoragePackage | |||||
| err := sqlx.Get(ctx, &ret, "insert into StoragePackageLog values(?,?,?,?)", storageID, packageID, userID, createTime) | |||||
| return ret, err | |||||
| func (*StoragePackageLogDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, createTime time.Time) error { | |||||
| _, err := ctx.Exec("insert into StoragePackageLog values(?,?,?,?)", storageID, packageID, userID, createTime) | |||||
| return err | |||||
| } | } | ||||
| func (*StoragePackageLogDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { | func (*StoragePackageLogDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { | ||||
| @@ -52,14 +52,14 @@ var _ = Register(Service.StoragePackageLoaded) | |||||
| type StoragePackageLoaded struct { | type StoragePackageLoaded struct { | ||||
| mq.MessageBodyBase | mq.MessageBodyBase | ||||
| UserID cdssdk.UserID `json:"userID"` | UserID cdssdk.UserID `json:"userID"` | ||||
| PackageID cdssdk.PackageID `json:"packageID"` | |||||
| StorageID cdssdk.StorageID `json:"storageID"` | StorageID cdssdk.StorageID `json:"storageID"` | ||||
| PackageID cdssdk.PackageID `json:"packageID"` | |||||
| } | } | ||||
| type StoragePackageLoadedResp struct { | type StoragePackageLoadedResp struct { | ||||
| mq.MessageBodyBase | mq.MessageBodyBase | ||||
| } | } | ||||
| func NewStoragePackageLoaded(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) *StoragePackageLoaded { | |||||
| func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID) *StoragePackageLoaded { | |||||
| return &StoragePackageLoaded{ | return &StoragePackageLoaded{ | ||||
| UserID: userID, | UserID: userID, | ||||
| PackageID: packageID, | PackageID: packageID, | ||||
| @@ -2,6 +2,8 @@ package services | |||||
| import ( | import ( | ||||
| "database/sql" | "database/sql" | ||||
| "fmt" | |||||
| "time" | |||||
| "github.com/jmoiron/sqlx" | "github.com/jmoiron/sqlx" | ||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | "gitlink.org.cn/cloudream/common/consts/errorcode" | ||||
| @@ -24,12 +26,22 @@ func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStora | |||||
| func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { | func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { | ||||
| // TODO: 对于的storage中已经存在的文件,直接覆盖已有文件 | // TODO: 对于的storage中已经存在的文件,直接覆盖已有文件 | ||||
| err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | ||||
| return svc.db.StoragePackage().LoadPackage(tx, msg.PackageID, msg.StorageID, msg.UserID) | |||||
| err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("creating storage package: %w", err) | |||||
| } | |||||
| err = svc.db.StoragePackageLog().Create(tx, msg.StorageID, msg.PackageID, msg.UserID, time.Now()) | |||||
| if err != nil { | |||||
| return fmt.Errorf("creating storage package log: %w", err) | |||||
| } | |||||
| return nil | |||||
| }) | }) | ||||
| if err != nil { | if err != nil { | ||||
| logger.WithField("UserID", msg.UserID). | logger.WithField("UserID", msg.UserID). | ||||
| WithField("PackageID", msg.PackageID). | |||||
| WithField("StorageID", msg.StorageID). | WithField("StorageID", msg.StorageID). | ||||
| WithField("PackageID", msg.PackageID). | |||||
| Warnf("user load package to storage failed, err: %s", err.Error()) | Warnf("user load package to storage failed, err: %s", err.Error()) | ||||
| return nil, mq.Failed(errorcode.OperationFailed, "user load package to storage failed") | return nil, mq.Failed(errorcode.OperationFailed, "user load package to storage failed") | ||||
| } | } | ||||