diff --git a/internal/services/agent.go b/internal/services/agent.go index 298ebd2..ade07e1 100644 --- a/internal/services/agent.go +++ b/internal/services/agent.go @@ -5,7 +5,7 @@ import ( ) func (service *Service) TempCacheReport(msg *coormsg.TempCacheReport) { - service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) + service.db.BatchInsertOrUpdateCache(msg.Body.Hashes, msg.Body.NodeID) } func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) { @@ -15,10 +15,10 @@ func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) { // TODO /* ips := utils.GetAgentIps() - Insert_NodeDelay(msg.IP, ips, msg.AgentDelay) + Insert_NodeDelay(msg.Body.IP, ips, msg.Body.AgentDelay) //从配置表里读取节点地域NodeLocation //插入节点表的NodeStatus - Insert_Node(msg.IP, msg.IP, msg.IPFSStatus, msg.LocalDirStatus) + Insert_Node(msg.Body.IP, msg.Body.IP, msg.Body.IPFSStatus, msg.Body.LocalDirStatus) */ } diff --git a/internal/services/bucket.go b/internal/services/bucket.go index 305cf12..95ae260 100644 --- a/internal/services/bucket.go +++ b/internal/services/bucket.go @@ -3,6 +3,7 @@ package services import ( log "github.com/sirupsen/logrus" "gitlink.org.cn/cloudream/db/model" + ramsg "gitlink.org.cn/cloudream/rabbitmq/message" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" "gitlink.org.cn/cloudream/utils/consts/errorcode" ) @@ -13,52 +14,52 @@ func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { } func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) *coormsg.GetUserBucketsResp { - buckets, err := svc.db.GetUserBuckets(msg.UserID) + buckets, err := svc.db.GetUserBuckets(msg.Body.UserID) if err != nil { - log.WithField("UserID", msg.UserID). + log.WithField("UserID", msg.Body.UserID). Warnf("get user buckets failed, err: %s", err.Error()) - return coormsg.NewGetUserBucketsRespFailed(errorcode.OPERATION_FAILED, "get all buckets failed") + return ramsg.ReplyFailed[coormsg.GetUserBucketsResp](errorcode.OPERATION_FAILED, "get all buckets failed") } - return coormsg.NewGetUserBucketsRespOK(buckets) + return ramsg.ReplyOK(coormsg.NewGetUserBucketsRespBody(buckets)) } func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) *coormsg.GetBucketObjectsResp { - objects, err := svc.db.GetBucketObjects(msg.UserID, msg.BucketID) + objects, err := svc.db.GetBucketObjects(msg.Body.UserID, msg.Body.BucketID) if err != nil { - log.WithField("UserID", msg.UserID). - WithField("BucketID", msg.BucketID). + log.WithField("UserID", msg.Body.UserID). + WithField("BucketID", msg.Body.BucketID). Warnf("get bucket objects failed, err: %s", err.Error()) - return coormsg.NewGetBucketObjectsRespFailed(errorcode.OPERATION_FAILED, "get all buckets failed") + return ramsg.ReplyFailed[coormsg.GetBucketObjectsResp](errorcode.OPERATION_FAILED, "get bucket objects failed") } - return coormsg.NewGetBucketObjectsRespOK(objects) + return ramsg.ReplyOK(coormsg.NewGetBucketObjectsRespBody(objects)) } func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) *coormsg.CreateBucketResp { - bucketID, err := svc.db.CreateBucket(msg.UserID, msg.BucketName) + bucketID, err := svc.db.CreateBucket(msg.Body.UserID, msg.Body.BucketName) if err != nil { - log.WithField("UserID", msg.UserID). - WithField("BucketName", msg.BucketName). + log.WithField("UserID", msg.Body.UserID). + WithField("BucketName", msg.Body.BucketName). Warnf("create bucket failed, err: %s", err.Error()) - return coormsg.NewCreateBucketRespFailed(errorcode.OPERATION_FAILED, "create bucket failed") + return ramsg.ReplyFailed[coormsg.CreateBucketResp](errorcode.OPERATION_FAILED, "create bucket failed") } - return coormsg.NewCreateBucketRespOK(bucketID) + return ramsg.ReplyOK(coormsg.NewCreateBucketRespBody(bucketID)) } func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) *coormsg.DeleteBucketResp { - err := svc.db.DeleteBucket(msg.UserID, msg.BucketID) + err := svc.db.DeleteBucket(msg.Body.UserID, msg.Body.BucketID) if err != nil { - log.WithField("UserID", msg.UserID). - WithField("BucketID", msg.BucketID). + log.WithField("UserID", msg.Body.UserID). + WithField("BucketID", msg.Body.BucketID). Warnf("delete bucket failed, err: %s", err.Error()) - return coormsg.NewDeleteBucketRespFailed(errorcode.OPERATION_FAILED, "delete bucket failed") + return ramsg.ReplyFailed[coormsg.DeleteBucketResp](errorcode.OPERATION_FAILED, "delete bucket failed") } - return coormsg.NewDeleteBucketRespOK() + return ramsg.ReplyOK(coormsg.NewDeleteBucketRespBody()) } diff --git a/internal/services/command_service_ec.go b/internal/services/command_service_ec.go index fe46b6a..617f978 100644 --- a/internal/services/command_service_ec.go +++ b/internal/services/command_service_ec.go @@ -13,13 +13,13 @@ func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) *coormsg.WriteResp //jh:完成对象表、对象编码块表的插入(对象编码块表的Hash字段先不插入) //返回消息 //查询用户可用的节点IP - nodes, err := service.db.QueryUserNodes(msg.UserID) + nodes, err := service.db.QueryUserNodes(msg.Body.UserID) if err != nil { log.Warnf("query user nodes failed, err: %s", err.Error()) return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "query user nodes failed") } - ecid := msg.ECName + ecid := msg.Body.ECName ecPolicies := *utils.GetEcPolicy() ecPolicy := ecPolicies[ecid] ecN := ecPolicy.GetN() @@ -41,13 +41,13 @@ func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) *coormsg.WriteResp // TODO 参考RepWrite,将创建EC对象的逻辑移动到WriteECHash中,并合成成一个事务 //根据BucketName查询BucketID - BucketID := Query_BucketID(msg.BucketName) + BucketID := Query_BucketID(msg.Body.BucketName) if BucketID == -1 { // TODO 日志 - return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("bucket id not found for %s", msg.BucketName)) + return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("bucket id not found for %s", msg.Body.BucketName)) } //对象表插入Insert_Cache - ObjectID := Insert_EcObject(msg.ObjectName, BucketID, msg.FileSizeInBytes, msg.ECName) + ObjectID := Insert_EcObject(msg.Body.ObjectName, BucketID, msg.Body.FileSizeInBytes, msg.Body.ECName) //对象编码块表插入,hash暂时为空 for i := 0; i < ecN; i++ { Insert_EcObjectBlock(ObjectID, i) @@ -64,10 +64,10 @@ func (service *Service) WriteECHash(msg *coormsg.WriteECHashCommand) *coormsg.Wr //返回消息 //插入对象编码块表中的Hash字段 // TODO 参考WriteRepHash的逻辑 - ObjectId := Query_ObjectID(msg.ObjectName) - Insert_EcHash(ObjectId, msg.Hashes) + ObjectId := Query_ObjectID(msg.Body.ObjectName) + Insert_EcHash(ObjectId, msg.Body.Hashes) //缓存表的插入 - Insert_Cache(msg.Hashes, msg.NodeIDs, false) + Insert_Cache(msg.Body.Hashes, msg.Body.NodeIDs, false) return ramsg.NewCoorWriteHashRespOK() */ diff --git a/internal/services/storage.go b/internal/services/storage.go index 09f2e71..ff698b0 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -2,6 +2,7 @@ package services import ( log "github.com/sirupsen/logrus" + ramsg "gitlink.org.cn/cloudream/rabbitmq/message" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" "gitlink.org.cn/cloudream/utils" "gitlink.org.cn/cloudream/utils/consts" @@ -22,20 +23,20 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje // TODO 需要在StorageData中增加记录 // 查询用户关联的存储服务 - stg, err := service.db.QueryUserStorage(msg.UserID, msg.StorageID) + stg, err := service.db.QueryUserStorage(msg.Body.UserID, msg.Body.StorageID) if err != nil { - log.WithField("UserID", msg.UserID). - WithField("StorageID", msg.StorageID). + log.WithField("UserID", msg.Body.UserID). + WithField("StorageID", msg.Body.StorageID). Warnf("query storage directory failed, err: %s", err.Error()) - return coormsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query storage directory failed") + return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query storage directory failed") } // 查询文件对象 - object, err := service.db.QueryObjectByID(msg.ObjectID) + object, err := service.db.QueryObjectByID(msg.Body.ObjectID) if err != nil { - log.WithField("ObjectID", msg.ObjectID). + log.WithField("ObjectID", msg.Body.ObjectID). Warnf("query Object failed, err: %s", err.Error()) - return coormsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query Object failed") + return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query Object failed") } //-若redundancy是rep,查询对象副本表, 获得repHash @@ -45,7 +46,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje objectRep, err := service.db.QueryObjectRep(object.ObjectID) if err != nil { log.Warnf("query ObjectRep failed, err: %s", err.Error()) - return coormsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed") + return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectRep failed") } hashs = append(hashs, objectRep.RepHash) @@ -54,7 +55,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje blockHashs, err := service.db.QueryObjectBlock(object.ObjectID) if err != nil { log.Warnf("query ObjectBlock failed, err: %s", err.Error()) - return coormsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectBlock failed") + return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectBlock failed") } ecPolicies := *utils.GetEcPolicy() @@ -88,7 +89,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje }*/ } - return coormsg.NewCoorMoveRespOK( + return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageRespBody( stg.NodeID, stg.Directory, object.Redundancy, @@ -96,5 +97,5 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje hashs, ids, object.FileSizeInBytes, - ) + )) }