diff --git a/agent/internal/cmd/serve.go b/agent/internal/cmd/serve.go index b716252..ca6b9cf 100644 --- a/agent/internal/cmd/serve.go +++ b/agent/internal/cmd/serve.go @@ -22,7 +22,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/metacache" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" "google.golang.org/grpc" @@ -55,7 +55,7 @@ func serve(configPath string) { hubCfg := downloadHubConfig() // 初始化存储服务管理器 - stgAgts := svcmgr.NewPool() + stgAgts := agtpool.NewPool() for _, stg := range hubCfg.Storages { err := stgAgts.SetupAgent(stg) if err != nil { diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index bef65f7..15c49a8 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -29,7 +29,7 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe defer s.swWorker.Remove(sw) execCtx := exec.NewWithContext(ctx) - exec.SetValueByType(execCtx, s.stgMgr) + exec.SetValueByType(execCtx, s.stgAgts) _, err = sw.Run(execCtx) if err != nil { log.Warnf("running io plan: %v", err) diff --git a/agent/internal/grpc/service.go b/agent/internal/grpc/service.go index 9f4a5f9..3dbbf85 100644 --- a/agent/internal/grpc/service.go +++ b/agent/internal/grpc/service.go @@ -3,18 +3,18 @@ package grpc import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) type Service struct { agentserver.AgentServer swWorker *exec.Worker - stgMgr *svcmgr.AgentPool + stgAgts *agtpool.AgentPool } -func NewService(swWorker *exec.Worker, stgMgr *svcmgr.AgentPool) *Service { +func NewService(swWorker *exec.Worker, stgAgts *agtpool.AgentPool) *Service { return &Service{ swWorker: swWorker, - stgMgr: stgMgr, + stgAgts: stgAgts, } } diff --git a/agent/internal/http/hub_io.go b/agent/internal/http/hub_io.go index 36e4e5d..c03deae 100644 --- a/agent/internal/http/hub_io.go +++ b/agent/internal/http/hub_io.go @@ -162,7 +162,7 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { defer s.svc.swWorker.Remove(sw) execCtx := exec.NewWithContext(ctx.Request.Context()) - exec.SetValueByType(execCtx, s.svc.stgMgr) + exec.SetValueByType(execCtx, s.svc.stgAgts) _, err = sw.Run(execCtx) if err != nil { ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("executing plan: %v", err))) diff --git a/agent/internal/http/service.go b/agent/internal/http/service.go index 9a9e596..2c38d36 100644 --- a/agent/internal/http/service.go +++ b/agent/internal/http/service.go @@ -2,17 +2,17 @@ package http import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) type Service struct { swWorker *exec.Worker - stgMgr *svcmgr.AgentPool + stgAgts *agtpool.AgentPool } -func NewService(swWorker *exec.Worker, stgMgr *svcmgr.AgentPool) *Service { +func NewService(swWorker *exec.Worker, stgAgts *agtpool.AgentPool) *Service { return &Service{ swWorker: swWorker, - stgMgr: stgMgr, + stgAgts: stgAgts, } } diff --git a/agent/internal/mq/cache.go b/agent/internal/mq/cache.go index 19fe06d..735e1e3 100644 --- a/agent/internal/mq/cache.go +++ b/agent/internal/mq/cache.go @@ -12,7 +12,7 @@ import ( ) func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { - store, err := svc.stgMgr.GetShardStore(msg.StorageID) + store, err := svc.stgAgts.GetShardStore(msg.StorageID) if err != nil { return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err)) } @@ -31,7 +31,7 @@ func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *m } func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMessage) { - store, err := svc.stgMgr.GetShardStore(msg.StorageID) + store, err := svc.stgAgts.GetShardStore(msg.StorageID) if err != nil { return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err)) } diff --git a/agent/internal/mq/service.go b/agent/internal/mq/service.go index 208a93a..b4688ed 100644 --- a/agent/internal/mq/service.go +++ b/agent/internal/mq/service.go @@ -2,17 +2,17 @@ package mq import ( "gitlink.org.cn/cloudream/storage/agent/internal/task" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) type Service struct { taskManager *task.Manager - stgMgr *svcmgr.AgentPool + stgAgts *agtpool.AgentPool } -func NewService(taskMgr *task.Manager, stgMgr *svcmgr.AgentPool) *Service { +func NewService(taskMgr *task.Manager, stgAgts *agtpool.AgentPool) *Service { return &Service{ taskManager: taskMgr, - stgMgr: stgMgr, + stgAgts: stgAgts, } } diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index a6f51f8..404d7ec 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -39,7 +39,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") - store, err := ctx.stgMgr.GetShardStore(t.storageID) + store, err := ctx.stgAgts.GetShardStore(t.storageID) if err != nil { return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err) } diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index e04c161..4080005 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/accessstat" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) @@ -16,7 +16,7 @@ type TaskContext struct { connectivity *connectivity.Collector downloader *downloader.Downloader accessStat *accessstat.AccessStat - stgMgr *svcmgr.AgentPool + stgAgts *agtpool.AgentPool uploader *uploader.Uploader } @@ -35,13 +35,13 @@ type Task = task.Task[TaskContext] // CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式 type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *svcmgr.AgentPool, uploader *uploader.Uploader) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgAgts *agtpool.AgentPool, uploader *uploader.Uploader) Manager { return task.NewManager(TaskContext{ distlock: distlock, connectivity: connectivity, downloader: downloader, accessStat: accessStat, - stgMgr: stgMgr, + stgAgts: stgAgts, uploader: uploader, }) } diff --git a/client/internal/task/task.go b/client/internal/task/task.go index 4a9da80..348621b 100644 --- a/client/internal/task/task.go +++ b/client/internal/task/task.go @@ -4,14 +4,14 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" // 引入分布式锁服务 "gitlink.org.cn/cloudream/common/pkgs/task" // 引入任务处理相关的包 "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" // 引入网络连接状态收集器 - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) // TaskContext 定义了任务执行的上下文环境,包含分布式锁服务和网络连接状态收集器 type TaskContext struct { distlock *distlock.Service connectivity *connectivity.Collector - stgMgr *svcmgr.AgentPool + stgAgts *agtpool.AgentPool } // CompleteFn 类型定义了任务完成时的回调函数,用于设置任务的执行结果 @@ -31,10 +31,10 @@ type CompleteOption = task.CompleteOption // NewManager 创建一个新的任务管理器实例,接受一个分布式锁服务和一个网络连接状态收集器作为参数 // 返回一个初始化好的任务管理器实例 -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.AgentPool) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool) Manager { return task.NewManager(TaskContext{ distlock: distlock, connectivity: connectivity, - stgMgr: stgMgr, + stgAgts: stgAgts, }) } diff --git a/client/main.go b/client/main.go index da0e93d..abd1a45 100644 --- a/client/main.go +++ b/client/main.go @@ -21,7 +21,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" "gitlink.org.cn/cloudream/storage/common/pkgs/metacache" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) @@ -99,18 +99,18 @@ func main() { go serveAccessStat(acStat) // 存储管理器 - stgMgr := svcmgr.NewPool() + stgAgts := agtpool.NewPool() // 任务管理器 - taskMgr := task.NewManager(distlockSvc, &conCol, stgMgr) + taskMgr := task.NewManager(distlockSvc, &conCol, stgAgts) strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr, strgSel) + dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) // 上传器 - uploader := uploader.NewUploader(distlockSvc, &conCol, stgMgr, stgMeta) + uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta) svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat, uploader, strgSel, stgMeta) if err != nil { diff --git a/common/pkgs/downloader/downloader.go b/common/pkgs/downloader/downloader.go index df98d64..176234f 100644 --- a/common/pkgs/downloader/downloader.go +++ b/common/pkgs/downloader/downloader.go @@ -12,7 +12,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) const ( @@ -42,11 +42,11 @@ type Downloader struct { strips *StripCache cfg Config conn *connectivity.Collector - stgMgr *svcmgr.AgentPool + stgAgts *agtpool.AgentPool selector *strategy.Selector } -func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *svcmgr.AgentPool, sel *strategy.Selector) Downloader { +func NewDownloader(cfg Config, conn *connectivity.Collector, stgAgts *agtpool.AgentPool, sel *strategy.Selector) Downloader { if cfg.MaxStripCacheCount == 0 { cfg.MaxStripCacheCount = DefaultMaxStripCacheCount } @@ -56,7 +56,7 @@ func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *svcmgr.Agen strips: ch, cfg: cfg, conn: conn, - stgMgr: stgMgr, + stgAgts: stgAgts, selector: sel, } } diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index afc7eed..37c061f 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -136,7 +136,7 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, i.downloader.stgMgr) + exec.SetValueByType(exeCtx, i.downloader.stgAgts) exec := plans.Execute(exeCtx) go exec.Wait(context.TODO()) diff --git a/common/pkgs/downloader/lrc_strip_iterator.go b/common/pkgs/downloader/lrc_strip_iterator.go index 3e4455f..4270ec5 100644 --- a/common/pkgs/downloader/lrc_strip_iterator.go +++ b/common/pkgs/downloader/lrc_strip_iterator.go @@ -114,7 +114,7 @@ func (s *LRCStripIterator) downloading() { } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, s.downloder.stgMgr) + exec.SetValueByType(exeCtx, s.downloder.stgAgts) exec := plans.Execute(exeCtx) diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index e03e23f..1d6a117 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -218,7 +218,7 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, s.downloader.stgMgr) + exec.SetValueByType(exeCtx, s.downloader.stgAgts) exec := plans.Execute(exeCtx) ctx, cancel := context.WithCancel(context.Background()) diff --git a/common/pkgs/ioswitch2/ops2/bypass.go b/common/pkgs/ioswitch2/ops2/bypass.go index 4c543a2..09613e8 100644 --- a/common/pkgs/ioswitch2/ops2/bypass.go +++ b/common/pkgs/ioswitch2/ops2/bypass.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) @@ -44,12 +44,12 @@ type BypassToShardStore struct { } func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - svcMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx) + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return err } - shardStore, err := svcMgr.GetShardStore(o.StorageID) + shardStore, err := stgAgts.GetShardStore(o.StorageID) if err != nil { return err } diff --git a/common/pkgs/ioswitch2/ops2/faas.go b/common/pkgs/ioswitch2/ops2/faas.go index aaa5bcb..a6c7c63 100644 --- a/common/pkgs/ioswitch2/ops2/faas.go +++ b/common/pkgs/ioswitch2/ops2/faas.go @@ -7,7 +7,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) @@ -20,17 +20,17 @@ type InternalFaaSGalMultiply struct { } func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx) + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return err } - fass, err := svcmgr.GetComponent[types.InternalFaaSCall](stgMgr, o.StorageID) + fass, err := agtpool.GetComponent[types.InternalFaaSCall](stgAgts, o.StorageID) if err != nil { return fmt.Errorf("getting faas component: %w", err) } - tmp, err := svcmgr.GetComponent[types.TempStore](stgMgr, o.StorageID) + tmp, err := agtpool.GetComponent[types.TempStore](stgAgts, o.StorageID) if err != nil { return fmt.Errorf("getting temp store component: %w", err) } diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 495b247..76dfd86 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -12,7 +12,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) @@ -42,12 +42,12 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Debugf("reading from shard store") defer logger.Debugf("reading from shard store finished") - stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx) + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return fmt.Errorf("getting storage manager: %w", err) } - store, err := stgMgr.GetShardStore(o.StorageID) + store, err := stgAgts.GetShardStore(o.StorageID) if err != nil { return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) } @@ -84,12 +84,12 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Debugf("writting file to shard store") defer logger.Debugf("write to shard store finished") - stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx) + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return fmt.Errorf("getting storage manager: %w", err) } - store, err := stgMgr.GetShardStore(o.StorageID) + store, err := stgAgts.GetShardStore(o.StorageID) if err != nil { return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) } diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/shared_store.go index a549179..d370c5e 100644 --- a/common/pkgs/ioswitch2/ops2/shared_store.go +++ b/common/pkgs/ioswitch2/ops2/shared_store.go @@ -8,7 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) func init() { @@ -27,12 +27,12 @@ func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Debugf("load file to shared store") defer logger.Debugf("load file to shared store finished") - stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx) + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return fmt.Errorf("getting storage manager: %w", err) } - store, err := stgMgr.GetSharedStore(o.StorageID) + store, err := stgAgts.GetSharedStore(o.StorageID) if err != nil { return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) } diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index f301313..34c0e33 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -11,7 +11,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) @@ -41,12 +41,12 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Debugf("reading from shard store") defer logger.Debugf("reading from shard store finished") - stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx) + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return fmt.Errorf("getting storage manager: %w", err) } - store, err := stgMgr.GetShardStore(o.StorageID) + store, err := stgAgts.GetShardStore(o.StorageID) if err != nil { return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) } @@ -83,12 +83,12 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Debugf("writting file to shard store") defer logger.Debugf("write to shard store finished") - stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx) + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return fmt.Errorf("getting storage manager: %w", err) } - store, err := stgMgr.GetShardStore(o.StorageID) + store, err := stgAgts.GetShardStore(o.StorageID) if err != nil { return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) } diff --git a/common/pkgs/storage/svcmgr/mgr.go b/common/pkgs/storage/agtpool/pool.go similarity index 99% rename from common/pkgs/storage/svcmgr/mgr.go rename to common/pkgs/storage/agtpool/pool.go index 41639a1..29dfd20 100644 --- a/common/pkgs/storage/svcmgr/mgr.go +++ b/common/pkgs/storage/agtpool/pool.go @@ -1,4 +1,4 @@ -package svcmgr +package agtpool import ( "fmt" diff --git a/common/pkgs/storage/cos/multiPartUploader.go b/common/pkgs/storage/cos/multiPartUploader.go deleted file mode 100644 index 4d161dc..0000000 --- a/common/pkgs/storage/cos/multiPartUploader.go +++ /dev/null @@ -1,80 +0,0 @@ -package cos - -import ( - "context" - "fmt" - "io" - "net/http" - "net/url" - - "github.com/tencentyun/cos-go-sdk-v5" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" -) - -type MultiPartUploader struct { - client *cos.Client -} - -func NewMultiPartUpload(address *cdssdk.COSType) *MultiPartUploader { - // cos的endpoint已包含bucket名,会自动将桶解析出来 - u, _ := url.Parse(address.Endpoint) - b := &cos.BaseURL{BucketURL: u} - client := cos.NewClient(b, &http.Client{ - Transport: &cos.AuthorizationTransport{ - SecretID: address.AK, - SecretKey: address.SK, - }, - }) - - return &MultiPartUploader{ - client: client, - } -} - -func (c *MultiPartUploader) Initiate(objectName string) (string, error) { - v, _, err := c.client.Object.InitiateMultipartUpload(context.Background(), objectName, nil) - if err != nil { - return "", fmt.Errorf("failed to initiate multipart upload: %w", err) - } - return v.UploadID, nil -} - -func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) { - resp, err := c.client.Object.UploadPart( - context.Background(), key, uploadID, partNumber, stream, nil, - ) - if err != nil { - return nil, fmt.Errorf("failed to upload part: %w", err) - } - - result := &types.UploadedPartInfo{ - ETag: resp.Header.Get("ETag"), - PartNumber: partNumber, - } - return result, nil -} - -func (c *MultiPartUploader) Complete(uploadID string, key string, parts []*types.UploadedPartInfo) error { - opt := &cos.CompleteMultipartUploadOptions{} - for i := 0; i < len(parts); i++ { - opt.Parts = append(opt.Parts, cos.Object{ - PartNumber: parts[i].PartNumber, ETag: parts[i].ETag}, - ) - } - _, _, err := c.client.Object.CompleteMultipartUpload( - context.Background(), key, uploadID, opt, - ) - if err != nil { - return err - } - - return nil -} -func (c *MultiPartUploader) Abort() { - -} - -func (c *MultiPartUploader) Close() { - -} diff --git a/common/pkgs/storage/local/service.go b/common/pkgs/storage/local/agent.go similarity index 71% rename from common/pkgs/storage/local/service.go rename to common/pkgs/storage/local/agent.go index b757c49..c2e5e52 100644 --- a/common/pkgs/storage/local/service.go +++ b/common/pkgs/storage/local/agent.go @@ -5,13 +5,13 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -type Agent struct { +type agent struct { Detail stgmod.StorageDetail ShardStore *ShardStore SharedStore *SharedStore } -func (s *Agent) Start(ch *types.StorageEventChan) { +func (s *agent) Start(ch *types.StorageEventChan) { if s.ShardStore != nil { s.ShardStore.Start(ch) } @@ -21,7 +21,7 @@ func (s *Agent) Start(ch *types.StorageEventChan) { } } -func (s *Agent) Stop() { +func (s *agent) Stop() { if s.ShardStore != nil { s.ShardStore.Stop() } @@ -31,11 +31,11 @@ func (s *Agent) Stop() { } } -func (s *Agent) Info() stgmod.StorageDetail { +func (s *agent) Info() stgmod.StorageDetail { return s.Detail } -func (a *Agent) GetShardStore() (types.ShardStore, error) { +func (a *agent) GetShardStore() (types.ShardStore, error) { if a.ShardStore == nil { return nil, types.ErrUnsupported } @@ -43,7 +43,7 @@ func (a *Agent) GetShardStore() (types.ShardStore, error) { return a.ShardStore, nil } -func (a *Agent) GetSharedStore() (types.SharedStore, error) { +func (a *agent) GetSharedStore() (types.SharedStore, error) { if a.SharedStore == nil { return nil, types.ErrUnsupported } diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index f31ec33..0c0b3fd 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -18,7 +18,7 @@ func init() { type builder struct{} func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, error) { - agt := &Agent{ + agt := &agent{ Detail: detail, } diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 062e5d6..9176504 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -23,7 +23,7 @@ const ( ) type ShardStore struct { - agt *Agent + agt *agent cfg cdssdk.LocalShardStorage absRoot string lock sync.Mutex @@ -31,7 +31,7 @@ type ShardStore struct { done chan any } -func NewShardStore(svc *Agent, cfg cdssdk.LocalShardStorage) (*ShardStore, error) { +func NewShardStore(svc *agent, cfg cdssdk.LocalShardStorage) (*ShardStore, error) { absRoot, err := filepath.Abs(cfg.Root) if err != nil { return nil, fmt.Errorf("get abs root: %w", err) diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/shared_store.go index 137f975..97d6756 100644 --- a/common/pkgs/storage/local/shared_store.go +++ b/common/pkgs/storage/local/shared_store.go @@ -11,11 +11,11 @@ import ( ) type SharedStore struct { - agt *Agent + agt *agent cfg cdssdk.LocalSharedStorage } -func NewSharedStore(agt *Agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) { +func NewSharedStore(agt *agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) { return &SharedStore{ agt: agt, cfg: cfg, diff --git a/common/pkgs/storage/obs/faas.go b/common/pkgs/storage/obs/faas.go deleted file mode 100644 index 6e7889f..0000000 --- a/common/pkgs/storage/obs/faas.go +++ /dev/null @@ -1 +0,0 @@ -package obs diff --git a/common/pkgs/storage/obs/multiPartUploader.go b/common/pkgs/storage/obs/multiPartUploader.go deleted file mode 100644 index 9ce8a2e..0000000 --- a/common/pkgs/storage/obs/multiPartUploader.go +++ /dev/null @@ -1,90 +0,0 @@ -package obs - -import ( - "fmt" - "io" - - "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" - log "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" -) - -type MultiPartUploader struct { - client *obs.ObsClient - bucket string -} - -func NewMultiPartUpload(address *cdssdk.OBSType) *MultiPartUploader { - client, err := obs.New(address.AK, address.SK, address.Endpoint) - if err != nil { - log.Fatalf("Error: %v", err) - } - - return &MultiPartUploader{ - client: client, - bucket: address.Bucket, - } -} - -func (c *MultiPartUploader) Initiate(objectName string) (string, error) { - input := &obs.InitiateMultipartUploadInput{} - input.Bucket = c.bucket - input.Key = objectName - imur, err := c.client.InitiateMultipartUpload(input) - if err != nil { - return "", fmt.Errorf("failed to initiate multipart upload: %w", err) - } - return imur.UploadId, nil -} - -func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) { - uploadParam := &obs.UploadPartInput{ - Bucket: c.bucket, - Key: key, - UploadId: uploadID, - PartSize: partSize, - PartNumber: partNumber, - Body: stream, - } - - part, err := c.client.UploadPart(uploadParam) - if err != nil { - return nil, fmt.Errorf("failed to upload part: %w", err) - } - result := &types.UploadedPartInfo{ - ETag: part.ETag, - PartNumber: partNumber, - } - return result, nil -} - -func (c *MultiPartUploader) Complete(uploadID string, Key string, parts []*types.UploadedPartInfo) error { - var uploadPart []obs.Part - for i := 0; i < len(parts); i++ { - uploadPart = append(uploadPart, obs.Part{ - PartNumber: parts[i].PartNumber, - ETag: parts[i].ETag, - }) - } - - notifyParam := &obs.CompleteMultipartUploadInput{ - Bucket: c.bucket, - Key: Key, - UploadId: uploadID, - Parts: uploadPart, - } - - _, err := c.client.CompleteMultipartUpload(notifyParam) - if err != nil { - return err - } - return nil -} -func (c *MultiPartUploader) Abort() { - -} - -func (c *MultiPartUploader) Close() { - -} diff --git a/common/pkgs/storage/obs/obs.go b/common/pkgs/storage/obs/obs.go deleted file mode 100644 index 6e7889f..0000000 --- a/common/pkgs/storage/obs/obs.go +++ /dev/null @@ -1 +0,0 @@ -package obs diff --git a/common/pkgs/storage/oss/multiPartUploader.go b/common/pkgs/storage/oss/multiPartUploader.go deleted file mode 100644 index 7ce5771..0000000 --- a/common/pkgs/storage/oss/multiPartUploader.go +++ /dev/null @@ -1,88 +0,0 @@ -package oss - -import ( - "fmt" - "io" - "log" - - "github.com/aliyun/aliyun-oss-go-sdk/oss" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" -) - -type MultiPartUploader struct { - client *oss.Client - bucket *oss.Bucket -} - -func NewMultiPartUpload(address *cdssdk.OSSType) *MultiPartUploader { - // 创建OSSClient实例。 - client, err := oss.New(address.Endpoint, address.AK, address.SK) - if err != nil { - log.Fatalf("Error: %v", err) - } - - bucket, err := client.Bucket(address.Bucket) - if err != nil { - log.Fatalf("Error: %v", err) - } - - return &MultiPartUploader{ - client: client, - bucket: bucket, - } -} - -func (c *MultiPartUploader) Initiate(objectName string) (string, error) { - imur, err := c.bucket.InitiateMultipartUpload(objectName) - if err != nil { - return "", fmt.Errorf("failed to initiate multipart upload: %w", err) - } - return imur.UploadID, nil -} - -func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) { - uploadParam := oss.InitiateMultipartUploadResult{ - UploadID: uploadID, - Key: key, - Bucket: c.bucket.BucketName, - } - part, err := c.bucket.UploadPart(uploadParam, stream, partSize, partNumber) - if err != nil { - return nil, fmt.Errorf("failed to upload part: %w", err) - } - result := &types.UploadedPartInfo{ - ETag: part.ETag, - PartNumber: partNumber, - } - return result, nil -} - -func (c *MultiPartUploader) Complete(uploadID string, Key string, parts []*types.UploadedPartInfo) error { - notifyParam := oss.InitiateMultipartUploadResult{ - UploadID: uploadID, - Key: Key, - Bucket: c.bucket.BucketName, - } - var uploadPart []oss.UploadPart - for i := 0; i < len(parts); i++ { - uploadPart = append(uploadPart, oss.UploadPart{ - PartNumber: parts[i].PartNumber, - ETag: parts[i].ETag, - }) - } - _, err := c.bucket.CompleteMultipartUpload(notifyParam, uploadPart) - if err != nil { - return err - } - return nil -} - -func (c *MultiPartUploader) Abort() { - -} - -func (c *MultiPartUploader) Close() { - // 关闭client - -} diff --git a/common/pkgs/storage/s3/service.go b/common/pkgs/storage/s3/agent.go similarity index 100% rename from common/pkgs/storage/s3/service.go rename to common/pkgs/storage/s3/agent.go diff --git a/common/pkgs/storage/s3/s3_test.go b/common/pkgs/storage/s3/s3_test.go index 7c3f2cd..7d81900 100644 --- a/common/pkgs/storage/s3/s3_test.go +++ b/common/pkgs/storage/s3/s3_test.go @@ -19,8 +19,8 @@ func Test_S3(t *testing.T) { Convey("OBS", t, func() { cli, bkt, err := createS3Client(&cdssdk.OBSType{ Region: "cn-north-4", - AK: "CANMDYKXIWRDR0IYDB32", - SK: "V67yEYpu7ol2NT8nhLlNF1g9k5hq2VwIP5N5jIoQ", + AK: "*", + SK: "*", Endpoint: "https://obs.cn-north-4.myhuaweicloud.com", Bucket: "pcm3-bucket3", }) diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index df81387..a0ec252 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -56,7 +56,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, u.uploader.stgMgr) + exec.SetValueByType(exeCtx, u.uploader.stgAgts) exec := plans.Execute(exeCtx) exec.BeginWrite(io.NopCloser(stream), hd) ret, err := exec.Wait(context.TODO()) diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index 359dea0..0a19a86 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -61,7 +61,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, w.uploader.stgMgr) + exec.SetValueByType(exeCtx, w.uploader.stgAgts) exec := plans.Execute(exeCtx) exec.BeginWrite(io.NopCloser(stream), hd) ret, err := exec.Wait(context.TODO()) diff --git a/common/pkgs/uploader/uploader.go b/common/pkgs/uploader/uploader.go index 6846ba2..bcb3f4d 100644 --- a/common/pkgs/uploader/uploader.go +++ b/common/pkgs/uploader/uploader.go @@ -16,23 +16,23 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/metacache" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) type Uploader struct { distlock *distlock.Service connectivity *connectivity.Collector - stgMgr *svcmgr.AgentPool + stgAgts *agtpool.AgentPool stgMeta *metacache.StorageMeta loadTo []cdssdk.StorageID loadToPath []string } -func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.AgentPool, stgMeta *metacache.StorageMeta) *Uploader { +func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, stgMeta *metacache.StorageMeta) *Uploader { return &Uploader{ distlock: distlock, connectivity: connectivity, - stgMgr: stgMgr, + stgAgts: stgAgts, stgMeta: stgMeta, } } diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go index 0c644f3..900d404 100644 --- a/scanner/internal/event/event.go +++ b/scanner/internal/event/event.go @@ -9,13 +9,13 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/typedispatcher" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) type ExecuteArgs struct { DB *db2.DB DistLock *distlock.Service - StgMgr *svcmgr.AgentPool + StgMgr *agtpool.AgentPool } type Executor = event.Executor[ExecuteArgs] @@ -26,11 +26,11 @@ type Event = event.Event[ExecuteArgs] type ExecuteOption = event.ExecuteOption -func NewExecutor(db *db2.DB, distLock *distlock.Service, stgMgr *svcmgr.AgentPool) Executor { +func NewExecutor(db *db2.DB, distLock *distlock.Service, stgAgts *agtpool.AgentPool) Executor { return event.NewExecutor(ExecuteArgs{ DB: db, DistLock: distLock, - StgMgr: stgMgr, + StgMgr: stgAgts, }) } diff --git a/scanner/main.go b/scanner/main.go index 33385fe..c9de497 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage/scanner/internal/config" "gitlink.org.cn/cloudream/storage/scanner/internal/event" "gitlink.org.cn/cloudream/storage/scanner/internal/mq" @@ -48,10 +48,10 @@ func main() { go serveDistLock(distlockSvc) // 启动存储服务管理器 - stgMgr := svcmgr.NewPool() + stgAgts := agtpool.NewPool() // 启动事件执行器 - eventExecutor := event.NewExecutor(db, distlockSvc, stgMgr) + eventExecutor := event.NewExecutor(db, distlockSvc, stgAgts) go serveEventExecutor(&eventExecutor) agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), config.Cfg().RabbitMQ)