diff --git a/internal/services/bucket.go b/internal/services/bucket.go index 9eaf381..8f7ad8f 100644 --- a/internal/services/bucket.go +++ b/internal/services/bucket.go @@ -46,6 +46,9 @@ func (svc *BucketService) GetBucketObjects(userID int, bucketID int) ([]model.Ob } func (svc *BucketService) CreateBucket(userID int, bucketName string) (int, error) { + // TODO 只有阅读了系统操作的源码,才能知道要加哪些锁,但用户的命令可能会调用不止一个系统操作。 + // 因此加锁的操作还是必须在用户命令里完成,但具体加锁的内容,则需要被封装起来与系统操作放到一起,方便管理,避免分散改动。 + mutex, err := reqbuilder.NewBuilder(). Metadata().Bucket().CreateOne(userID, bucketName). // TODO 可以考虑二次加锁,加的更精确 diff --git a/internal/services/object.go b/internal/services/object.go index b1cc166..c403747 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/client/internal/config" "gitlink.org.cn/cloudream/common/consts" + "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" log "gitlink.org.cn/cloudream/common/pkg/logger" mygrpc "gitlink.org.cn/cloudream/common/utils/grpc" myio "gitlink.org.cn/cloudream/common/utils/io" @@ -38,6 +39,23 @@ func (svc *ObjectService) GetObject(userID int, objectID int) (model.Object, err } func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadCloser, error) { + mutex, err := reqbuilder.NewBuilder(). + // 用于判断用户是否有对象权限 + Metadata().UserBucket().ReadAny(). + // 用于查询可用的下载节点 + Node().ReadAny(). + // 用于查询Rep配置 + ObjectRep().ReadOne(objectID). + // 用于查询Block配置 + ObjectBlock().ReadAny(). + // 用于查询包含了副本的节点 + Cache().ReadAny(). + MutexLock(svc.distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewPreDownloadObjectBody(objectID, userID, config.Cfg().ExternalIP)) if err != nil { return nil, fmt.Errorf("request to coordinator failed, err: %w", err) @@ -69,7 +87,7 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose log.Infof("client and node %d are at the same location, use local ip\n", entry.ID) } - reader, err := svc.downloadRepObject(nodeIP, repInfo.FileHash) + reader, err := svc.downloadRepObject(entry.ID, nodeIP, repInfo.FileHash) if err != nil { return nil, fmt.Errorf("rep read failed, err: %w", err) } @@ -96,7 +114,7 @@ func (svc *ObjectService) chooseDownloadNode(entries []ramsg.RespNode) ramsg.Res return entries[rand.Intn(len(entries))] } -func (svc *ObjectService) downloadRepObject(nodeIP string, fileHash string) (io.ReadCloser, error) { +func (svc *ObjectService) downloadRepObject(nodeID int, nodeIP string, fileHash string) (io.ReadCloser, error) { if svc.ipfs != nil { log.Infof("try to use local IPFS to download file") @@ -108,10 +126,19 @@ func (svc *ObjectService) downloadRepObject(nodeIP string, fileHash string) (io. log.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) } - return svc.downloadFromNode(nodeIP, fileHash) + return svc.downloadFromNode(nodeID, nodeIP, fileHash) } -func (svc *ObjectService) downloadFromNode(nodeIP string, fileHash string) (io.ReadCloser, error) { +func (svc *ObjectService) downloadFromNode(nodeID int, nodeIP string, fileHash string) (io.ReadCloser, error) { + // 二次获取锁 + mutex, err := reqbuilder.NewBuilder(). + // 用于从IPFS下载文件 + IPFS().ReadOneRep(nodeID, fileHash). + MutexLock(svc.distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + // 连接grpc grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -127,7 +154,10 @@ func (svc *ObjectService) downloadFromNode(nodeIP string, fileHash string) (io.R return nil, fmt.Errorf("request to get file failed, err: %w", err) } - reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { conn.Close() }) + reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { + conn.Close() + mutex.Unlock() + }) return reader, nil } @@ -142,6 +172,23 @@ func (svc *ObjectService) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, } func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) error { + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于判断用户是否有桶的权限 + UserBucket().ReadOne(userID, bucketID). + // 用于防止创建了多个同名对象 + Object().CreateOne(bucketID, objectName). + // 用于查询可用的上传节点 + Node().ReadAny(). + // 用于设置Rep配置 + ObjectRep().CreateAny(). + // 用于创建Cache记录 + Cache().CreateAny(). + MutexLock(svc.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() //发送写请求,请求Coor分配写入节点Ip repWriteResp, err := svc.coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(bucketID, objectName, fileSize, userID, config.Cfg().ExternalIP)) @@ -183,6 +230,15 @@ func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName s log.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID) } + mutex, err := reqbuilder.NewBuilder(). + // 防止上传的副本被清除 + IPFS().CreateAnyRep(uploadNode.ID). + MutexLock(svc.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + fileHash, err = svc.uploadToNode(file, nodeIP) if err != nil { return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) @@ -288,6 +344,26 @@ func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSiz } func (svc *ObjectService) UpdateRepObject(userID int, objectID int, file io.ReadCloser, fileSize int64) error { + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于判断用户是否有对象的权限 + UserBucket().ReadAny(). + // 用于读取、修改对象信息 + Object().WriteOne(objectID). + // 用于更新Rep配置 + ObjectRep().WriteOne(objectID). + // 用于查询可用的上传节点 + Node().ReadAny(). + // 用于创建Cache记录 + Cache().CreateAny(). + // 用于修改Move此Object的记录的状态 + StorageObject().WriteAny(). + MutexLock(svc.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + preResp, err := svc.coordinator.PreUpdateRepObject(coormsg.NewPreUpdateRepObjectBody( objectID, fileSize, @@ -338,6 +414,16 @@ func (svc *ObjectService) UpdateRepObject(userID int, objectID int, file io.Read log.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID) } + mutex, err := reqbuilder.NewBuilder(). + IPFS(). + // 防止上传的副本被清除 + CreateAnyRep(uploadNode.ID). + MutexLock(svc.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + fileHash, err = svc.uploadToNode(file, nodeIP) if err != nil { return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) @@ -371,6 +457,24 @@ func (svc *ObjectService) chooseUpdateRepObjectNode(nodes []coormsg.PreUpdateRep } func (svc *ObjectService) DeleteObject(userID int, objectID int) error { + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于判断用户是否有对象的权限 + UserBucket().ReadAny(). + // 用于读取、修改对象信息 + Object().WriteOne(objectID). + // 用于删除Rep配置 + ObjectRep().WriteOne(objectID). + // 用于删除Block配置 + ObjectBlock().WriteAny(). + // 用于修改Move此Object的记录的状态 + StorageObject().WriteAny(). + MutexLock(svc.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + resp, err := svc.coordinator.DeleteObject(coormsg.NewDeleteObjectBody(userID, objectID)) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err)