| @@ -9,7 +9,20 @@ import ( | |||||
| log "gitlink.org.cn/cloudream/utils/logger" | log "gitlink.org.cn/cloudream/utils/logger" | ||||
| ) | ) | ||||
| func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObjectToStorageResp { | |||||
| func (service *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObjectToStorageResp { | |||||
| err := service.db.Storage().UserMoveObjectTo(msg.Body.UserID, msg.Body.ObjectID, msg.Body.StorageID) | |||||
| if err != nil { | |||||
| log.WithField("UserID", msg.Body.UserID). | |||||
| WithField("ObjectID", msg.Body.ObjectID). | |||||
| WithField("StorageID", msg.Body.StorageID). | |||||
| Warnf("user move object to storage failed, err: %s", err.Error()) | |||||
| return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "user move object to storage failed") | |||||
| } | |||||
| return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageRespBody()) | |||||
| } | |||||
| func (service *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) *coormsg.PreMoveObjectToStorageResp { | |||||
| //查询数据库,获取冗余类型,冗余参数 | //查询数据库,获取冗余类型,冗余参数 | ||||
| //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes | //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes | ||||
| //-若redundancy是rep,查询对象副本表, 获得repHash | //-若redundancy是rep,查询对象副本表, 获得repHash | ||||
| @@ -20,15 +33,13 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje | |||||
| //--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay) | //--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay) | ||||
| //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids | //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids | ||||
| // TODO 需要在StorageData中增加记录 | |||||
| // 查询用户关联的存储服务 | // 查询用户关联的存储服务 | ||||
| stg, err := service.db.QueryUserStorage(msg.Body.UserID, msg.Body.StorageID) | stg, err := service.db.QueryUserStorage(msg.Body.UserID, msg.Body.StorageID) | ||||
| if err != nil { | if err != nil { | ||||
| log.WithField("UserID", msg.Body.UserID). | log.WithField("UserID", msg.Body.UserID). | ||||
| WithField("StorageID", msg.Body.StorageID). | WithField("StorageID", msg.Body.StorageID). | ||||
| Warnf("query storage directory failed, err: %s", err.Error()) | Warnf("query storage directory failed, err: %s", err.Error()) | ||||
| return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query storage directory failed") | |||||
| return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query storage directory failed") | |||||
| } | } | ||||
| // 查询文件对象 | // 查询文件对象 | ||||
| @@ -36,7 +47,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("ObjectID", msg.Body.ObjectID). | log.WithField("ObjectID", msg.Body.ObjectID). | ||||
| Warnf("query Object failed, err: %s", err.Error()) | Warnf("query Object failed, err: %s", err.Error()) | ||||
| return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query Object failed") | |||||
| return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query Object failed") | |||||
| } | } | ||||
| //-若redundancy是rep,查询对象副本表, 获得repHash | //-若redundancy是rep,查询对象副本表, 获得repHash | ||||
| @@ -46,7 +57,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje | |||||
| objectRep, err := service.db.GetObjectRep(object.ObjectID) | objectRep, err := service.db.GetObjectRep(object.ObjectID) | ||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("query ObjectRep failed, err: %s", err.Error()) | log.Warnf("query ObjectRep failed, err: %s", err.Error()) | ||||
| return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectRep failed") | |||||
| return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectRep failed") | |||||
| } | } | ||||
| hashs = append(hashs, objectRep.RepHash) | hashs = append(hashs, objectRep.RepHash) | ||||
| @@ -55,7 +66,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje | |||||
| blockHashs, err := service.db.QueryObjectBlock(object.ObjectID) | blockHashs, err := service.db.QueryObjectBlock(object.ObjectID) | ||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("query ObjectBlock failed, err: %s", err.Error()) | log.Warnf("query ObjectBlock failed, err: %s", err.Error()) | ||||
| return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectBlock failed") | |||||
| return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectBlock failed") | |||||
| } | } | ||||
| ecPolicies := *utils.GetEcPolicy() | ecPolicies := *utils.GetEcPolicy() | ||||
| @@ -89,7 +100,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje | |||||
| }*/ | }*/ | ||||
| } | } | ||||
| return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageRespBody( | |||||
| return ramsg.ReplyOK(coormsg.NewPreMoveObjectToStorageRespBody( | |||||
| stg.NodeID, | stg.NodeID, | ||||
| stg.Directory, | stg.Directory, | ||||
| object.Redundancy, | object.Redundancy, | ||||