| @@ -8,7 +8,6 @@ import ( | |||
| "gitlink.org.cn/cloudream/common/consts" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkg/logger" | |||
| log "gitlink.org.cn/cloudream/common/pkg/logger" | |||
| "gitlink.org.cn/cloudream/db/model" | |||
| ramsg "gitlink.org.cn/cloudream/rabbitmq/message" | |||
| coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" | |||
| @@ -20,7 +19,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P | |||
| // 查询文件对象 | |||
| object, err := svc.db.Object().GetUserObject(msg.Body.UserID, msg.Body.ObjectID) | |||
| if err != nil { | |||
| log.WithField("ObjectID", msg.Body.ObjectID). | |||
| logger.WithField("ObjectID", msg.Body.ObjectID). | |||
| Warnf("query Object failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed") | |||
| } | |||
| @@ -28,17 +27,17 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P | |||
| // 查询客户端所属节点 | |||
| belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP) | |||
| if err != nil { | |||
| log.WithField("ClientExternalIP", msg.Body.ClientExternalIP). | |||
| logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP). | |||
| Warnf("query client belong node failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed") | |||
| } | |||
| log.Debugf("client address %s is at location %d", msg.Body.ClientExternalIP, belongNode.LocationID) | |||
| logger.Debugf("client address %s is at location %d", msg.Body.ClientExternalIP, belongNode.LocationID) | |||
| //-若redundancy是rep,查询对象副本表, 获得FileHash | |||
| if object.Redundancy == consts.REDUNDANCY_REP { | |||
| objectRep, err := svc.db.ObjectRep().GetObjectRep(object.ObjectID) | |||
| if err != nil { | |||
| log.WithField("ObjectID", object.ObjectID). | |||
| logger.WithField("ObjectID", object.ObjectID). | |||
| Warnf("get ObjectRep failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query ObjectRep failed") | |||
| } | |||
| @@ -46,7 +45,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P | |||
| // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的 | |||
| nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.Body.UserID, objectRep.FileHash) | |||
| if err != nil { | |||
| log.WithField("FileHash", objectRep.FileHash). | |||
| logger.WithField("FileHash", objectRep.FileHash). | |||
| Warnf("query Cache failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Cache failed") | |||
| } | |||
| @@ -119,25 +118,25 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg | |||
| // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果 | |||
| isBucketAvai, err := svc.db.Bucket().IsAvailable(msg.Body.BucketID, msg.Body.UserID) | |||
| if err != nil { | |||
| log.WithField("BucketID", msg.Body.BucketID). | |||
| logger.WithField("BucketID", msg.Body.BucketID). | |||
| Warnf("check bucket available failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed") | |||
| } | |||
| if !isBucketAvai { | |||
| log.WithField("BucketID", msg.Body.BucketID). | |||
| logger.WithField("BucketID", msg.Body.BucketID). | |||
| Warnf("bucket is not available to user") | |||
| return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user") | |||
| } | |||
| _, err = svc.db.Object().GetByName(msg.Body.BucketID, msg.Body.ObjectName) | |||
| if err == nil { | |||
| log.WithField("BucketID", msg.Body.BucketID). | |||
| logger.WithField("BucketID", msg.Body.BucketID). | |||
| WithField("ObjectName", msg.Body.ObjectName). | |||
| Warnf("object with given Name and BucketID already exists") | |||
| return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists") | |||
| } | |||
| if !errors.Is(err, sql.ErrNoRows) { | |||
| log.WithField("BucketID", msg.Body.BucketID). | |||
| logger.WithField("BucketID", msg.Body.BucketID). | |||
| WithField("ObjectName", msg.Body.ObjectName). | |||
| Warnf("get object by name failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed") | |||
| @@ -146,7 +145,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg | |||
| //查询用户可用的节点IP | |||
| nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID) | |||
| if err != nil { | |||
| log.WithField("UserID", msg.Body.UserID). | |||
| logger.WithField("UserID", msg.Body.UserID). | |||
| Warnf("query user nodes failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed") | |||
| } | |||
| @@ -154,7 +153,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg | |||
| // 查询客户端所属节点 | |||
| belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP) | |||
| if err != nil { | |||
| log.WithField("ClientExternalIP", msg.Body.ClientExternalIP). | |||
| logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP). | |||
| Warnf("query client belong node failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed") | |||
| } | |||
| @@ -176,7 +175,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg | |||
| func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.CreateObjectResp { | |||
| _, err := svc.db.Object().CreateRepObject(msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSize, msg.Body.RepCount, msg.Body.NodeIDs, msg.Body.FileHash) | |||
| if err != nil { | |||
| log.WithField("BucketName", msg.Body.BucketID). | |||
| logger.WithField("BucketName", msg.Body.BucketID). | |||
| WithField("ObjectName", msg.Body.ObjectName). | |||
| Warnf("create rep object failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed") | |||
| @@ -192,15 +191,16 @@ func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.Creat | |||
| } | |||
| func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg.PreUpdateRepObjectResp { | |||
| // TODO 检查用户是否有Object的权限 | |||
| // 获取对象信息 | |||
| obj, err := svc.db.Object().GetByID(msg.Body.ObjectID) | |||
| if err != nil { | |||
| log.WithField("ObjectID", msg.Body.ObjectID). | |||
| logger.WithField("ObjectID", msg.Body.ObjectID). | |||
| Warnf("get object failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed") | |||
| } | |||
| if obj.Redundancy != consts.REDUNDANCY_REP { | |||
| log.WithField("ObjectID", msg.Body.ObjectID). | |||
| logger.WithField("ObjectID", msg.Body.ObjectID). | |||
| Warnf("this object is not a rep object") | |||
| return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object") | |||
| } | |||
| @@ -208,7 +208,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg | |||
| // 获取对象Rep信息 | |||
| objRep, err := svc.db.ObjectRep().GetObjectRep(msg.Body.ObjectID) | |||
| if err != nil { | |||
| log.WithField("ObjectID", msg.Body.ObjectID). | |||
| logger.WithField("ObjectID", msg.Body.ObjectID). | |||
| Warnf("get object rep failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed") | |||
| } | |||
| @@ -216,7 +216,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg | |||
| //查询用户可用的节点IP | |||
| nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID) | |||
| if err != nil { | |||
| log.WithField("UserID", msg.Body.UserID). | |||
| logger.WithField("UserID", msg.Body.UserID). | |||
| Warnf("query user nodes failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed") | |||
| } | |||
| @@ -224,7 +224,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg | |||
| // 查询客户端所属节点 | |||
| belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP) | |||
| if err != nil { | |||
| log.WithField("ClientExternalIP", msg.Body.ClientExternalIP). | |||
| logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP). | |||
| Warnf("query client belong node failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed") | |||
| } | |||
| @@ -232,7 +232,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg | |||
| // 查询保存了旧文件的节点信息 | |||
| cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.Body.UserID, objRep.FileHash) | |||
| if err != nil { | |||
| log.Warnf("find caching file user nodes failed, err: %s", err.Error()) | |||
| logger.Warnf("find caching file user nodes failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed") | |||
| } | |||
| @@ -255,7 +255,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg | |||
| func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.UpdateRepObjectResp { | |||
| err := svc.db.Object().UpdateRepObject(msg.Body.ObjectID, msg.Body.FileSize, msg.Body.NodeIDs, msg.Body.FileHash) | |||
| if err != nil { | |||
| log.WithField("ObjectID", msg.Body.ObjectID). | |||
| logger.WithField("ObjectID", msg.Body.ObjectID). | |||
| Warnf("update rep object failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed") | |||
| } | |||
| @@ -273,7 +273,7 @@ func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjec | |||
| err := svc.db.Object().SoftDelete(msg.Body.UserID, msg.Body.ObjectID) | |||
| if err != nil { | |||
| log.WithField("UserID", msg.Body.UserID). | |||
| logger.WithField("UserID", msg.Body.UserID). | |||
| WithField("ObjectID", msg.Body.ObjectID). | |||
| Warnf("set object deleted failed, err: %s", err.Error()) | |||
| return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed") | |||