From 586956942243a42a449e9090d2f0b38c70a10bfe Mon Sep 17 00:00:00 2001 From: songjc <969378911@qq.com> Date: Mon, 4 Sep 2023 09:50:14 +0800 Subject: [PATCH] =?UTF-8?q?task=20execute=E5=A2=9E=E5=8A=A0=E8=BE=93?= =?UTF-8?q?=E5=85=A5=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/cache_move_package.go | 3 ++- agent/internal/task/create_ec_package.go | 3 ++- agent/internal/task/create_rep_package.go | 3 ++- agent/internal/task/download_package.go | 3 ++- agent/internal/task/ipfs_pin.go | 3 ++- agent/internal/task/ipfs_read.go | 3 ++- client/internal/task/create_ec_package.go | 3 ++- client/internal/task/create_rep_package.go | 3 ++- client/internal/task/storage_load_package.go | 3 ++- client/internal/task/update_ec_package.go | 3 ++- client/internal/task/update_rep_package.go | 3 ++- 11 files changed, 22 insertions(+), 11 deletions(-) diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index d19896d..6319cfd 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -5,6 +5,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -23,7 +24,7 @@ func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage { } } -func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { err := t.do(ctx) complete(err, CompleteOption{ RemovingDelay: time.Minute, diff --git a/agent/internal/task/create_ec_package.go b/agent/internal/task/create_ec_package.go index 6e30611..44b78e0 100644 --- a/agent/internal/task/create_ec_package.go +++ b/agent/internal/task/create_ec_package.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -23,7 +24,7 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera } } -func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *CreateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { log := logger.WithType[CreateECPackage]("Task") log.Debugf("begin") defer log.Debugf("end") diff --git a/agent/internal/task/create_rep_package.go b/agent/internal/task/create_rep_package.go index 0838954..6167e4a 100644 --- a/agent/internal/task/create_rep_package.go +++ b/agent/internal/task/create_rep_package.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -23,7 +24,7 @@ func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iter } } -func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *CreateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { log := logger.WithType[CreateRepPackage]("Task") log.Debugf("begin") defer log.Debugf("end") diff --git a/agent/internal/task/download_package.go b/agent/internal/task/download_package.go index bdc2625..9e9c275 100644 --- a/agent/internal/task/download_package.go +++ b/agent/internal/task/download_package.go @@ -3,6 +3,7 @@ package task import ( "time" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" ) @@ -15,7 +16,7 @@ func NewDownloadPackage(userID int64, packageID int64, outputPath string) *Downl cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), } } -func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *DownloadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { err := t.cmd.Execute(&cmd.DownloadPackageContext{ Distlock: ctx.distlock, }) diff --git a/agent/internal/task/ipfs_pin.go b/agent/internal/task/ipfs_pin.go index 4deb8ab..29255c8 100644 --- a/agent/internal/task/ipfs_pin.go +++ b/agent/internal/task/ipfs_pin.go @@ -5,6 +5,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/globals" ) @@ -27,7 +28,7 @@ func (t *IPFSPin) Compare(other *Task) bool { return t.FileHash == tsk.FileHash } -func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) { +func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { log := logger.WithType[IPFSPin]("Task") log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") diff --git a/agent/internal/task/ipfs_read.go b/agent/internal/task/ipfs_read.go index c701360..bbce93a 100644 --- a/agent/internal/task/ipfs_read.go +++ b/agent/internal/task/ipfs_read.go @@ -8,6 +8,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/globals" ) @@ -32,7 +33,7 @@ func (t *IPFSRead) Compare(other *Task) bool { return t.FileHash == tsk.FileHash && t.LocalPath == tsk.LocalPath } -func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) { +func (t *IPFSRead) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { log := logger.WithType[IPFSRead]("Task") log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") diff --git a/client/internal/task/create_ec_package.go b/client/internal/task/create_ec_package.go index 518a3e8..d181e1f 100644 --- a/client/internal/task/create_ec_package.go +++ b/client/internal/task/create_ec_package.go @@ -4,6 +4,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -22,7 +23,7 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera } } -func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *CreateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ Distlock: ctx.distlock, }) diff --git a/client/internal/task/create_rep_package.go b/client/internal/task/create_rep_package.go index e95c085..b102a698 100644 --- a/client/internal/task/create_rep_package.go +++ b/client/internal/task/create_rep_package.go @@ -4,6 +4,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -22,7 +23,7 @@ func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iter } } -func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *CreateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ Distlock: ctx.distlock, }) diff --git a/client/internal/task/storage_load_package.go b/client/internal/task/storage_load_package.go index 20a0d21..73c88bb 100644 --- a/client/internal/task/storage_load_package.go +++ b/client/internal/task/storage_load_package.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -24,7 +25,7 @@ func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *Stor } } -func (t *StorageLoadPackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { err := t.do(ctx) complete(err, CompleteOption{ RemovingDelay: time.Minute, diff --git a/client/internal/task/update_ec_package.go b/client/internal/task/update_ec_package.go index f035d5a..02faaf3 100644 --- a/client/internal/task/update_ec_package.go +++ b/client/internal/task/update_ec_package.go @@ -3,6 +3,7 @@ package task import ( "time" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -21,7 +22,7 @@ func NewUpdateECPackage(userID int64, packageID int64, objectIter iterator.Uploa } } -func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *UpdateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ Distlock: ctx.distlock, }) diff --git a/client/internal/task/update_rep_package.go b/client/internal/task/update_rep_package.go index 23d967f..2a83c51 100644 --- a/client/internal/task/update_rep_package.go +++ b/client/internal/task/update_rep_package.go @@ -3,6 +3,7 @@ package task import ( "time" + "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -21,7 +22,7 @@ func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.Uplo } } -func (t *UpdateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *UpdateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ Distlock: ctx.distlock, })