From 24c549f8c95220819a8460f6ad111ea6032ca3f1 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 21 Apr 2023 15:47:20 +0800 Subject: [PATCH] =?UTF-8?q?Read=E5=91=BD=E4=BB=A4=E4=BD=BF=E7=94=A8BucketI?= =?UTF-8?q?D=E5=92=8CObjectID=E8=BF=9B=E8=A1=8C=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- command_service.go | 174 ++++++++++++++++++++++----------------------- 1 file changed, 87 insertions(+), 87 deletions(-) diff --git a/command_service.go b/command_service.go index 80e3caf..1c92ecd 100644 --- a/command_service.go +++ b/command_service.go @@ -20,6 +20,93 @@ func NewCommandService(db *mydb.DB) *CommandService { } } +func (service *CommandService) Read(msg *ramsg.ReadCommand) ramsg.ReadResp { + var hashes []string + blockIDs := []int{0} + + // 查询文件对象 + object, err := service.db.QueryObjectByID(msg.BucketID, msg.ObjectID) + if err != nil { + log.WithField("BucketID", msg.BucketID). + WithField("ObjectID", msg.ObjectID). + Warnf("query Object failed, err: %s", err.Error()) + return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object failed") + } + + var nodeIPs []string + //-若redundancy是rep,查询对象副本表, 获得repHash + if object.Redundancy == consts.REDUNDANCY_REP { + objectRep, err := service.db.QueryObjectRep(object.ObjectID) + if err != nil { + log.WithField("ObjectID", object.ObjectID). + Warnf("query ObjectRep failed, err: %s", err.Error()) + return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed") + } + + hashes = append(hashes, objectRep.RepHash) + + nodes, err := service.db.QueryCacheNodeByBlockHash(objectRep.RepHash) + if err != nil { + log.WithField("RepHash", objectRep.RepHash). + Warnf("query Cache failed, err: %s", err.Error()) + return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed") + } + + for _, node := range nodes { + nodeIPs = append(nodeIPs, node.IP) + } + + } else { + blocks, err := service.db.QueryObjectBlock(object.ObjectID) + if err != nil { + log.WithField("ObjectID", object.ObjectID). + Warnf("query Object Block failed, err: %s", err.Error()) + return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object Block failed") + } + + ecPolicies := *utils.GetEcPolicy() + ecPolicy := ecPolicies[*object.ECName] + ecN := ecPolicy.GetN() + ecK := ecPolicy.GetK() + nodeIPs = make([]string, ecN) + hashes = make([]string, ecN) + + for _, tt := range blocks { + id := tt.InnerID + hash := tt.BlockHash + hashes[id] = hash //这里有问题,采取的其实是直接顺序读的方式,等待加入自适应读模块 + + nodes, err := service.db.QueryCacheNodeByBlockHash(hash) + if err != nil { + log.WithField("BlockHash", hash). + Warnf("query Cache failed, err: %s", err.Error()) + return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed") + } + + if len(nodes) == 0 { + log.WithField("BlockHash", hash). + Warnf("No node cache the block data for the BlockHash") + return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "No node cache the block data for the BlockHash") + } + + nodeIPs[id] = nodes[0].IP + } + //这里也有和上面一样的问题 + for i := 1; i < ecK; i++ { + blockIDs = append(blockIDs, i) + } + } + + return ramsg.NewCoorReadRespOK( + object.Redundancy, + nodeIPs, + hashes, + blockIDs, + object.ECName, + object.FileSizeInBytes, + ) +} + func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp { //查询数据库,获取冗余类型,冗余参数 //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes @@ -154,93 +241,6 @@ func (service *CommandService) WriteRepHash(msg *ramsg.WriteRepHashCommand) rams return ramsg.NewCoorWriteHashRespOK() } -func (service *CommandService) Read(msg *ramsg.ReadCommand) ramsg.ReadResp { - var hashes []string - blockIDs := []int{0} - - // 查询文件对象 - object, err := service.db.QueryObjectByFullName(msg.BucketName, msg.ObjectName) - if err != nil { - log.WithField("BucketName", msg.BucketName). - WithField("ObjectName", msg.ObjectName). - Warnf("query Object failed, err: %s", err.Error()) - return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object failed") - } - - var nodeIPs []string - //-若redundancy是rep,查询对象副本表, 获得repHash - if object.Redundancy == consts.REDUNDANCY_REP { - objectRep, err := service.db.QueryObjectRep(object.ObjectID) - if err != nil { - log.WithField("ObjectID", object.ObjectID). - Warnf("query ObjectRep failed, err: %s", err.Error()) - return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed") - } - - hashes = append(hashes, objectRep.RepHash) - - nodes, err := service.db.QueryCacheNodeByBlockHash(objectRep.RepHash) - if err != nil { - log.WithField("RepHash", objectRep.RepHash). - Warnf("query Cache failed, err: %s", err.Error()) - return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed") - } - - for _, node := range nodes { - nodeIPs = append(nodeIPs, node.IP) - } - - } else { - blocks, err := service.db.QueryObjectBlock(object.ObjectID) - if err != nil { - log.WithField("ObjectID", object.ObjectID). - Warnf("query Object Block failed, err: %s", err.Error()) - return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object Block failed") - } - - ecPolicies := *utils.GetEcPolicy() - ecPolicy := ecPolicies[*object.ECName] - ecN := ecPolicy.GetN() - ecK := ecPolicy.GetK() - nodeIPs = make([]string, ecN) - hashes = make([]string, ecN) - - for _, tt := range blocks { - id := tt.InnerID - hash := tt.BlockHash - hashes[id] = hash //这里有问题,采取的其实是直接顺序读的方式,等待加入自适应读模块 - - nodes, err := service.db.QueryCacheNodeByBlockHash(hash) - if err != nil { - log.WithField("BlockHash", hash). - Warnf("query Cache failed, err: %s", err.Error()) - return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed") - } - - if len(nodes) == 0 { - log.WithField("BlockHash", hash). - Warnf("No node cache the block data for the BlockHash") - return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "No node cache the block data for the BlockHash") - } - - nodeIPs[id] = nodes[0].IP - } - //这里也有和上面一样的问题 - for i := 1; i < ecK; i++ { - blockIDs = append(blockIDs, i) - } - } - - return ramsg.NewCoorReadRespOK( - object.Redundancy, - nodeIPs, - hashes, - blockIDs, - object.ECName, - object.FileSizeInBytes, - ) -} - func (service *CommandService) TempCacheReport(msg *ramsg.TempCacheReport) { service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) }