Browse Source

解决调试问题

feature_gxh
Sydonian 10 months ago
parent
commit
3c2007dc48
8 changed files with 37 additions and 20 deletions
  1. +3
    -3
      client/internal/ticktock/shardstore_gc.go
  2. +3
    -3
      client/internal/uploader/user_space_upload.go
  3. +5
    -5
      common/pkgs/ioswitch2/hub_worker.go
  4. +5
    -5
      common/pkgs/ioswitchlrc/hub_worker.go
  5. +1
    -0
      common/pkgs/rpc/hub/pool.go
  6. +4
    -2
      common/pkgs/rpc/hub/server.go
  7. +14
    -2
      common/pkgs/rpc/utils.go
  8. +2
    -0
      coordinator/internal/cmd/serve.go

+ 3
- 3
client/internal/ticktock/shardstore_gc.go View File

@@ -98,12 +98,12 @@ func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
defer cancel()

_, err = agtCli.CacheGC(ctx, &hubrpc.CacheGC{
_, cerr := agtCli.CacheGC(ctx, &hubrpc.CacheGC{
UserSpace: *space,
Availables: allFileHashes,
})
if err != nil {
return fmt.Errorf("request to cache gc: %w", err)
if cerr != nil {
return fmt.Errorf("request to cache gc: %w", cerr)
}
return nil
}

+ 3
- 3
client/internal/uploader/user_space_upload.go View File

@@ -109,13 +109,13 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st
srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*srcSpace.MasterHub, *addr))
defer srcHubCli.Release()

listAllResp, err := srcHubCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{
listAllResp, cerr := srcHubCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{
UserSpace: *srcSpace,
Path: rootPath,
})
if err != nil {
if cerr != nil {
delPkg()
return nil, fmt.Errorf("listing public store: %w", err)
return nil, fmt.Errorf("listing public store: %w", cerr)
}

adds, err := u.uploadFromPublicStore(srcSpace, &targetSapce.Space, listAllResp.Entries, rootPath)


+ 5
- 5
common/pkgs/ioswitch2/hub_worker.go View File

@@ -48,7 +48,7 @@ type HubWorkerClient struct {

func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
_, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
return err
return err.ToError()
}
func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error {
_, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{
@@ -60,20 +60,20 @@ func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id
}
}),
})
return err
return err.ToError()
}
func (c *HubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error {
_, err := c.cli.SendIOVar(ctx, &hubrpc.SendIOVar{
PlanID: planID, VarID: id, Value: value,
})
return err
return err.ToError()
}
func (c *HubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) {
resp, err := c.cli.GetIOStream(ctx, &hubrpc.GetIOStream{
PlanID: planID, VarID: streamID, SignalID: signalID, Signal: signal,
})
if err != nil {
return nil, err
return nil, err.ToError()
}

return io2.CounterCloser(resp.Stream, func(cnt int64, err error) {
@@ -87,7 +87,7 @@ func (c *HubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID
PlanID: planID, VarID: varID, SignalID: signalID, Signal: signal,
})
if err != nil {
return nil, err
return nil, err.ToError()
}
return resp.Value, nil
}


+ 5
- 5
common/pkgs/ioswitchlrc/hub_worker.go View File

@@ -43,7 +43,7 @@ type HubWorkerClient struct {

func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
_, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
return err
return err.ToError()
}
func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error {
_, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{
@@ -51,20 +51,20 @@ func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id
VarID: id,
Stream: stream,
})
return err
return err.ToError()
}
func (c *HubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error {
_, err := c.cli.SendIOVar(ctx, &hubrpc.SendIOVar{
PlanID: planID, VarID: id, Value: value,
})
return err
return err.ToError()
}
func (c *HubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) {
resp, err := c.cli.GetIOStream(ctx, &hubrpc.GetIOStream{
PlanID: planID, VarID: streamID, SignalID: signalID, Signal: signal,
})
if err != nil {
return nil, err
return nil, err.ToError()
}

return resp.Stream, nil
@@ -74,7 +74,7 @@ func (c *HubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID
PlanID: planID, VarID: varID, SignalID: signalID, Signal: signal,
})
if err != nil {
return nil, err
return nil, err.ToError()
}
return resp.Value, nil
}


+ 1
- 0
common/pkgs/rpc/hub/pool.go View File

@@ -66,6 +66,7 @@ func (p *Pool) Get(ip string, port int) *Client {
return &Client{
addr: ga,
con: con.grpcCon,
cli: NewHubClient(con.grpcCon),
pool: p,
}
}


+ 4
- 2
common/pkgs/rpc/hub/server.go View File

@@ -18,9 +18,11 @@ type Server struct {
}

func NewServer(cfg rpc.Config, impl HubAPI) *Server {
return &Server{
ServerBase: rpc.NewServerBase(cfg, impl, &Hub_ServiceDesc),
svr := &Server{
svrImpl: impl,
}
svr.ServerBase = rpc.NewServerBase(cfg, svr, &Hub_ServiceDesc)
return svr
}

var _ HubServer = (*Server)(nil)

+ 14
- 2
common/pkgs/rpc/utils.go View File

@@ -245,8 +245,20 @@ func Failed(errCode string, format string, args ...any) *CodeError {
}
}

func (c *CodeError) Error() string {
return fmt.Sprintf("code: %s, message: %s", c.Code, c.Message)
// 定义一个额外的结构体,防止陷入 (*CodeError)(nil) != nil 的陷阱
type ErrorCodeError struct {
CE *CodeError
}

func (c *ErrorCodeError) Error() string {
return fmt.Sprintf("code: %s, message: %s", c.CE.Code, c.CE.Message)
}

func (c *CodeError) ToError() error {
if c == nil {
return nil
}
return &ErrorCodeError{CE: c}
}

func getCodeError(err error) *CodeError {


+ 2
- 0
coordinator/internal/cmd/serve.go View File

@@ -9,6 +9,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator"
hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/config"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db"
mymq "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/mq"
@@ -44,6 +45,7 @@ func serve(configPath string) {
}

stgglb.InitMQPool(config.Cfg().RabbitMQ)
stgglb.InitHubRPCPool(hubrpc.PoolConfig{})

db2, err := db.NewDB(&config.Cfg().DB)
if err != nil {


Loading…
Cancel
Save