From 7745e89829666c55685600a8f50bb242ed1483d2 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 26 Sep 2023 10:00:06 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BC=98=E5=8C=96ClientPool=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=EF=BC=8C=E6=96=B9=E4=BE=BF=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/services/mq/storage.go | 4 +-- agent/internal/task/cache_move_package.go | 4 +-- client/internal/services/bucket.go | 8 ++--- client/internal/services/cacah.go | 4 +-- client/internal/services/package.go | 12 +++---- client/internal/services/scanner.go | 2 +- client/internal/services/storage.go | 6 ++-- client/internal/task/storage_load_package.go | 10 +++--- common/globals/pools.go | 6 ++-- common/pkgs/cmd/create_rep_package.go | 2 +- common/pkgs/cmd/download_package.go | 6 ++-- common/pkgs/iterator/ec_object_iterator.go | 4 +-- common/pkgs/iterator/rep_object_iterator.go | 4 +-- common/pkgs/mq/agent/client.go | 36 +++++++------------ common/pkgs/mq/coordinator/client.go | 36 +++++++------------ common/pkgs/mq/scanner/client.go | 36 +++++++------------ scanner/internal/event/agent_check_cache.go | 6 ++-- scanner/internal/event/agent_check_state.go | 6 ++-- scanner/internal/event/agent_check_storage.go | 6 ++-- 19 files changed, 81 insertions(+), 117 deletions(-) diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index e82cae9..c7399d6 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -27,7 +27,7 @@ func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) if err != nil { @@ -163,7 +163,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) if err != nil { diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 28e92fa..1be99e3 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -61,7 +61,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { if err != nil { return fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) pkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) if err != nil { @@ -76,7 +76,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { return nil } -func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, pkg model.Package) error { +func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.Client, pkg model.Package) error { getRepResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(pkg.PackageID)) if err != nil { return fmt.Errorf("getting package object rep data: %w", err) diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index 6ef0932..8085496 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -27,7 +27,7 @@ func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.GetUserBuckets(coormq.NewGetUserBuckets(userID)) if err != nil { @@ -42,7 +42,7 @@ func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]mod if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID)) if err != nil { @@ -57,7 +57,7 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, if err != nil { return 0, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) // TODO 只有阅读了系统操作的源码,才能知道要加哪些锁,但用户的命令可能会调用不止一个系统操作。 // 因此加锁的操作还是必须在用户命令里完成,但具体加锁的内容,则需要被封装起来与系统操作放到一起,方便管理,避免分散改动。 @@ -85,7 +85,7 @@ func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error { if err != nil { return fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) // TODO 检查用户是否有删除这个Bucket的权限。检查的时候可以只上UserBucket的Read锁 diff --git a/client/internal/services/cacah.go b/client/internal/services/cacah.go index 415089c..6b624cd 100644 --- a/client/internal/services/cacah.go +++ b/client/internal/services/cacah.go @@ -23,7 +23,7 @@ func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, no if err != nil { return "", fmt.Errorf("new agent client: %w", err) } - defer agentCli.Close() + defer stgglb.AgentMQPool.Release(agentCli) startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID)) if err != nil { @@ -38,7 +38,7 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitT if err != nil { return true, nil, fmt.Errorf("new agent client: %w", err) } - defer agentCli.Close() + defer stgglb.AgentMQPool.Release(agentCli) waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) if err != nil { diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 4f5ba28..bfc851f 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -28,7 +28,7 @@ func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (itera if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) mutex, err := reqbuilder.NewBuilder(). // 用于判断用户是否有对象权限 @@ -89,7 +89,7 @@ func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (itera } } -func (svc *PackageService) downloadRepPackage(packageID int64, objects []model.Object, coorCli *coormq.PoolClient) (*iterator.RepObjectIterator, error) { +func (svc *PackageService) downloadRepPackage(packageID int64, objects []model.Object, coorCli *coormq.Client) (*iterator.RepObjectIterator, error) { getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID)) if err != nil { return nil, fmt.Errorf("getting package object rep data: %w", err) @@ -101,7 +101,7 @@ func (svc *PackageService) downloadRepPackage(packageID int64, objects []model.O return iter, nil } -func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.Object, coorCli *coormq.PoolClient) (*iterator.ECObjectIterator, error) { +func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.Object, coorCli *coormq.Client) (*iterator.ECObjectIterator, error) { getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(pkg.PackageID)) if err != nil { return nil, fmt.Errorf("getting package object ec data: %w", err) @@ -185,7 +185,7 @@ func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { if err != nil { return fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) mutex, err := reqbuilder.NewBuilder(). Metadata(). @@ -220,7 +220,7 @@ func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (stgsdk if err != nil { return stgsdk.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.GetPackageCachedNodes(coormq.NewGetPackageCachedNodes(userID, packageID)) if err != nil { @@ -240,7 +240,7 @@ func (svc *PackageService) GetLoadedNodes(userID int64, packageID int64) ([]int6 if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.GetPackageLoadedNodes(coormq.NewGetPackageLoadedNodes(userID, packageID)) if err != nil { diff --git a/client/internal/services/scanner.go b/client/internal/services/scanner.go index 1e060f3..fa50ba5 100644 --- a/client/internal/services/scanner.go +++ b/client/internal/services/scanner.go @@ -21,7 +21,7 @@ func (svc *ScannerService) PostEvent(event scevt.Event, isEmergency bool, dontMe if err != nil { return fmt.Errorf("new scacnner client: %w", err) } - defer scCli.Close() + defer stgglb.ScannerMQPool.Release(scCli) err = scCli.PostEvent(scmq.NewPostEvent(event, isEmergency, dontMerge)) if err != nil { diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 585eaa5..d5514a8 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -44,7 +44,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 if err != nil { return 0, "", fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) if err != nil { @@ -55,7 +55,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } - defer agentCli.Close() + defer stgglb.AgentMQPool.Release(agentCli) startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy, nodeAffinity)) if err != nil { @@ -71,7 +71,7 @@ func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, // TODO 失败是否要当做任务已经结束? return true, 0, fmt.Errorf("new agent client: %w", err) } - defer agentCli.Close() + defer stgglb.AgentMQPool.Release(agentCli) waitResp, err := agentCli.WaitStorageCreatePackage(agtmq.NewWaitStorageCreatePackage(taskID, waitTimeout.Milliseconds())) if err != nil { diff --git a/client/internal/task/storage_load_package.go b/client/internal/task/storage_load_package.go index 65dd3cf..4c42b15 100644 --- a/client/internal/task/storage_load_package.go +++ b/client/internal/task/storage_load_package.go @@ -62,7 +62,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { if err != nil { return fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) if err != nil { @@ -70,13 +70,13 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { } // 然后向代理端发送移动文件的请求 - agentClient, err := stgglb.AgentMQPool.Acquire(getStgResp.NodeID) + agentCli, err := stgglb.AgentMQPool.Acquire(getStgResp.NodeID) if err != nil { return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err) } - defer agentClient.Close() + defer stgglb.AgentMQPool.Release(agentCli) - agentMoveResp, err := agentClient.StartStorageLoadPackage( + agentMoveResp, err := agentCli.StartStorageLoadPackage( agtmq.NewStartStorageLoadPackage( t.userID, t.packageID, @@ -87,7 +87,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { } for { - waitResp, err := agentClient.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5)) + waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5)) if err != nil { return fmt.Errorf("wait loading package: %w", err) } diff --git a/common/globals/pools.go b/common/globals/pools.go index 9b4ef5e..0a09378 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -9,11 +9,11 @@ import ( scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" ) -var AgentMQPool *agtmq.Pool +var AgentMQPool agtmq.Pool -var CoordinatorMQPool *coormq.Pool +var CoordinatorMQPool coormq.Pool -var ScannerMQPool *scmq.Pool +var ScannerMQPool scmq.Pool func InitMQPool(cfg *stgmq.Config) { AgentMQPool = agtmq.NewPool(cfg) diff --git a/common/pkgs/cmd/create_rep_package.go b/common/pkgs/cmd/create_rep_package.go index 9b31f16..f6024e2 100644 --- a/common/pkgs/cmd/create_rep_package.go +++ b/common/pkgs/cmd/create_rep_package.go @@ -275,7 +275,7 @@ func pinIPFSFile(nodeID int64, fileHash string) error { if err != nil { return fmt.Errorf("new agent client: %w", err) } - defer agtCli.Close() + defer stgglb.AgentMQPool.Release(agtCli) // 然后让最近节点pin本地上传的文件 pinObjResp, err := agtCli.StartPinningObject(agtmq.NewStartPinningObject(fileHash)) diff --git a/common/pkgs/cmd/download_package.go b/common/pkgs/cmd/download_package.go index b7c6d3a..2b7d557 100644 --- a/common/pkgs/cmd/download_package.go +++ b/common/pkgs/cmd/download_package.go @@ -38,7 +38,7 @@ func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error { if err != nil { return fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) if err != nil { @@ -65,7 +65,7 @@ func (t *DownloadPackage) downloadRep(ctx *DownloadPackageContext) (iterator.Dow if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID)) if err != nil { @@ -89,7 +89,7 @@ func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Pack if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID)) if err != nil { diff --git a/common/pkgs/iterator/ec_object_iterator.go b/common/pkgs/iterator/ec_object_iterator.go index b9b94a1..9d5454a 100644 --- a/common/pkgs/iterator/ec_object_iterator.go +++ b/common/pkgs/iterator/ec_object_iterator.go @@ -48,7 +48,7 @@ func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) if !i.inited { i.inited = true @@ -69,7 +69,7 @@ func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { return item, err } -func (iter *ECObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadingObject, error) { +func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { obj := iter.objects[iter.currentIndex] ecData := iter.objectECData[iter.currentIndex] diff --git a/common/pkgs/iterator/rep_object_iterator.go b/common/pkgs/iterator/rep_object_iterator.go index 3ae9f28..8bff9b1 100644 --- a/common/pkgs/iterator/rep_object_iterator.go +++ b/common/pkgs/iterator/rep_object_iterator.go @@ -60,7 +60,7 @@ func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) { if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - defer coorCli.Close() + defer stgglb.CoordinatorMQPool.Release(coorCli) if !i.inited { i.inited = true @@ -81,7 +81,7 @@ func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) { return item, err } -func (i *RepObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadingObject, error) { +func (i *RepObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { repData := i.objectRepData[i.currentIndex] if len(repData.NodeIDs) == 0 { return nil, fmt.Errorf("no node has this file %s", repData.FileHash) diff --git a/common/pkgs/mq/agent/client.go b/common/pkgs/mq/agent/client.go index 37f4638..de7d134 100644 --- a/common/pkgs/mq/agent/client.go +++ b/common/pkgs/mq/agent/client.go @@ -6,12 +6,12 @@ import ( ) type Client struct { - rabbitCli *mq.RabbitMQClient + rabbitCli *mq.RabbitMQTransport id int64 } func NewClient(id int64, cfg *stgmq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.MakeAgentQueueName(id), "") + rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), stgmq.MakeAgentQueueName(id), "") if err != nil { return nil, err } @@ -26,36 +26,24 @@ func (c *Client) Close() { c.rabbitCli.Close() } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire(id int64) (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) -} - -type Pool struct { +type pool struct { mqcfg *stgmq.Config } -func NewPool(mqcfg *stgmq.Config) *Pool { - return &Pool{ +func NewPool(mqcfg *stgmq.Config) Pool { + return &pool{ mqcfg: mqcfg, } } -func (p *Pool) Acquire(id int64) (*PoolClient, error) { - cli, err := NewClient(id, p.mqcfg) - if err != nil { - return nil, err - } - - return &PoolClient{ - Client: cli, - owner: p, - }, nil +func (p *pool) Acquire(id int64) (*Client, error) { + return NewClient(id, p.mqcfg) } -func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() +func (p *pool) Release(cli *Client) { + cli.Close() } diff --git a/common/pkgs/mq/coordinator/client.go b/common/pkgs/mq/coordinator/client.go index 8658b0d..8d25532 100644 --- a/common/pkgs/mq/coordinator/client.go +++ b/common/pkgs/mq/coordinator/client.go @@ -6,11 +6,11 @@ import ( ) type Client struct { - rabbitCli *mq.RabbitMQClient + rabbitCli *mq.RabbitMQTransport } func NewClient(cfg *stgmq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.COORDINATOR_QUEUE_NAME, "") + rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), stgmq.COORDINATOR_QUEUE_NAME, "") if err != nil { return nil, err } @@ -24,36 +24,24 @@ func (c *Client) Close() { c.rabbitCli.Close() } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) -} - -type Pool struct { +type pool struct { mqcfg *stgmq.Config } -func NewPool(mqcfg *stgmq.Config) *Pool { - return &Pool{ +func NewPool(mqcfg *stgmq.Config) Pool { + return &pool{ mqcfg: mqcfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { - cli, err := NewClient(p.mqcfg) - if err != nil { - return nil, err - } - - return &PoolClient{ - Client: cli, - owner: p, - }, nil +func (p *pool) Acquire() (*Client, error) { + return NewClient(p.mqcfg) } -func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() +func (p *pool) Release(cli *Client) { + cli.Close() } diff --git a/common/pkgs/mq/scanner/client.go b/common/pkgs/mq/scanner/client.go index 64ff8b6..8cb70c3 100644 --- a/common/pkgs/mq/scanner/client.go +++ b/common/pkgs/mq/scanner/client.go @@ -6,11 +6,11 @@ import ( ) type Client struct { - rabbitCli *mq.RabbitMQClient + rabbitCli *mq.RabbitMQTransport } func NewClient(cfg *stgmq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.SCANNER_QUEUE_NAME, "") + rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), stgmq.SCANNER_QUEUE_NAME, "") if err != nil { return nil, err } @@ -24,36 +24,24 @@ func (c *Client) Close() { c.rabbitCli.Close() } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) -} - -type Pool struct { +type pool struct { mqcfg *stgmq.Config } -func NewPool(mqcfg *stgmq.Config) *Pool { - return &Pool{ +func NewPool(mqcfg *stgmq.Config) Pool { + return &pool{ mqcfg: mqcfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { - cli, err := NewClient(p.mqcfg) - if err != nil { - return nil, err - } - - return &PoolClient{ - Client: cli, - owner: p, - }, nil +func (p *pool) Acquire() (*Client, error) { + return NewClient(p.mqcfg) } -func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() +func (p *pool) Release(cli *Client) { + cli.Close() } diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index ec71eb9..8500776 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -126,14 +126,14 @@ func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, ca log := logger.WithType[AgentCheckCache]("Event") // 然后向代理端发送移动文件的请求 - agentClient, err := stgglb.AgentMQPool.Acquire(t.NodeID) + agtCli, err := stgglb.AgentMQPool.Acquire(t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return } - defer agentClient.Close() + defer stgglb.AgentMQPool.Release(agtCli) - checkResp, err := agentClient.CheckCache(agtmq.NewCheckCache(isComplete, caches), mq.RequestOption{Timeout: time.Minute}) + checkResp, err := agtCli.CheckCache(agtmq.NewCheckCache(isComplete, caches), mq.RequestOption{Timeout: time.Minute}) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("checking ipfs: %s", err.Error()) return diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go index ac2c6ac..de59a4c 100644 --- a/scanner/internal/event/agent_check_state.go +++ b/scanner/internal/event/agent_check_state.go @@ -61,14 +61,14 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { return } - agentClient, err := stgglb.AgentMQPool.Acquire(t.NodeID) + agtCli, err := stgglb.AgentMQPool.Acquire(t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return } - defer agentClient.Close() + defer stgglb.AgentMQPool.Release(agtCli) - getResp, err := agentClient.GetState(agtmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) + getResp, err := agtCli.GetState(agtmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("getting state: %s", err.Error()) diff --git a/scanner/internal/event/agent_check_storage.go b/scanner/internal/event/agent_check_storage.go index e6ee7d1..ada76b9 100644 --- a/scanner/internal/event/agent_check_storage.go +++ b/scanner/internal/event/agent_check_storage.go @@ -143,14 +143,14 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage log := logger.WithType[AgentCheckStorage]("Event") // 投递任务 - agentClient, err := stgglb.AgentMQPool.Acquire(stg.NodeID) + agtCli, err := stgglb.AgentMQPool.Acquire(stg.NodeID) if err != nil { log.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return } - defer agentClient.Close() + defer stgglb.AgentMQPool.Release(agtCli) - checkResp, err := agentClient.StorageCheck(agtmq.NewStorageCheck(stg.StorageID, stg.Directory, isComplete, packages), mq.RequestOption{Timeout: time.Minute}) + checkResp, err := agtCli.StorageCheck(agtmq.NewStorageCheck(stg.StorageID, stg.Directory, isComplete, packages), mq.RequestOption{Timeout: time.Minute}) if err != nil { log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error()) return From 3a13836b3ee762bd65bbada87db8529b1932cc6a Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Sat, 7 Oct 2023 11:16:28 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=B0=83=E8=AF=95?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/services/mq/cache.go | 4 +-- agent/internal/task/cache_move_package.go | 1 + agent/main.go | 5 ++-- common/pkgs/mq/agent/server.go | 6 +++-- common/pkgs/mq/coordinator/server.go | 6 +++-- common/pkgs/mq/scanner/event.go | 4 --- .../mq/scanner/event/agent_check_cache.go | 6 ++--- .../mq/scanner/event/agent_check_state.go | 6 ++--- .../mq/scanner/event/agent_check_storage.go | 6 ++--- common/pkgs/mq/scanner/event/check_cache.go | 6 ++--- common/pkgs/mq/scanner/event/check_package.go | 6 ++--- .../pkgs/mq/scanner/event/check_rep_count.go | 6 ++--- common/pkgs/mq/scanner/event/event.go | 10 ++++++- common/pkgs/mq/scanner/server.go | 6 +++-- coordinator/main.go | 4 +-- magefiles/main.go | 26 +++++++++++++------ scanner/internal/event/agent_check_cache.go | 2 +- scanner/internal/event/agent_check_state.go | 2 +- scanner/internal/event/agent_check_storage.go | 2 +- scanner/internal/event/check_cache.go | 2 +- scanner/internal/event/check_package.go | 2 +- scanner/internal/event/check_rep_count.go | 2 +- scanner/internal/event/event.go | 2 +- scanner/main.go | 5 ++-- 24 files changed, 69 insertions(+), 58 deletions(-) diff --git a/agent/internal/services/mq/cache.go b/agent/internal/services/mq/cache.go index 13f6e57..9441b2b 100644 --- a/agent/internal/services/mq/cache.go +++ b/agent/internal/services/mq/cache.go @@ -147,9 +147,9 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, nil)) + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, mvPkgTask.ResultCacheInfos)) } - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "", mvPkgTask.ResultCacheInfos)) + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "", nil)) } } diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 1be99e3..c7c121b 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -71,6 +71,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { if pkgResp.Redundancy.IsRepInfo() { return t.moveRep(ctx, coorCli, pkgResp.Package) } else { + return fmt.Errorf("not implement yet!") // TODO EC的CacheMove逻辑 } diff --git a/agent/main.go b/agent/main.go index 188c850..f6d4ad5 100644 --- a/agent/main.go +++ b/agent/main.go @@ -69,9 +69,10 @@ func main() { if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } - agtSvr.OnError = func(err error) { + agtSvr.OnError(func(err error) { log.Warnf("agent server err: %s", err.Error()) - } + }) + go serveAgentServer(agtSvr, &wg) go reportStatus(&wg) //网络延迟感知 diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index b2e6dd0..8819c17 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -18,8 +18,6 @@ type Service interface { type Server struct { service Service rabbitSvr mq.RabbitMQServer - - OnError func(err error) } func NewServer(svc Service, id int64, cfg *mymq.Config) (*Server, error) { @@ -51,6 +49,10 @@ func (s *Server) Serve() error { return s.rabbitSvr.Serve() } +func (s *Server) OnError(callback func(error)) { + s.rabbitSvr.OnError = callback +} + var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 diff --git a/common/pkgs/mq/coordinator/server.go b/common/pkgs/mq/coordinator/server.go index 437a3f9..76b57a7 100644 --- a/common/pkgs/mq/coordinator/server.go +++ b/common/pkgs/mq/coordinator/server.go @@ -27,8 +27,6 @@ type Service interface { type Server struct { service Service rabbitSvr mq.RabbitMQServer - - OnError func(err error) } func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { @@ -59,6 +57,10 @@ func (s *Server) Serve() error { return s.rabbitSvr.Serve() } +func (s *Server) OnError(callback func(error)) { + s.rabbitSvr.OnError = callback +} + var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 diff --git a/common/pkgs/mq/scanner/event.go b/common/pkgs/mq/scanner/event.go index 09b0ba0..bc1238a 100644 --- a/common/pkgs/mq/scanner/event.go +++ b/common/pkgs/mq/scanner/event.go @@ -29,7 +29,3 @@ func NewPostEvent(event scevt.Event, isEmergency bool, dontMerge bool) *PostEven func (client *Client) PostEvent(msg *PostEvent) error { return mq.Send(Service.PostEvent, client.rabbitCli, msg) } - -func init() { - mq.RegisterUnionType(scevt.EventTypeUnino) -} diff --git a/common/pkgs/mq/scanner/event/agent_check_cache.go b/common/pkgs/mq/scanner/event/agent_check_cache.go index a050647..08b241a 100644 --- a/common/pkgs/mq/scanner/event/agent_check_cache.go +++ b/common/pkgs/mq/scanner/event/agent_check_cache.go @@ -1,5 +1,7 @@ package event +var _ = Register[*AgentCheckCache]() + type AgentCheckCache struct { EventBase NodeID int64 `json:"nodeID"` @@ -12,7 +14,3 @@ func NewAgentCheckCache(nodeID int64, fileHashes []string) *AgentCheckCache { FileHashes: fileHashes, } } - -func init() { - Register[AgentCheckCache]() -} diff --git a/common/pkgs/mq/scanner/event/agent_check_state.go b/common/pkgs/mq/scanner/event/agent_check_state.go index c369735..54b9741 100644 --- a/common/pkgs/mq/scanner/event/agent_check_state.go +++ b/common/pkgs/mq/scanner/event/agent_check_state.go @@ -1,5 +1,7 @@ package event +var _ = Register[*AgentCheckState]() + type AgentCheckState struct { EventBase NodeID int64 `json:"nodeID"` @@ -10,7 +12,3 @@ func NewAgentCheckState(nodeID int64) *AgentCheckState { NodeID: nodeID, } } - -func init() { - Register[AgentCheckState]() -} diff --git a/common/pkgs/mq/scanner/event/agent_check_storage.go b/common/pkgs/mq/scanner/event/agent_check_storage.go index a7fe7e1..3c971f2 100644 --- a/common/pkgs/mq/scanner/event/agent_check_storage.go +++ b/common/pkgs/mq/scanner/event/agent_check_storage.go @@ -1,5 +1,7 @@ package event +var _ = Register[*AgentCheckStorage]() + type AgentCheckStorage struct { EventBase StorageID int64 `json:"storageID"` @@ -12,7 +14,3 @@ func NewAgentCheckStorage(storageID int64, packageIDs []int64) *AgentCheckStorag PackageIDs: packageIDs, } } - -func init() { - Register[AgentCheckStorage]() -} diff --git a/common/pkgs/mq/scanner/event/check_cache.go b/common/pkgs/mq/scanner/event/check_cache.go index ffb9278..3325a0b 100644 --- a/common/pkgs/mq/scanner/event/check_cache.go +++ b/common/pkgs/mq/scanner/event/check_cache.go @@ -1,5 +1,7 @@ package event +var _ = Register[*CheckCache]() + type CheckCache struct { EventBase NodeID int64 `json:"nodeID"` @@ -10,7 +12,3 @@ func NewCheckCache(nodeID int64) *CheckCache { NodeID: nodeID, } } - -func init() { - Register[CheckCache]() -} diff --git a/common/pkgs/mq/scanner/event/check_package.go b/common/pkgs/mq/scanner/event/check_package.go index 842a893..526dda0 100644 --- a/common/pkgs/mq/scanner/event/check_package.go +++ b/common/pkgs/mq/scanner/event/check_package.go @@ -1,5 +1,7 @@ package event +var _ = Register[*CheckPackage]() + type CheckPackage struct { EventBase PackageIDs []int64 `json:"packageIDs"` @@ -10,7 +12,3 @@ func NewCheckPackage(packageIDs []int64) *CheckPackage { PackageIDs: packageIDs, } } - -func init() { - Register[CheckPackage]() -} diff --git a/common/pkgs/mq/scanner/event/check_rep_count.go b/common/pkgs/mq/scanner/event/check_rep_count.go index d393cb1..39f2c74 100644 --- a/common/pkgs/mq/scanner/event/check_rep_count.go +++ b/common/pkgs/mq/scanner/event/check_rep_count.go @@ -1,5 +1,7 @@ package event +var _ = Register[*CheckRepCount]() + type CheckRepCount struct { EventBase FileHashes []string `json:"fileHashes"` @@ -10,7 +12,3 @@ func NewCheckRepCount(fileHashes []string) *CheckRepCount { FileHashes: fileHashes, } } - -func init() { - Register[CheckRepCount]() -} diff --git a/common/pkgs/mq/scanner/event/event.go b/common/pkgs/mq/scanner/event/event.go index 0eafbd7..8fc6578 100644 --- a/common/pkgs/mq/scanner/event/event.go +++ b/common/pkgs/mq/scanner/event/event.go @@ -1,6 +1,7 @@ package event import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/types" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) @@ -15,6 +16,13 @@ type EventBase struct{} func (e *EventBase) Noop() {} -func Register[T any]() { +// 注:此函数必须以var _ = Register[xxx]()的形式被调用,这样才能保证init中RegisterUnionType时 +// TypeUnion不是空的。(因为包级变量初始化比init函数调用先进行) +func Register[T Event]() any { EventTypeUnino.Add(myreflect.TypeOf[T]()) + return nil +} + +func init() { + mq.RegisterUnionType(EventTypeUnino) } diff --git a/common/pkgs/mq/scanner/server.go b/common/pkgs/mq/scanner/server.go index 73fd65f..8219519 100644 --- a/common/pkgs/mq/scanner/server.go +++ b/common/pkgs/mq/scanner/server.go @@ -12,8 +12,6 @@ type Service interface { type Server struct { service Service rabbitSvr mq.RabbitMQServer - - OnError func(err error) } func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { @@ -45,6 +43,10 @@ func (s *Server) Serve() error { return s.rabbitSvr.Serve() } +func (s *Server) OnError(callback func(error)) { + s.rabbitSvr.OnError = callback +} + var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 diff --git a/coordinator/main.go b/coordinator/main.go index 7474169..ced3f98 100644 --- a/coordinator/main.go +++ b/coordinator/main.go @@ -40,9 +40,9 @@ func main() { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) } - coorSvr.OnError = func(err error) { + coorSvr.OnError(func(err error) { logger.Warnf("coordinator server err: %s", err.Error()) - } + }) // 启动服务 go serveCoorServer(coorSvr) diff --git a/magefiles/main.go b/magefiles/main.go index bd70eb4..b1fa0db 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -69,8 +69,13 @@ func Scripts() error { scriptsDir := "./common/assets/scripts" info, err := os.Stat(scriptsDir) - if errors.Is(err, os.ErrNotExist) || !info.IsDir() { - return fmt.Errorf("script directory not exists or is not a directory") + if errors.Is(err, os.ErrNotExist) { + fmt.Printf("no scripts.\n") + return nil + } + + if !info.IsDir() { + return fmt.Errorf("scripts is not a directory") } fullDirPath, err := filepath.Abs(filepath.Join(BuildDir, "scripts")) @@ -87,8 +92,13 @@ func Confs() error { confDir := "./common/assets/confs" info, err := os.Stat(confDir) - if errors.Is(err, os.ErrNotExist) || !info.IsDir() { - return fmt.Errorf("conf directory not exists or is not a directory") + if errors.Is(err, os.ErrNotExist) { + fmt.Printf("no confs.\n") + return nil + } + + if !info.IsDir() { + return fmt.Errorf("confs is not a directory") } fullDirPath, err := filepath.Abs(filepath.Join(BuildDir, "confs")) @@ -106,7 +116,7 @@ func Agent() error { OutputName: "agent", OutputDir: "agent", AssetsDir: "assets", - EntryFile:"agent/main.go", + EntryFile: "agent/main.go", }) } @@ -115,7 +125,7 @@ func Client() error { OutputName: "client", OutputDir: "client", AssetsDir: "assets", - EntryFile:"client/main.go", + EntryFile: "client/main.go", }) } @@ -124,7 +134,7 @@ func Coordinator() error { OutputName: "coordinator", OutputDir: "coordinator", AssetsDir: "assets", - EntryFile:"coordinator/main.go", + EntryFile: "coordinator/main.go", }) } @@ -133,6 +143,6 @@ func Scanner() error { OutputName: "scanner", OutputDir: "scanner", AssetsDir: "assets", - EntryFile:"scanner/main.go", + EntryFile: "scanner/main.go", }) } diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index 8500776..a838d50 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -170,5 +170,5 @@ func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, ca } func init() { - RegisterMessageConvertor(func(msg scevt.AgentCheckCache) Event { return NewAgentCheckCache(msg.NodeID, msg.FileHashes) }) + RegisterMessageConvertor(func(msg *scevt.AgentCheckCache) Event { return NewAgentCheckCache(msg.NodeID, msg.FileHashes) }) } diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go index de59a4c..314059f 100644 --- a/scanner/internal/event/agent_check_state.go +++ b/scanner/internal/event/agent_check_state.go @@ -113,5 +113,5 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { } func init() { - RegisterMessageConvertor(func(msg scevt.AgentCheckState) Event { return NewAgentCheckState(msg.NodeID) }) + RegisterMessageConvertor(func(msg *scevt.AgentCheckState) Event { return NewAgentCheckState(msg.NodeID) }) } diff --git a/scanner/internal/event/agent_check_storage.go b/scanner/internal/event/agent_check_storage.go index ada76b9..26670c9 100644 --- a/scanner/internal/event/agent_check_storage.go +++ b/scanner/internal/event/agent_check_storage.go @@ -195,5 +195,5 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage } func init() { - RegisterMessageConvertor(func(msg scevt.AgentCheckStorage) Event { return NewAgentCheckStorage(msg.StorageID, msg.PackageIDs) }) + RegisterMessageConvertor(func(msg *scevt.AgentCheckStorage) Event { return NewAgentCheckStorage(msg.StorageID, msg.PackageIDs) }) } diff --git a/scanner/internal/event/check_cache.go b/scanner/internal/event/check_cache.go index 89dbec5..fd3c594 100644 --- a/scanner/internal/event/check_cache.go +++ b/scanner/internal/event/check_cache.go @@ -80,5 +80,5 @@ func (t *CheckCache) Execute(execCtx ExecuteContext) { } func init() { - RegisterMessageConvertor(func(msg scevt.CheckCache) Event { return NewCheckCache(msg.NodeID) }) + RegisterMessageConvertor(func(msg *scevt.CheckCache) Event { return NewCheckCache(msg.NodeID) }) } diff --git a/scanner/internal/event/check_package.go b/scanner/internal/event/check_package.go index 0a5bc91..8f2c653 100644 --- a/scanner/internal/event/check_package.go +++ b/scanner/internal/event/check_package.go @@ -53,5 +53,5 @@ func (t *CheckPackage) Execute(execCtx ExecuteContext) { } func init() { - RegisterMessageConvertor(func(msg scevt.CheckPackage) Event { return NewCheckPackage(msg.PackageIDs) }) + RegisterMessageConvertor(func(msg *scevt.CheckPackage) Event { return NewCheckPackage(msg.PackageIDs) }) } diff --git a/scanner/internal/event/check_rep_count.go b/scanner/internal/event/check_rep_count.go index 78fb874..72f5f44 100644 --- a/scanner/internal/event/check_rep_count.go +++ b/scanner/internal/event/check_rep_count.go @@ -211,5 +211,5 @@ func chooseDeleteAvaiRepNodes(allNodes []model.Node, curAvaiRepNodes []model.Nod } func init() { - RegisterMessageConvertor(func(msg scevt.CheckRepCount) Event { return NewCheckRepCount(msg.FileHashes) }) + RegisterMessageConvertor(func(msg *scevt.CheckRepCount) Event { return NewCheckRepCount(msg.FileHashes) }) } diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go index 27a1aba..2533ade 100644 --- a/scanner/internal/event/event.go +++ b/scanner/internal/event/event.go @@ -36,7 +36,7 @@ var msgDispatcher = typedispatcher.NewTypeDispatcher[Event]() func FromMessage(msg scevt.Event) (Event, error) { event, ok := msgDispatcher.Dispatch(msg) if !ok { - return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name()) + return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).String()) } return event, nil diff --git a/scanner/main.go b/scanner/main.go index 173af98..ca659b4 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -53,9 +53,10 @@ func main() { if err != nil { logger.Fatalf("new agent server failed, err: %s", err.Error()) } - agtSvr.OnError = func(err error) { + agtSvr.OnError(func(err error) { logger.Warnf("agent server err: %s", err.Error()) - } + }) + go serveScannerServer(agtSvr, &wg) tickExecutor := tickevent.NewExecutor(tickevent.ExecuteArgs{