diff --git a/command_service.go b/command_service.go index 3286665..788019d 100644 --- a/command_service.go +++ b/command_service.go @@ -133,8 +133,8 @@ func (service *CommandService) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteR start := utils.GetRandInt(len(nodes)) for i := 0; i < numRep; i++ { index := (start + i) % len(nodes) - ips[i] = nodes[index].IP ids[i] = nodes[index].NodeID + ips[i] = nodes[index].IP } return ramsg.NewCoorWriteRespOK(ids, ips) @@ -157,20 +157,34 @@ func (service *CommandService) ECWrite(msg *ramsg.ECWriteCommand) ramsg.WriteRes //kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择ecN个,赋值给Ips //jh:完成对象表、对象编码块表的插入(对象编码块表的Hash字段先不插入) //返回消息 - nodeip := Query_UserNode(msg.UserID) //nodeip格式:[]string + //查询用户可用的节点IP + nodes, err := service.db.QueryUserNodes(msg.UserID) + if err != nil { + log.Warn("query user nodes failed, err: %s", err.Error()) + return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "query user nodes failed") + } + ecid := msg.ECName ecPolicies := *utils.GetEcPolicy() ecPolicy := ecPolicies[ecid] ecN := ecPolicy.GetN() + if len(nodes) < ecN { + log.Warn("user nodes are not enough, err: %s", err.Error()) + return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "user nodes are not enough") + } + + ids := make([]int, ecN) ips := make([]string, ecN) - //kx:从jh查到的NodeIp中选择ecN个,赋值给Ips - //根据BucketName查询BucketID - start := utils.GetRandInt(len(nodeip)) + //随机选取numRep个nodeIp + start := utils.GetRandInt(len(nodes)) for i := 0; i < ecN; i++ { - ips[i] = nodeip[(start+i)%len(nodeip)] + index := (start + i) % len(nodes) + ids[i] = nodes[index].NodeID + ips[i] = nodes[index].IP } + // TODO 参考RepWrite,将创建EC对象的逻辑移动到WriteECHash中,并合成成一个事务 //根据BucketName查询BucketID BucketID := Query_BucketID(msg.BucketName) if BucketID == -1 { @@ -183,13 +197,14 @@ func (service *CommandService) ECWrite(msg *ramsg.ECWriteCommand) ramsg.WriteRes for i := 0; i < ecN; i++ { Insert_EcObjectBlock(ObjectID, i) } - return ramsg.NewCoorWriteRespOK(ips) + return ramsg.NewCoorWriteRespOK(ids, ips) } func (service *CommandService) WriteECHash(msg *ramsg.WriteECHashCommand) ramsg.WriteHashResp { //jh:根据command中的信息,插入对象编码块表中的Hash字段,并完成缓存表的插入 //返回消息 //插入对象编码块表中的Hash字段 + // TODO 参考WriteRepHash的逻辑 ObjectId := Query_ObjectID(msg.ObjectName) Insert_EcHash(ObjectId, msg.Hashes) //缓存表的插入 @@ -292,10 +307,14 @@ func (service *CommandService) TempCacheReport(msg *ramsg.TempCacheReport) { func (service *CommandService) AgentStatusReport(msg *ramsg.AgentStatusReport) { //jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus //根据command中的Ip,插入节点延迟表 - ips := utils.GetAgentIps() - Insert_NodeDelay(msg.IP, ips, msg.AgentDelay) - //从配置表里读取节点地域NodeLocation - //插入节点表的NodeStatus - Insert_Node(msg.IP, msg.IP, msg.IPFSStatus, msg.LocalDirStatus) + // TODO + /* + ips := utils.GetAgentIps() + Insert_NodeDelay(msg.IP, ips, msg.AgentDelay) + + //从配置表里读取节点地域NodeLocation + //插入节点表的NodeStatus + Insert_Node(msg.IP, msg.IP, msg.IPFSStatus, msg.LocalDirStatus) + */ }