diff --git a/agent/main.go b/agent/main.go index f6d4ad5..29ba784 100644 --- a/agent/main.go +++ b/agent/main.go @@ -47,9 +47,7 @@ func main() { stgglb.InitLocal(&config.Cfg().Local) stgglb.InitMQPool(&config.Cfg().RabbitMQ) - stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{ - Port: config.Cfg().GRPC.Port, - }) + stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) stgglb.InitIPFSPool(&config.Cfg().IPFS) distlock, err := distlock.NewService(&config.Cfg().DistLock) diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 0a1c9d4..ee77537 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -9,6 +9,8 @@ create table Node ( Name varchar(128) not null comment '节点名称', LocalIP varchar(128) not null comment '节点的内网IP', ExternalIP varchar(128) not null comment '节点的外网IP', + LocalGRPCPort int not null comment '节点的内网GRCP端口', + ExternalGRPCPort int not null comment '节点的外网GRCP端口', LocationID int not null comment '节点的地域', State varchar(128) comment '节点的状态', LastReportTime timestamp comment '节点上次上报时间' diff --git a/common/pkgs/cmd/create_rep_package.go b/common/pkgs/cmd/create_rep_package.go index f6024e2..c75a79f 100644 --- a/common/pkgs/cmd/create_rep_package.go +++ b/common/pkgs/cmd/create_rep_package.go @@ -201,13 +201,15 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { // 否则发送到agent上传 // 如果客户端与节点在同一个地域,则使用内网地址连接节点 nodeIP := uploadNode.Node.ExternalIP + grpcPort := uploadNode.Node.ExternalGRPCPort if uploadNode.IsSameLocation { nodeIP = uploadNode.Node.LocalIP + grpcPort = uploadNode.Node.LocalGRPCPort logger.Infof("client and node %d are at the same location, use local ip", uploadNode.Node.NodeID) } - fileHash, err := uploadToNode(file, nodeIP) + fileHash, err := uploadToNode(file, nodeIP, grpcPort) if err != nil { return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) } @@ -235,8 +237,8 @@ func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity return nodes[rand.Intn(len(nodes))] } -func uploadToNode(file io.Reader, nodeIP string) (string, error) { - rpcCli, err := stgglb.AgentRPCPool.Acquire(nodeIP) +func uploadToNode(file io.Reader, nodeIP string, grpcPort int) (string, error) { + rpcCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) if err != nil { return "", fmt.Errorf("new agent rpc client: %w", err) } diff --git a/common/pkgs/db/location.go b/common/pkgs/db/location.go index 5a652aa..d28534e 100644 --- a/common/pkgs/db/location.go +++ b/common/pkgs/db/location.go @@ -25,8 +25,13 @@ func (db *LocationDB) FindLocationByExternalIP(ctx SQLContext, ip string) (model var locID int64 err := sqlx.Get(ctx, &locID, "select LocationID from Node where ExternalIP = ?", ip) if err != nil { - return model.Location{}, fmt.Errorf("find node by external ip: %w", err) + return model.Location{}, fmt.Errorf("finding node by external ip: %w", err) } - return db.GetByID(ctx, locID) + loc, err := db.GetByID(ctx, locID) + if err != nil { + return model.Location{}, fmt.Errorf("getting location by id: %w", err) + } + + return loc, nil } diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index c11af65..c984e09 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -9,13 +9,15 @@ import ( // TODO 可以考虑逐步迁移到stgsdk中。迁移思路:数据对象应该包含的字段都迁移到stgsdk中,内部使用的一些特殊字段则留在这里 type Node struct { - NodeID int64 `db:"NodeID" json:"nodeID"` - Name string `db:"Name" json:"name"` - LocalIP string `db:"LocalIP" json:"localIP"` - ExternalIP string `db:"ExternalIP" json:"externalIP"` - LocationID int64 `db:"LocationID" json:"locationID"` - State string `db:"State" json:"state"` - LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"` + NodeID int64 `db:"NodeID" json:"nodeID"` + Name string `db:"Name" json:"name"` + LocalIP string `db:"LocalIP" json:"localIP"` + ExternalIP string `db:"ExternalIP" json:"externalIP"` + LocalGRPCPort int `db:"LocalGRPCPort" json:"localGRPCPort"` + ExternalGRPCPort int `db:"ExternalGRPCPort" json:"externalGRPCPort"` + LocationID int64 `db:"LocationID" json:"locationID"` + State string `db:"State" json:"state"` + LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"` } type Storage struct { diff --git a/common/pkgs/grpc/agent/pool.go b/common/pkgs/grpc/agent/pool.go index 9e6b2da..9d00619 100644 --- a/common/pkgs/grpc/agent/pool.go +++ b/common/pkgs/grpc/agent/pool.go @@ -5,7 +5,6 @@ import ( ) type PoolConfig struct { - Port int `json:"port"` } type PoolClient struct { @@ -26,8 +25,11 @@ func NewPool(grpcCfg *PoolConfig) *Pool { grpcCfg: grpcCfg, } } -func (p *Pool) Acquire(ip string) (*PoolClient, error) { - cli, err := NewClient(fmt.Sprintf("%s:%d", ip, p.grpcCfg.Port)) + +// 获取一个GRPC客户端。由于事先不能知道所有agent的GRPC配置信息,所以只能让调用者把建立连接所需的配置都传递进来, +// Pool来决定要不要新建客户端。 +func (p *Pool) Acquire(ip string, port int) (*PoolClient, error) { + cli, err := NewClient(fmt.Sprintf("%s:%d", ip, port)) if err != nil { return nil, err } diff --git a/common/pkgs/iterator/ec_object_iterator.go b/common/pkgs/iterator/ec_object_iterator.go index 9d5454a..771cea2 100644 --- a/common/pkgs/iterator/ec_object_iterator.go +++ b/common/pkgs/iterator/ec_object_iterator.go @@ -101,11 +101,14 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingOb //nodeIDs, nodeIPs直接按照第1~ecK个排列 nodeIDs := make([]int64, ecK) nodeIPs := make([]string, ecK) + grpcPorts := make([]int, ecK) for i := 0; i < ecK; i++ { nodeIDs[i] = nds[i].Node.NodeID nodeIPs[i] = nds[i].Node.ExternalIP + grpcPorts[i] = nds[i].Node.ExternalGRPCPort if nds[i].IsSameLocation { nodeIPs[i] = nds[i].Node.LocalIP + grpcPorts[i] = nds[i].Node.LocalGRPCPort logger.Infof("client and node %d are at the same location, use local ip", nds[i].Node.NodeID) } } @@ -115,7 +118,7 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingOb for i := 0; i < ecK; i++ { blockIDs[i] = i } - reader, err := iter.downloadEcObject(fileSize, ecK, ecN, blockIDs, nodeIDs, nodeIPs, hashs) + reader, err := iter.downloadEcObject(fileSize, ecK, ecN, blockIDs, nodeIDs, nodeIPs, grpcPorts, hashs) if err != nil { return nil, fmt.Errorf("ec read failed, err: %w", err) } @@ -143,7 +146,7 @@ func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downlo return entries[rand.Intn(len(entries))] } -func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, hashs []string) (io.ReadCloser, error) { +func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, grpcPorts []int, hashs []string) (io.ReadCloser, error) { // TODO zkx 先试用同步方式实现逻辑,做好错误处理。同时也方便下面直接使用uploadToNode和uploadToLocalIPFS来优化代码结构 //wg := sync.WaitGroup{} numPacket := (fileSize + int64(ecK)*iter.ecInfo.PacketSize - 1) / (int64(ecK) * iter.ecInfo.PacketSize) @@ -159,7 +162,7 @@ func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, i := idx go func() { // TODO 处理错误 - file, _ := downloadFile(iter.downloadCtx, nodeIDs[i], nodeIPs[i], hashs[i]) + file, _ := downloadFile(iter.downloadCtx, nodeIDs[i], nodeIPs[i], grpcPorts[i], hashs[i]) for p := int64(0); p < numPacket; p++ { buf := make([]byte, iter.ecInfo.PacketSize) diff --git a/common/pkgs/iterator/rep_object_iterator.go b/common/pkgs/iterator/rep_object_iterator.go index 8bff9b1..ef111bd 100644 --- a/common/pkgs/iterator/rep_object_iterator.go +++ b/common/pkgs/iterator/rep_object_iterator.go @@ -104,13 +104,15 @@ func (i *RepObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObje // 如果客户端与节点在同一个地域,则使用内网地址连接节点 nodeIP := downloadNode.Node.ExternalIP + grpcPort := downloadNode.Node.ExternalGRPCPort if downloadNode.IsSameLocation { nodeIP = downloadNode.Node.LocalIP + grpcPort = downloadNode.Node.LocalGRPCPort logger.Infof("client and node %d are at the same location, use local ip", downloadNode.Node.NodeID) } - reader, err := downloadFile(i.downloadCtx, downloadNode.Node.NodeID, nodeIP, repData.FileHash) + reader, err := downloadFile(i.downloadCtx, downloadNode.Node.NodeID, nodeIP, grpcPort, repData.FileHash) if err != nil { return nil, fmt.Errorf("rep read failed, err: %w", err) } @@ -138,7 +140,7 @@ func (i *RepObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downl return entries[rand.Intn(len(entries))] } -func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { +func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { if stgglb.IPFSPool != nil { logger.Infof("try to use local IPFS to download file") @@ -150,10 +152,10 @@ func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash st logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) } - return downloadFromNode(ctx, nodeID, nodeIP, fileHash) + return downloadFromNode(ctx, nodeID, nodeIP, grpcPort, fileHash) } -func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { +func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { // 二次获取锁 mutex, err := reqbuilder.NewBuilder(). // 用于从IPFS下载文件 @@ -164,7 +166,7 @@ func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, fileHas } // 连接grpc - agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP) + agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) if err != nil { return nil, fmt.Errorf("new agent grpc client: %w", err) } diff --git a/coordinator/internal/services/conmmon.go b/coordinator/internal/services/conmmon.go index 8c07e5d..063f6da 100644 --- a/coordinator/internal/services/conmmon.go +++ b/coordinator/internal/services/conmmon.go @@ -11,7 +11,7 @@ func (svc *Service) FindClientLocation(msg *coormq.FindClientLocation) (*coormq. location, err := svc.db.Location().FindLocationByExternalIP(svc.db.SQLCtx(), msg.IP) if err != nil { logger.WithField("IP", msg.IP). - Warnf("query client location failed, err: %s", err.Error()) + Warnf("finding location by external ip: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "query client location failed") }