Browse Source

task execute增加输入值

gitlink
songjc 2 years ago
parent
commit
5869569422
11 changed files with 22 additions and 11 deletions
  1. +2
    -1
      agent/internal/task/cache_move_package.go
  2. +2
    -1
      agent/internal/task/create_ec_package.go
  3. +2
    -1
      agent/internal/task/create_rep_package.go
  4. +2
    -1
      agent/internal/task/download_package.go
  5. +2
    -1
      agent/internal/task/ipfs_pin.go
  6. +2
    -1
      agent/internal/task/ipfs_read.go
  7. +2
    -1
      client/internal/task/create_ec_package.go
  8. +2
    -1
      client/internal/task/create_rep_package.go
  9. +2
    -1
      client/internal/task/storage_load_package.go
  10. +2
    -1
      client/internal/task/update_ec_package.go
  11. +2
    -1
      client/internal/task/update_rep_package.go

+ 2
- 1
agent/internal/task/cache_move_package.go View File

@@ -5,6 +5,7 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
@@ -23,7 +24,7 @@ func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage {
} }
} }


func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.do(ctx) err := t.do(ctx)
complete(err, CompleteOption{ complete(err, CompleteOption{
RemovingDelay: time.Minute, RemovingDelay: time.Minute,


+ 2
- 1
agent/internal/task/create_ec_package.go View File

@@ -5,6 +5,7 @@ import (


"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
) )
@@ -23,7 +24,7 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera
} }
} }


func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *CreateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CreateECPackage]("Task") log := logger.WithType[CreateECPackage]("Task")
log.Debugf("begin") log.Debugf("begin")
defer log.Debugf("end") defer log.Debugf("end")


+ 2
- 1
agent/internal/task/create_rep_package.go View File

@@ -5,6 +5,7 @@ import (


"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
) )
@@ -23,7 +24,7 @@ func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iter
} }
} }


func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *CreateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CreateRepPackage]("Task") log := logger.WithType[CreateRepPackage]("Task")
log.Debugf("begin") log.Debugf("begin")
defer log.Debugf("end") defer log.Debugf("end")


+ 2
- 1
agent/internal/task/download_package.go View File

@@ -3,6 +3,7 @@ package task
import ( import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
) )


@@ -15,7 +16,7 @@ func NewDownloadPackage(userID int64, packageID int64, outputPath string) *Downl
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
} }
} }
func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *DownloadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{ err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock, Distlock: ctx.distlock,
}) })


+ 2
- 1
agent/internal/task/ipfs_pin.go View File

@@ -5,6 +5,7 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/globals"
) )


@@ -27,7 +28,7 @@ func (t *IPFSPin) Compare(other *Task) bool {
return t.FileHash == tsk.FileHash return t.FileHash == tsk.FileHash
} }


func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) {
func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[IPFSPin]("Task") log := logger.WithType[IPFSPin]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t)) log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end") defer log.Debugf("end")


+ 2
- 1
agent/internal/task/ipfs_read.go View File

@@ -8,6 +8,7 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/globals"
) )


@@ -32,7 +33,7 @@ func (t *IPFSRead) Compare(other *Task) bool {
return t.FileHash == tsk.FileHash && t.LocalPath == tsk.LocalPath return t.FileHash == tsk.FileHash && t.LocalPath == tsk.LocalPath
} }


func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) {
func (t *IPFSRead) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[IPFSRead]("Task") log := logger.WithType[IPFSRead]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t)) log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end") defer log.Debugf("end")


+ 2
- 1
client/internal/task/create_ec_package.go View File

@@ -4,6 +4,7 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
) )
@@ -22,7 +23,7 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera
} }
} }


func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *CreateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock, Distlock: ctx.distlock,
}) })


+ 2
- 1
client/internal/task/create_rep_package.go View File

@@ -4,6 +4,7 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
) )
@@ -22,7 +23,7 @@ func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iter
} }
} }


func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *CreateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock, Distlock: ctx.distlock,
}) })


+ 2
- 1
client/internal/task/storage_load_package.go View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
@@ -24,7 +25,7 @@ func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *Stor
} }
} }


func (t *StorageLoadPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.do(ctx) err := t.do(ctx)
complete(err, CompleteOption{ complete(err, CompleteOption{
RemovingDelay: time.Minute, RemovingDelay: time.Minute,


+ 2
- 1
client/internal/task/update_ec_package.go View File

@@ -3,6 +3,7 @@ package task
import ( import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
) )
@@ -21,7 +22,7 @@ func NewUpdateECPackage(userID int64, packageID int64, objectIter iterator.Uploa
} }
} }


func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *UpdateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock, Distlock: ctx.distlock,
}) })


+ 2
- 1
client/internal/task/update_rep_package.go View File

@@ -3,6 +3,7 @@ package task
import ( import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
) )
@@ -21,7 +22,7 @@ func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.Uplo
} }
} }


func (t *UpdateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *UpdateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock, Distlock: ctx.distlock,
}) })


Loading…
Cancel
Save