diff --git a/internal/services/bucket.go b/internal/services/bucket.go index 362608f..3c81028 100644 --- a/internal/services/bucket.go +++ b/internal/services/bucket.go @@ -14,7 +14,7 @@ func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { } func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) *coormsg.GetUserBucketsResp { - buckets, err := svc.db.GetUserBuckets(msg.Body.UserID) + buckets, err := svc.db.Bucket().GetUserBuckets(msg.Body.UserID) if err != nil { log.WithField("UserID", msg.Body.UserID). @@ -26,7 +26,7 @@ func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) *coormsg.GetUser } func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) *coormsg.GetBucketObjectsResp { - objects, err := svc.db.GetBucketObjects(msg.Body.UserID, msg.Body.BucketID) + objects, err := svc.db.Object().GetBucketObjects(msg.Body.UserID, msg.Body.BucketID) if err != nil { log.WithField("UserID", msg.Body.UserID). @@ -39,7 +39,7 @@ func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) *coormsg.Get } func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) *coormsg.CreateBucketResp { - bucketID, err := svc.db.CreateBucket(msg.Body.UserID, msg.Body.BucketName) + bucketID, err := svc.db.Bucket().Create(msg.Body.UserID, msg.Body.BucketName) if err != nil { log.WithField("UserID", msg.Body.UserID). @@ -52,7 +52,7 @@ func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) *coormsg.CreateBucke } func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) *coormsg.DeleteBucketResp { - err := svc.db.DeleteBucket(msg.Body.UserID, msg.Body.BucketID) + err := svc.db.Bucket().Delete(msg.Body.UserID, msg.Body.BucketID) if err != nil { log.WithField("UserID", msg.Body.UserID). diff --git a/internal/services/object.go b/internal/services/object.go index dd35c36..0fc09e2 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -16,7 +16,7 @@ import ( func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.PreDownloadObjectResp { // 查询文件对象 - object, err := svc.db.QueryObjectByID(msg.Body.ObjectID) + object, err := svc.db.Object().GetUserObject(msg.Body.UserID, msg.Body.ObjectID) if err != nil { log.WithField("ObjectID", msg.Body.ObjectID). Warnf("query Object failed, err: %s", err.Error()) @@ -24,7 +24,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P } // 查询客户端所属节点 - belongNode, err := svc.db.FindNodeByExternalIP(msg.Body.ClientExternalIP) + belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP) if err != nil { log.WithField("ClientExternalIP", msg.Body.ClientExternalIP). Warnf("query client belong node failed, err: %s", err.Error()) @@ -35,7 +35,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P var entries []coormsg.PreDownloadObjectRespEntry //-若redundancy是rep,查询对象副本表, 获得repHash if object.Redundancy == consts.REDUNDANCY_REP { - objectRep, err := svc.db.GetObjectRep(object.ObjectID) + objectRep, err := svc.db.ObjectRep().GetObjectRep(object.ObjectID) if err != nil { log.WithField("ObjectID", object.ObjectID). Warnf("get ObjectRep failed, err: %s", err.Error()) @@ -43,7 +43,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P } // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的 - nodes, err := svc.db.FindCachingFileUserNodes(msg.Body.UserID, objectRep.RepHash) + nodes, err := svc.db.Cache().FindCachingFileUserNodes(msg.Body.UserID, objectRep.RepHash) if err != nil { log.WithField("RepHash", objectRep.RepHash). Warnf("query Cache failed, err: %s", err.Error()) @@ -115,7 +115,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg // 判断同名对象是否存在。等到WriteRepHash时再判断一次。 // 此次的判断只作为参考,具体是否成功还是看WriteRepHash的结果 - isBucketAvai, err := svc.db.IsBucketAvailable(msg.Body.BucketID, msg.Body.UserID) + isBucketAvai, err := svc.db.Bucket().IsAvailable(msg.Body.BucketID, msg.Body.UserID) if err != nil { log.WithField("BucketID", msg.Body.BucketID). Warnf("check bucket available failed, err: %s", err.Error()) @@ -127,7 +127,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user") } - _, err = svc.db.GetObjectByName(msg.Body.BucketID, msg.Body.ObjectName) + _, err = svc.db.Object().GetByName(msg.Body.BucketID, msg.Body.ObjectName) if err == nil { log.WithField("BucketID", msg.Body.BucketID). WithField("ObjectName", msg.Body.ObjectName). @@ -142,7 +142,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg } //查询用户可用的节点IP - nodes, err := svc.db.GetUserNodes(msg.Body.UserID) + nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID) if err != nil { log.WithField("UserID", msg.Body.UserID). Warnf("query user nodes failed, err: %s", err.Error()) @@ -150,7 +150,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg } // 查询客户端所属节点 - belongNode, err := svc.db.FindNodeByExternalIP(msg.Body.ClientExternalIP) + belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP) if err != nil { log.WithField("ClientExternalIP", msg.Body.ClientExternalIP). Warnf("query client belong node failed, err: %s", err.Error()) @@ -172,7 +172,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg } func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.CreateObjectResp { - _, err := svc.db.CreateRepObject(msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSizeInBytes, msg.Body.ReplicateNumber, msg.Body.NodeIDs, msg.Body.FileHash) + _, err := svc.db.Object().CreateRepObject(msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSizeInBytes, msg.Body.ReplicateNumber, msg.Body.NodeIDs, msg.Body.FileHash) if err != nil { log.WithField("BucketName", msg.Body.BucketID). WithField("ObjectName", msg.Body.ObjectName). @@ -187,7 +187,7 @@ func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.Creat func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg.PreUpdateRepObjectResp { // 获取对象信息 - obj, err := svc.db.GetObject(msg.Body.ObjectID) + obj, err := svc.db.Object().GetByID(msg.Body.ObjectID) if err != nil { log.WithField("ObjectID", msg.Body.ObjectID). Warnf("get object failed, err: %s", err.Error()) @@ -200,7 +200,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg } // 获取对象Rep信息 - objRep, err := svc.db.GetObjectRep(msg.Body.ObjectID) + objRep, err := svc.db.ObjectRep().GetObjectRep(msg.Body.ObjectID) if err != nil { log.WithField("ObjectID", msg.Body.ObjectID). Warnf("get object rep failed, err: %s", err.Error()) @@ -208,7 +208,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg } //查询用户可用的节点IP - nodes, err := svc.db.GetUserNodes(msg.Body.UserID) + nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID) if err != nil { log.WithField("UserID", msg.Body.UserID). Warnf("query user nodes failed, err: %s", err.Error()) @@ -216,7 +216,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg } // 查询客户端所属节点 - belongNode, err := svc.db.FindNodeByExternalIP(msg.Body.ClientExternalIP) + belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP) if err != nil { log.WithField("ClientExternalIP", msg.Body.ClientExternalIP). Warnf("query client belong node failed, err: %s", err.Error()) @@ -224,7 +224,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg } // 查询保存了旧文件的节点信息 - cachingNodes, err := svc.db.FindCachingFileUserNodes(msg.Body.UserID, objRep.RepHash) + cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(msg.Body.UserID, objRep.RepHash) if err != nil { log.Warnf("find caching file user nodes failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed") @@ -247,7 +247,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg } func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.UpdateRepObjectResp { - err := svc.db.UpdateRepObject(msg.Body.ObjectID, msg.Body.FileSizeInBytes, msg.Body.NodeIDs, msg.Body.FileHash) + err := svc.db.Object().UpdateRepObject(msg.Body.ObjectID, msg.Body.FileSizeInBytes, msg.Body.NodeIDs, msg.Body.FileHash) if err != nil { log.WithField("ObjectID", msg.Body.ObjectID). Warnf("update rep object failed, err: %s", err.Error()) @@ -259,7 +259,7 @@ func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.Updat } func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjectResp { - err := svc.db.SoftDeleteObject(msg.Body.UserID, msg.Body.ObjectID) + err := svc.db.Object().SoftDelete(msg.Body.UserID, msg.Body.ObjectID) if err != nil { log.WithField("UserID", msg.Body.UserID). diff --git a/internal/services/storage.go b/internal/services/storage.go index 8883675..264a744 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -34,30 +34,30 @@ func (service *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStora //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids // 查询用户关联的存储服务 - stg, err := service.db.QueryUserStorage(msg.Body.UserID, msg.Body.StorageID) + stg, err := service.db.Storage().GetUserStorage(msg.Body.UserID, msg.Body.StorageID) if err != nil { log.WithField("UserID", msg.Body.UserID). WithField("StorageID", msg.Body.StorageID). - Warnf("query storage directory failed, err: %s", err.Error()) - return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query storage directory failed") + Warnf("get user Storage failed, err: %s", err.Error()) + return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get user Storage failed") } // 查询文件对象 - object, err := service.db.QueryObjectByID(msg.Body.ObjectID) + object, err := service.db.Object().GetUserObject(msg.Body.UserID, msg.Body.ObjectID) if err != nil { log.WithField("ObjectID", msg.Body.ObjectID). - Warnf("query Object failed, err: %s", err.Error()) - return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query Object failed") + Warnf("get user Object failed, err: %s", err.Error()) + return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get user Object failed") } //-若redundancy是rep,查询对象副本表, 获得repHash var hashs []string ids := []int{0} if object.Redundancy == consts.REDUNDANCY_REP { - objectRep, err := service.db.GetObjectRep(object.ObjectID) + objectRep, err := service.db.ObjectRep().GetObjectRep(object.ObjectID) if err != nil { - log.Warnf("query ObjectRep failed, err: %s", err.Error()) - return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectRep failed") + log.Warnf("get ObjectRep failed, err: %s", err.Error()) + return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get ObjectRep failed") } hashs = append(hashs, objectRep.RepHash)