diff --git a/client/internal/ticktock/shardstore_gc.go b/client/internal/ticktock/shardstore_gc.go index 9cad74e..8dafb6b 100644 --- a/client/internal/ticktock/shardstore_gc.go +++ b/client/internal/ticktock/shardstore_gc.go @@ -98,12 +98,12 @@ func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) defer cancel() - _, err = agtCli.CacheGC(ctx, &hubrpc.CacheGC{ + _, cerr := agtCli.CacheGC(ctx, &hubrpc.CacheGC{ UserSpace: *space, Availables: allFileHashes, }) - if err != nil { - return fmt.Errorf("request to cache gc: %w", err) + if cerr != nil { + return fmt.Errorf("request to cache gc: %w", cerr) } return nil } diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go index da3d1b3..9223aa4 100644 --- a/client/internal/uploader/user_space_upload.go +++ b/client/internal/uploader/user_space_upload.go @@ -109,13 +109,13 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*srcSpace.MasterHub, *addr)) defer srcHubCli.Release() - listAllResp, err := srcHubCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{ + listAllResp, cerr := srcHubCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{ UserSpace: *srcSpace, Path: rootPath, }) - if err != nil { + if cerr != nil { delPkg() - return nil, fmt.Errorf("listing public store: %w", err) + return nil, fmt.Errorf("listing public store: %w", cerr) } adds, err := u.uploadFromPublicStore(srcSpace, &targetSapce.Space, listAllResp.Entries, rootPath) diff --git a/common/pkgs/ioswitch2/hub_worker.go b/common/pkgs/ioswitch2/hub_worker.go index 7aa4b21..a6f2c97 100644 --- a/common/pkgs/ioswitch2/hub_worker.go +++ b/common/pkgs/ioswitch2/hub_worker.go @@ -48,7 +48,7 @@ type HubWorkerClient struct { func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { _, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan}) - return err + return err.ToError() } func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { _, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{ @@ -60,20 +60,20 @@ func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id } }), }) - return err + return err.ToError() } func (c *HubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { _, err := c.cli.SendIOVar(ctx, &hubrpc.SendIOVar{ PlanID: planID, VarID: id, Value: value, }) - return err + return err.ToError() } func (c *HubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { resp, err := c.cli.GetIOStream(ctx, &hubrpc.GetIOStream{ PlanID: planID, VarID: streamID, SignalID: signalID, Signal: signal, }) if err != nil { - return nil, err + return nil, err.ToError() } return io2.CounterCloser(resp.Stream, func(cnt int64, err error) { @@ -87,7 +87,7 @@ func (c *HubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID PlanID: planID, VarID: varID, SignalID: signalID, Signal: signal, }) if err != nil { - return nil, err + return nil, err.ToError() } return resp.Value, nil } diff --git a/common/pkgs/ioswitchlrc/hub_worker.go b/common/pkgs/ioswitchlrc/hub_worker.go index 28ecddd..63a7c98 100644 --- a/common/pkgs/ioswitchlrc/hub_worker.go +++ b/common/pkgs/ioswitchlrc/hub_worker.go @@ -43,7 +43,7 @@ type HubWorkerClient struct { func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { _, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan}) - return err + return err.ToError() } func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { _, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{ @@ -51,20 +51,20 @@ func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id VarID: id, Stream: stream, }) - return err + return err.ToError() } func (c *HubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { _, err := c.cli.SendIOVar(ctx, &hubrpc.SendIOVar{ PlanID: planID, VarID: id, Value: value, }) - return err + return err.ToError() } func (c *HubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { resp, err := c.cli.GetIOStream(ctx, &hubrpc.GetIOStream{ PlanID: planID, VarID: streamID, SignalID: signalID, Signal: signal, }) if err != nil { - return nil, err + return nil, err.ToError() } return resp.Stream, nil @@ -74,7 +74,7 @@ func (c *HubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID PlanID: planID, VarID: varID, SignalID: signalID, Signal: signal, }) if err != nil { - return nil, err + return nil, err.ToError() } return resp.Value, nil } diff --git a/common/pkgs/rpc/hub/pool.go b/common/pkgs/rpc/hub/pool.go index d6c1c9b..71875f1 100644 --- a/common/pkgs/rpc/hub/pool.go +++ b/common/pkgs/rpc/hub/pool.go @@ -66,6 +66,7 @@ func (p *Pool) Get(ip string, port int) *Client { return &Client{ addr: ga, con: con.grpcCon, + cli: NewHubClient(con.grpcCon), pool: p, } } diff --git a/common/pkgs/rpc/hub/server.go b/common/pkgs/rpc/hub/server.go index d7f5c25..549e692 100644 --- a/common/pkgs/rpc/hub/server.go +++ b/common/pkgs/rpc/hub/server.go @@ -18,9 +18,11 @@ type Server struct { } func NewServer(cfg rpc.Config, impl HubAPI) *Server { - return &Server{ - ServerBase: rpc.NewServerBase(cfg, impl, &Hub_ServiceDesc), + svr := &Server{ + svrImpl: impl, } + svr.ServerBase = rpc.NewServerBase(cfg, svr, &Hub_ServiceDesc) + return svr } var _ HubServer = (*Server)(nil) diff --git a/common/pkgs/rpc/utils.go b/common/pkgs/rpc/utils.go index 562b1bd..98a0f1b 100644 --- a/common/pkgs/rpc/utils.go +++ b/common/pkgs/rpc/utils.go @@ -245,8 +245,20 @@ func Failed(errCode string, format string, args ...any) *CodeError { } } -func (c *CodeError) Error() string { - return fmt.Sprintf("code: %s, message: %s", c.Code, c.Message) +// 定义一个额外的结构体,防止陷入 (*CodeError)(nil) != nil 的陷阱 +type ErrorCodeError struct { + CE *CodeError +} + +func (c *ErrorCodeError) Error() string { + return fmt.Sprintf("code: %s, message: %s", c.CE.Code, c.CE.Message) +} + +func (c *CodeError) ToError() error { + if c == nil { + return nil + } + return &ErrorCodeError{CE: c} } func getCodeError(err error) *CodeError { diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go index 0a20081..e03aa22 100644 --- a/coordinator/internal/cmd/serve.go +++ b/coordinator/internal/cmd/serve.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/config" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" mymq "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/mq" @@ -44,6 +45,7 @@ func serve(configPath string) { } stgglb.InitMQPool(config.Cfg().RabbitMQ) + stgglb.InitHubRPCPool(hubrpc.PoolConfig{}) db2, err := db.NewDB(&config.Cfg().DB) if err != nil {