Browse Source

上传、更新、调度文件接口改异步

gitlink
Sydonian 2 years ago
parent
commit
e81074e2bc
14 changed files with 684 additions and 357 deletions
  1. +4
    -3
      go.mod
  2. +5
    -0
      go.sum
  3. +4
    -1
      internal/cmdline/commandline.go
  4. +49
    -5
      internal/cmdline/object.go
  5. +24
    -1
      internal/cmdline/storage.go
  6. +31
    -274
      internal/services/object.go
  7. +4
    -1
      internal/services/service.go
  8. +11
    -67
      internal/services/storage.go
  9. +118
    -0
      internal/task/move_object_to_storage.go
  10. +34
    -0
      internal/task/task.go
  11. +153
    -0
      internal/task/update_rep_object.go
  12. +3
    -3
      internal/task/update_rep_object_test.go
  13. +239
    -0
      internal/task/upload_rep_object.go
  14. +5
    -2
      main.go

+ 4
- 3
go.mod View File

@@ -17,7 +17,7 @@ require (
require (
github.com/antonfisher/nested-logrus-formatter v1.3.1 // indirect
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 // indirect
github.com/beevik/etree v1.1.0 // indirect
github.com/beevik/etree v1.2.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
@@ -33,6 +33,7 @@ require (
github.com/ipfs/go-cid v0.4.0 // indirect
github.com/ipfs/go-ipfs-api v0.6.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/juju/ratelimit v1.0.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
@@ -51,13 +52,13 @@ require (
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/smartystreets/assertions v1.13.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect


+ 5
- 0
go.sum View File

@@ -4,6 +4,7 @@ github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 h1:wcvD6enR//
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7/go.mod h1:rAxMF6pVaFK/s6T4gGczvloccNbtwzuYaP2Y7W6flE8=
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/beevik/etree v1.2.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zVZWFc=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764=
@@ -45,6 +46,8 @@ github.com/jedib0t/go-pretty/v6 v6.4.6 h1:v6aG9h6Uby3IusSSEjHaZNXpHFhzqMmjXcPq1R
github.com/jedib0t/go-pretty/v6 v6.4.6/go.mod h1:Ndk3ase2CkQbXLLNf5QDHoYb6J9WtVfmHZu9n8rk2xs=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI=
github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
@@ -90,6 +93,7 @@ github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU=
github.com/smartystreets/assertions v1.13.1/go.mod h1:cXr/IwVfSo/RbCSPhoAPv73p3hlSdrBH/b3SdnW/LMY=
github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w=
@@ -110,6 +114,7 @@ golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb h1:PaBZQdo+iSDyHT053FjUCgZQ/9uqVwPOcl7KSWhKn6w=
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230519143937-03e91628a987/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=


+ 4
- 1
internal/cmdline/commandline.go View File

@@ -7,6 +7,7 @@ import (
"gitlink.org.cn/cloudream/client/internal/services"
"gitlink.org.cn/cloudream/common/pkg/cmdtrie"
distlocksvc "gitlink.org.cn/cloudream/common/pkg/distlock/service"
"gitlink.org.cn/cloudream/common/utils/ipfs"
)

type CommandContext struct {
@@ -18,12 +19,14 @@ var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie
type Commandline struct {
Svc *services.Service
DistLock *distlocksvc.Service
IPFS *ipfs.IPFS
}

func NewCommandline(svc *services.Service, distLock *distlocksvc.Service) (*Commandline, error) {
func NewCommandline(svc *services.Service, distLock *distlocksvc.Service, ipfs *ipfs.IPFS) (*Commandline, error) {
return &Commandline{
Svc: svc,
DistLock: distLock,
IPFS: ipfs,
}, nil
}



+ 49
- 5
internal/cmdline/object.go View File

@@ -5,8 +5,11 @@ import (
"io"
"os"
"path/filepath"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"github.com/juju/ratelimit"
myio "gitlink.org.cn/cloudream/common/utils/io"
)

func ObjectListBucketObjects(ctx CommandContext, bucketID int) error {
@@ -58,7 +61,8 @@ func ObjectDownloadObject(ctx CommandContext, localFilePath string, objectID int
}
defer reader.Close()

_, err = io.Copy(outputFile, reader)
bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024)
_, err = io.Copy(outputFile, ratelimit.Reader(reader, bkt))
if err != nil {
// TODO 写入到文件失败,是否要考虑删除这个不完整的文件?
return fmt.Errorf("copy object data to local file failed, err: %w", err)
@@ -80,12 +84,33 @@ func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID in
}
fileSize := fileInfo.Size()

err = ctx.Cmdline.Svc.ObjectSvc().UploadRepObject(0, bucketID, objectName, file, fileSize, repCount)
// TODO 测试用
bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024)
taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObject(0, bucketID, objectName,
myio.WithCloser(ratelimit.Reader(file, bkt),
func(reader io.Reader) error {
return file.Close()
}),
fileSize, repCount)
if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

return nil
for {
complete, fileHash, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObject(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading rep object: %w", err)
}

fmt.Print(fileHash)
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func ObjectEcWrite(ctx CommandContext, localFilePath string, bucketID int, objectName string, ecName string) error {
@@ -108,12 +133,31 @@ func ObjectUpdateRepObject(ctx CommandContext, objectID int, filePath string) er
}
fileSize := fileInfo.Size()

err = ctx.Cmdline.Svc.ObjectSvc().UpdateRepObject(userID, objectID, file, fileSize)
// TODO 测试用
bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024)
taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUpdatingRepObject(userID, objectID,
myio.WithCloser(ratelimit.Reader(file, bkt),
func(reader io.Reader) error {
return file.Close()
}), fileSize)
if err != nil {
return fmt.Errorf("update object %d failed, err: %w", objectID, err)
}

return nil
for {
complete, err := ctx.Cmdline.Svc.ObjectSvc().WaitUpdatingRepObject(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("updating rep object: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait updating: %w", err)
}
}
}

func ObjectDeleteObject(ctx CommandContext, objectID int) error {


+ 24
- 1
internal/cmdline/storage.go View File

@@ -1,7 +1,30 @@
package cmdline

import (
"fmt"
"time"
)

func StorageMoveObjectToStorage(ctx CommandContext, objectID int, storageID int) error {
return ctx.Cmdline.Svc.StorageSvc().MoveObjectToStorage(0, objectID, storageID)
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartMovingObjectToStorage(0, objectID, storageID)
if err != nil {
return fmt.Errorf("start moving object to storage: %w", err)
}

for {
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitMovingObjectToStorage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait moving: %w", err)
}
}
}

func init() {


+ 31
- 274
internal/services/object.go View File

@@ -4,20 +4,19 @@ import (
"fmt"
"io"
"math/rand"
"time"

"gitlink.org.cn/cloudream/client/internal/config"
"gitlink.org.cn/cloudream/client/internal/task"
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
log "gitlink.org.cn/cloudream/common/pkg/logger"
mygrpc "gitlink.org.cn/cloudream/common/utils/grpc"
myio "gitlink.org.cn/cloudream/common/utils/io"
serder "gitlink.org.cn/cloudream/common/utils/serder"
mysort "gitlink.org.cn/cloudream/common/utils/sort"
"gitlink.org.cn/cloudream/db/model"
agentcaller "gitlink.org.cn/cloudream/proto"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -44,6 +43,8 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose
Metadata().UserBucket().ReadAny().
// 用于查询可用的下载节点
Node().ReadAny().
// 用于读取文件信息
Object().ReadOne(objectID).
// 用于查询Rep配置
ObjectRep().ReadOne(objectID).
// 用于查询Block配置
@@ -54,13 +55,14 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewPreDownloadObjectBody(objectID, userID, config.Cfg().ExternalIP))
if err != nil {
mutex.Unlock()
return nil, fmt.Errorf("request to coordinator failed, err: %w", err)
}
if preDownloadResp.IsFailed() {
mutex.Unlock()
return nil, fmt.Errorf("coordinator operation failed, code: %s, message: %s", preDownloadResp.ErrorCode, preDownloadResp.ErrorMessage)
}

@@ -69,10 +71,12 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose
var repInfo ramsg.RespObjectRepInfo
err := serder.MapToObject(preDownloadResp.Body.RedundancyData.(map[string]any), &repInfo)
if err != nil {
mutex.Unlock()
return nil, fmt.Errorf("redundancy data to rep info failed, err: %w", err)
}

if len(repInfo.Nodes) == 0 {
mutex.Unlock()
return nil, fmt.Errorf("no node has this file")
}

@@ -89,16 +93,21 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose

reader, err := svc.downloadRepObject(entry.ID, nodeIP, repInfo.FileHash)
if err != nil {
mutex.Unlock()
return nil, fmt.Errorf("rep read failed, err: %w", err)
}

return reader, nil
return myio.AfterReadClosed(reader, func(closer io.ReadCloser) {
// TODO 可以考虑在打开了读取流之后就解锁,而不是要等外部读取完毕
mutex.Unlock()
}), nil

//case consts.REDUNDANCY_EC:
// TODO EC部分的代码要考虑重构
// ecRead(readResp.FileSize, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName)
}

mutex.Unlock()
return nil, fmt.Errorf("unsupported redundancy type: %s", preDownloadResp.Body.Redundancy)
}

@@ -171,171 +180,18 @@ func (svc *ObjectService) downloadFromLocalIPFS(fileHash string) (io.ReadCloser,
return reader, nil
}

func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有桶的权限
UserBucket().ReadOne(userID, bucketID).
// 用于防止创建了多个同名对象
Object().CreateOne(bucketID, objectName).
// 用于查询可用的上传节点
Node().ReadAny().
// 用于设置Rep配置
ObjectRep().CreateAny().
// 用于创建Cache记录
Cache().CreateAny().
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

//发送写请求,请求Coor分配写入节点Ip
repWriteResp, err := svc.coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(bucketID, objectName, fileSize, userID, config.Cfg().ExternalIP))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if repWriteResp.IsFailed() {
return fmt.Errorf("coordinator RepWrite failed, code: %s, message: %s", repWriteResp.ErrorCode, repWriteResp.ErrorMessage)
}

if len(repWriteResp.Body.Nodes) == 0 {
return fmt.Errorf("no node to upload file")
}

uploadNode := svc.chooseUploadNode(repWriteResp.Body.Nodes)

var fileHash string
uploadedNodeIDs := []int{}
uploadToNode := true
// 本地有IPFS,则直接从本地IPFS上传
if svc.ipfs != nil {
log.Infof("try to use local IPFS to upload file")

fileHash, err = svc.uploadToLocalIPFS(file, uploadNode.ID)
if err != nil {
log.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error())
} else {
uploadToNode = false
}
}

// 否则发送到agent上传
if uploadToNode {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := uploadNode.ExternalIP
if uploadNode.IsSameLocation {
nodeIP = uploadNode.LocalIP

log.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
}

mutex, err := reqbuilder.NewBuilder().
// 防止上传的副本被清除
IPFS().CreateAnyRep(uploadNode.ID).
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

fileHash, err = svc.uploadToNode(file, nodeIP)
if err != nil {
return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
}
uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
}

// 记录写入的文件的Hash
createObjectResp, err := svc.coordinator.CreateRepObject(coormsg.NewCreateRepObjectBody(bucketID, objectName, fileSize, repCount, userID, uploadedNodeIDs, fileHash))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if createObjectResp.IsFailed() {
return fmt.Errorf("coordinator CreateRepObject failed, code: %s, message: %s", createObjectResp.ErrorCode, createObjectResp.ErrorMessage)
}

return nil
}

func (svc *ObjectService) uploadToNode(file io.ReadCloser, nodeIP string) (string, error) {
// 建立grpc连接,发送请求
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer grpcCon.Close()

client := agentcaller.NewFileTransportClient(grpcCon)
upload, err := mygrpc.SendFileAsStream(client)
if err != nil {
return "", fmt.Errorf("request to send file failed, err: %w", err)
}

// 发送文件数据
_, err = io.Copy(upload, file)
if err != nil {
// 发生错误则关闭连接
upload.Abort(io.ErrClosedPipe)
return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err)
}

// 发送EOF消息,并获得FileHash
fileHash, err := upload.Finish()
if err != nil {
upload.Abort(io.ErrClosedPipe)
return "", fmt.Errorf("send EOF failed, err: %w", err)
}

return fileHash, nil
}

func (svc *ObjectService) uploadToLocalIPFS(file io.ReadCloser, nodeID int) (string, error) {
// 从本地IPFS上传文件
writer, err := svc.ipfs.CreateFile()
if err != nil {
return "", fmt.Errorf("create IPFS file failed, err: %w", err)
}

_, err = io.Copy(writer, file)
if err != nil {
return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err)
}

fileHash, err := writer.Finish()
if err != nil {
return "", fmt.Errorf("finish writing IPFS failed, err: %w", err)
}

// 然后让最近节点pin本地上传的文件
agentClient, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ)
if err != nil {
return "", fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err)
}
defer agentClient.Close()

pinObjResp, err := agentClient.PinObject(agtmsg.NewPinObjectBody(fileHash))
if err != nil {
return "", fmt.Errorf("request to agent %d failed, err: %w", nodeID, err)
}
if pinObjResp.IsFailed() {
return "", fmt.Errorf("agent %d PinObject failed, code: %s, message: %s", nodeID, pinObjResp.ErrorCode, pinObjResp.ErrorMessage)
}

return fileHash, nil
func (svc *ObjectService) StartUploadingRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewUploadRepObject(userID, bucketID, objectName, file, fileSize, repCount))
return tsk.ID(), nil
}

// chooseUploadNode 选择一个上传文件的节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (svc *ObjectService) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
if len(sameLocationNodes) > 0 {
return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
func (svc *ObjectService) WaitUploadingRepObject(taskID string, waitTimeout time.Duration) (bool, string, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Body().(*task.UploadRepObject).ResultFileHash, tsk.Error()
}

return nodes[rand.Intn(len(nodes))]
return false, "", nil
}

func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error {
@@ -343,117 +199,18 @@ func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSiz
panic("not implement yet")
}

func (svc *ObjectService) UpdateRepObject(userID int, objectID int, file io.ReadCloser, fileSize int64) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有对象的权限
UserBucket().ReadAny().
// 用于读取、修改对象信息
Object().WriteOne(objectID).
// 用于更新Rep配置
ObjectRep().WriteOne(objectID).
// 用于查询可用的上传节点
Node().ReadAny().
// 用于创建Cache记录
Cache().CreateAny().
// 用于修改Move此Object的记录的状态
StorageObject().WriteAny().
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

preResp, err := svc.coordinator.PreUpdateRepObject(coormsg.NewPreUpdateRepObjectBody(
objectID,
fileSize,
userID,
config.Cfg().ExternalIP,
))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if preResp.IsFailed() {
return fmt.Errorf("coordinator PreUpdateRepObject failed, code: %s, message: %s", preResp.ErrorCode, preResp.ErrorMessage)
}

if len(preResp.Body.Nodes) == 0 {
return fmt.Errorf("no node to upload file")
}

// 上传文件的方式优先级:
// 1. 本地IPFS
// 2. 包含了旧文件,且与客户端在同地域的节点
// 3. 不在同地域,但包含了旧文件的节点
// 4. 同地域节点

uploadNode := svc.chooseUpdateRepObjectNode(preResp.Body.Nodes)

var fileHash string
uploadedNodeIDs := []int{}
uploadToNode := true
// 本地有IPFS,则直接从本地IPFS上传
if svc.ipfs != nil {
log.Infof("try to use local IPFS to upload file")

fileHash, err = svc.uploadToLocalIPFS(file, uploadNode.ID)
if err != nil {
log.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error())
} else {
uploadToNode = false
}
}

// 否则发送到agent上传
if uploadToNode {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := uploadNode.ExternalIP
if uploadNode.IsSameLocation {
nodeIP = uploadNode.LocalIP

log.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
}

mutex, err := reqbuilder.NewBuilder().
IPFS().
// 防止上传的副本被清除
CreateAnyRep(uploadNode.ID).
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

fileHash, err = svc.uploadToNode(file, nodeIP)
if err != nil {
return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
}
uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
}

// 更新Object
updateResp, err := svc.coordinator.UpdateRepObject(coormsg.NewUpdateRepObjectBody(objectID, fileHash, fileSize, uploadedNodeIDs, userID))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if updateResp.IsFailed() {
return fmt.Errorf("coordinator UpdateRepObject failed, code: %s, message: %s", updateResp.ErrorCode, updateResp.ErrorMessage)
}

return nil
func (svc *ObjectService) StartUpdatingRepObject(userID int, objectID int, file io.ReadCloser, fileSize int64) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewUpdateRepObject(userID, objectID, file, fileSize))
return tsk.ID(), nil
}

func (svc *ObjectService) chooseUpdateRepObjectNode(nodes []coormsg.PreUpdateRepObjectRespNode) coormsg.PreUpdateRepObjectRespNode {
mysort.Sort(nodes, func(left, right coormsg.PreUpdateRepObjectRespNode) int {
v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject)
if v != 0 {
return v
}

return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation)
})
func (svc *ObjectService) WaitUpdatingRepObject(taskID string, waitTimeout time.Duration) (bool, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error()
}

return nodes[0]
return false, nil
}

func (svc *ObjectService) DeleteObject(userID int, objectID int) error {


+ 4
- 1
internal/services/service.go View File

@@ -1,6 +1,7 @@
package services

import (
"gitlink.org.cn/cloudream/client/internal/task"
distlock "gitlink.org.cn/cloudream/common/pkg/distlock/service"
"gitlink.org.cn/cloudream/common/utils/ipfs"
racli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator"
@@ -12,13 +13,15 @@ type Service struct {
ipfs *ipfs.IPFS
scanner *sccli.Client
distlock *distlock.Service
taskMgr *task.Manager
}

func NewService(coorClient *racli.Client, ipfsClient *ipfs.IPFS, scanner *sccli.Client, distlock *distlock.Service) (*Service, error) {
func NewService(coorClient *racli.Client, ipfsClient *ipfs.IPFS, scanner *sccli.Client, distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) {
return &Service{
coordinator: coorClient,
ipfs: ipfsClient,
scanner: scanner,
distlock: distlock,
taskMgr: taskMgr,
}, nil
}

+ 11
- 67
internal/services/storage.go View File

@@ -1,13 +1,9 @@
package services

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/client/internal/config"
"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
"gitlink.org.cn/cloudream/client/internal/task"
)

type StorageService struct {
@@ -18,70 +14,18 @@ func (svc *Service) StorageSvc() *StorageService {
return &StorageService{Service: svc}
}

func (svc *StorageService) MoveObjectToStorage(userID int, objectID int, storageID int) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有Storage权限
UserStorage().ReadOne(objectID, storageID).
// 用于判断用户是否有对象权限
UserBucket().ReadAny().
// 用于读取对象信息
Object().ReadOne(objectID).
// 用于查询Rep配置
ObjectRep().ReadOne(objectID).
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于创建Move记录
StorageObject().CreateOne(storageID, userID, objectID).
Storage().
// 用于创建对象文件
CreateOneObject(storageID, userID, objectID).
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

// 先向协调端请求文件相关的元数据
preMoveResp, err := svc.coordinator.PreMoveObjectToStorage(coormsg.NewPreMoveObjectToStorageBody(objectID, storageID, userID))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if preMoveResp.IsFailed() {
return fmt.Errorf("coordinator PreMoveObjectToStorage failed, code: %s, message: %s", preMoveResp.ErrorCode, preMoveResp.ErrorMessage)
}

// 然后向代理端发送移动文件的请求
agentClient, err := agtcli.NewClient(preMoveResp.Body.NodeID, &config.Cfg().RabbitMQ)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", preMoveResp.Body.NodeID, err)
}
defer agentClient.Close()

agentMoveResp, err := agentClient.MoveObjectToStorage(
agtmsg.NewMoveObjectToStorageBody(preMoveResp.Body.Directory,
objectID,
userID,
preMoveResp.Body.FileSize,
preMoveResp.Body.Redundancy,
preMoveResp.Body.RedundancyData,
))
if err != nil {
return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
}
if agentMoveResp.IsFailed() {
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage)
}
func (svc *StorageService) StartMovingObjectToStorage(userID int, objectID int, storageID int) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewMoveObjectToStorage(userID, objectID, storageID))
return tsk.ID(), nil
}

moveResp, err := svc.coordinator.MoveObjectToStorage(coormsg.NewMoveObjectToStorageBody(objectID, storageID, userID))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if preMoveResp.IsFailed() {
return fmt.Errorf("coordinator MoveObjectToStorage failed, code: %s, message: %s", moveResp.ErrorCode, moveResp.ErrorMessage)
func (svc *StorageService) WaitMovingObjectToStorage(taskID string, waitTimeout time.Duration) (bool, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error()
}

return nil
return false, nil
}

func (svc *StorageService) DeleteStorageObject(userID int, objectID int, storageID int) error {


+ 118
- 0
internal/task/move_object_to_storage.go View File

@@ -0,0 +1,118 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/client/internal/config"
"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"

agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
)

type MoveObjectToStorage struct {
userID int
objectID int
storageID int
}

func NewMoveObjectToStorage(userID int, objectID int, storageID int) *MoveObjectToStorage {
return &MoveObjectToStorage{
userID: userID,
objectID: objectID,
storageID: storageID,
}
}

func (t *MoveObjectToStorage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *MoveObjectToStorage) do(ctx TaskContext) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有Storage权限
UserStorage().ReadOne(t.objectID, t.storageID).
// 用于判断用户是否有对象权限
UserBucket().ReadAny().
// 用于读取对象信息
Object().ReadOne(t.objectID).
// 用于查询Rep配置
ObjectRep().ReadOne(t.objectID).
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于创建Move记录
StorageObject().CreateOne(t.storageID, t.userID, t.objectID).
Storage().
// 用于创建对象文件
CreateOneObject(t.storageID, t.userID, t.objectID).
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

// 先向协调端请求文件相关的元数据
preMoveResp, err := ctx.Coordinator.PreMoveObjectToStorage(coormsg.NewPreMoveObjectToStorageBody(t.objectID, t.storageID, t.userID))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if preMoveResp.IsFailed() {
return fmt.Errorf("coordinator PreMoveObjectToStorage failed, code: %s, message: %s", preMoveResp.ErrorCode, preMoveResp.ErrorMessage)
}

// 然后向代理端发送移动文件的请求
agentClient, err := agtcli.NewClient(preMoveResp.Body.NodeID, &config.Cfg().RabbitMQ)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", preMoveResp.Body.NodeID, err)
}
defer agentClient.Close()

agentMoveResp, err := agentClient.StartMovingObjectToStorage(
agtmsg.NewStartMovingObjectToStorageBody(preMoveResp.Body.Directory,
t.objectID,
t.userID,
preMoveResp.Body.FileSize,
preMoveResp.Body.Redundancy,
preMoveResp.Body.RedundancyData,
))
if err != nil {
return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
}
if agentMoveResp.IsFailed() {
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage)
}

for {
waitResp, err := agentClient.WaitMovingObject(agtmsg.NewWaitMovingObjectBody(agentMoveResp.Body.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
}
if preMoveResp.IsFailed() {
return fmt.Errorf("coordinator WaitMovingObject failed, code: %s, message: %s", waitResp.ErrorCode, waitResp.ErrorMessage)
}

if waitResp.Body.IsComplete {
if waitResp.Body.Error != "" {
return fmt.Errorf("agent moving object: %s", waitResp.Body.Error)
}

break
}
}

moveResp, err := ctx.Coordinator.MoveObjectToStorage(coormsg.NewMoveObjectToStorageBody(t.objectID, t.storageID, t.userID))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if preMoveResp.IsFailed() {
return fmt.Errorf("coordinator MoveObjectToStorage failed, code: %s, message: %s", moveResp.ErrorCode, moveResp.ErrorMessage)
}

return nil
}

+ 34
- 0
internal/task/task.go View File

@@ -0,0 +1,34 @@
package task

import (
distsvc "gitlink.org.cn/cloudream/common/pkg/distlock/service"
"gitlink.org.cn/cloudream/common/pkg/task"
"gitlink.org.cn/cloudream/common/utils/ipfs"
coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator"
)

type TaskContext struct {
IPFS *ipfs.IPFS
DistLock *distsvc.Service
Coordinator *coorcli.Client
}

// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
// 因此适合进行执行结果的设置
type CompleteFn = task.CompleteFn

type Manager = task.Manager[TaskContext]

type TaskBody = task.TaskBody[TaskContext]

type Task = task.Task[TaskContext]

type CompleteOption = task.CompleteOption

func NewManager(ipfs *ipfs.IPFS, distlock *distsvc.Service, coorCli *coorcli.Client) Manager {
return task.NewManager(TaskContext{
IPFS: ipfs,
DistLock: distlock,
Coordinator: coorCli,
})
}

+ 153
- 0
internal/task/update_rep_object.go View File

@@ -0,0 +1,153 @@
package task

import (
"fmt"
"io"
"time"

"gitlink.org.cn/cloudream/client/internal/config"
"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
"gitlink.org.cn/cloudream/common/pkg/logger"
mysort "gitlink.org.cn/cloudream/common/utils/sort"

coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
)

type UpdateRepObject struct {
userID int
objectID int
file io.ReadCloser
fileSize int64
}

func NewUpdateRepObject(userID int, objectID int, file io.ReadCloser, fileSize int64) *UpdateRepObject {
return &UpdateRepObject{
userID: userID,
objectID: objectID,
file: file,
fileSize: fileSize,
}
}

func (t *UpdateRepObject) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *UpdateRepObject) do(ctx TaskContext) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有对象的权限
UserBucket().ReadAny().
// 用于读取、修改对象信息
Object().WriteOne(t.objectID).
// 用于更新Rep配置
ObjectRep().WriteOne(t.objectID).
// 用于查询可用的上传节点
Node().ReadAny().
// 用于创建Cache记录
Cache().CreateAny().
// 用于修改Move此Object的记录的状态
StorageObject().WriteAny().
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

preResp, err := ctx.Coordinator.PreUpdateRepObject(coormsg.NewPreUpdateRepObjectBody(
t.objectID,
t.fileSize,
t.userID,
config.Cfg().ExternalIP,
))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if preResp.IsFailed() {
return fmt.Errorf("coordinator PreUpdateRepObject failed, code: %s, message: %s", preResp.ErrorCode, preResp.ErrorMessage)
}

if len(preResp.Body.Nodes) == 0 {
return fmt.Errorf("no node to upload file")
}

// 上传文件的方式优先级:
// 1. 本地IPFS
// 2. 包含了旧文件,且与客户端在同地域的节点
// 3. 不在同地域,但包含了旧文件的节点
// 4. 同地域节点

uploadNode := t.chooseUpdateRepObjectNode(preResp.Body.Nodes)

var fileHash string
uploadedNodeIDs := []int{}
willUploadToNode := true
// 本地有IPFS,则直接从本地IPFS上传
if ctx.IPFS != nil {
logger.Infof("try to use local IPFS to upload file")

fileHash, err = uploadToLocalIPFS(ctx.IPFS, t.file, uploadNode.ID)
if err != nil {
logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error())
} else {
willUploadToNode = false
}
}

// 否则发送到agent上传
if willUploadToNode {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := uploadNode.ExternalIP
if uploadNode.IsSameLocation {
nodeIP = uploadNode.LocalIP

logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
}

mutex, err := reqbuilder.NewBuilder().
IPFS().
// 防止上传的副本被清除
CreateAnyRep(uploadNode.ID).
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

fileHash, err = uploadToNode(t.file, nodeIP)
if err != nil {
return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
}
uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
}

// 更新Object
updateResp, err := ctx.Coordinator.UpdateRepObject(coormsg.NewUpdateRepObjectBody(t.objectID, fileHash, t.fileSize, uploadedNodeIDs, t.userID))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if updateResp.IsFailed() {
return fmt.Errorf("coordinator UpdateRepObject failed, code: %s, message: %s", updateResp.ErrorCode, updateResp.ErrorMessage)
}

return nil
}

// chooseUploadNode 选择一个上传文件的节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (t *UpdateRepObject) chooseUpdateRepObjectNode(nodes []coormsg.PreUpdateRepObjectRespNode) coormsg.PreUpdateRepObjectRespNode {
mysort.Sort(nodes, func(left, right coormsg.PreUpdateRepObjectRespNode) int {
v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject)
if v != 0 {
return v
}

return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation)
})

return nodes[0]
}

internal/services/object_test.go → internal/task/update_rep_object_test.go View File

@@ -1,4 +1,4 @@
package services
package task

import (
"testing"
@@ -44,10 +44,10 @@ func Test_chooseUpdateRepObjectNode(t *testing.T) {
},
}

var svc ObjectService
var tsk UpdateRepObject
for _, test := range testcases {
Convey(test.title, t, func() {
chooseNode := svc.chooseUpdateRepObjectNode(test.nodes)
chooseNode := tsk.chooseUpdateRepObjectNode(test.nodes)
So(chooseNode.ID, ShouldEqual, test.wantNodeID)
})
}

+ 239
- 0
internal/task/upload_rep_object.go View File

@@ -0,0 +1,239 @@
package task

import (
"fmt"
"io"
"math/rand"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/client/internal/config"
"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
"gitlink.org.cn/cloudream/common/pkg/logger"
mygrpc "gitlink.org.cn/cloudream/common/utils/grpc"
"gitlink.org.cn/cloudream/common/utils/ipfs"

agentcaller "gitlink.org.cn/cloudream/proto"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type UploadRepObject struct {
userID int
bucketID int
objectName string
file io.ReadCloser
fileSize int64
repCount int

ResultFileHash string
}

func NewUploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) *UploadRepObject {
return &UploadRepObject{
userID: userID,
bucketID: bucketID,
objectName: objectName,
file: file,
fileSize: fileSize,
repCount: repCount,
}
}

func (t *UploadRepObject) Execute(ctx TaskContext, complete CompleteFn) {
fileHash, err := t.do(ctx)
t.ResultFileHash = fileHash
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *UploadRepObject) do(ctx TaskContext) (string, error) {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有桶的权限
UserBucket().ReadOne(t.userID, t.bucketID).
// 用于防止创建了多个同名对象
Object().CreateOne(t.bucketID, t.objectName).
// 用于查询可用的上传节点
Node().ReadAny().
// 用于设置Rep配置
ObjectRep().CreateAny().
// 用于创建Cache记录
Cache().CreateAny().
MutexLock(ctx.DistLock)
if err != nil {
return "", fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

//发送写请求,请求Coor分配写入节点Ip
repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, t.objectName, t.fileSize, t.userID, config.Cfg().ExternalIP))
if err != nil {
return "", fmt.Errorf("request to coordinator failed, err: %w", err)
}
if repWriteResp.IsFailed() {
return "", fmt.Errorf("coordinator RepWrite failed, code: %s, message: %s", repWriteResp.ErrorCode, repWriteResp.ErrorMessage)
}

if len(repWriteResp.Body.Nodes) == 0 {
return "", fmt.Errorf("no node to upload file")
}

uploadNode := t.chooseUploadNode(repWriteResp.Body.Nodes)

var fileHash string
uploadedNodeIDs := []int{}
willUploadToNode := true
// 本地有IPFS,则直接从本地IPFS上传
if ctx.IPFS != nil {
logger.Infof("try to use local IPFS to upload file")

fileHash, err = uploadToLocalIPFS(ctx.IPFS, t.file, uploadNode.ID)
if err != nil {
logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error())
} else {
willUploadToNode = false
}
}

// 否则发送到agent上传
if willUploadToNode {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := uploadNode.ExternalIP
if uploadNode.IsSameLocation {
nodeIP = uploadNode.LocalIP

logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
}

mutex, err := reqbuilder.NewBuilder().
// 防止上传的副本被清除
IPFS().CreateAnyRep(uploadNode.ID).
MutexLock(ctx.DistLock)
if err != nil {
return "", fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

fileHash, err = uploadToNode(t.file, nodeIP)
if err != nil {
return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
}
uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
}

// 记录写入的文件的Hash
createObjectResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObjectBody(t.bucketID, t.objectName, t.fileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash))
if err != nil {
return "", fmt.Errorf("request to coordinator failed, err: %w", err)
}
if createObjectResp.IsFailed() {
return "", fmt.Errorf("coordinator CreateRepObject failed, code: %s, message: %s", createObjectResp.ErrorCode, createObjectResp.ErrorMessage)
}

return fileHash, nil
}

// chooseUploadNode 选择一个上传文件的节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (t *UploadRepObject) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
if len(sameLocationNodes) > 0 {
return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
}

return nodes[rand.Intn(len(nodes))]
}

func uploadToNode(file io.ReadCloser, nodeIP string) (string, error) {
// 建立grpc连接,发送请求
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer grpcCon.Close()

client := agentcaller.NewFileTransportClient(grpcCon)
upload, err := mygrpc.SendFileAsStream(client)
if err != nil {
return "", fmt.Errorf("request to send file failed, err: %w", err)
}

// 发送文件数据
_, err = io.Copy(upload, file)
if err != nil {
// 发生错误则关闭连接
upload.Abort(io.ErrClosedPipe)
return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err)
}

// 发送EOF消息,并获得FileHash
fileHash, err := upload.Finish()
if err != nil {
upload.Abort(io.ErrClosedPipe)
return "", fmt.Errorf("send EOF failed, err: %w", err)
}

return fileHash, nil
}

func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser, nodeID int) (string, error) {
// 从本地IPFS上传文件
writer, err := ipfs.CreateFile()
if err != nil {
return "", fmt.Errorf("create IPFS file failed, err: %w", err)
}

_, err = io.Copy(writer, file)
if err != nil {
return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err)
}

fileHash, err := writer.Finish()
if err != nil {
return "", fmt.Errorf("finish writing IPFS failed, err: %w", err)
}

// 然后让最近节点pin本地上传的文件
agentClient, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ)
if err != nil {
return "", fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err)
}
defer agentClient.Close()

pinObjResp, err := agentClient.StartPinningObject(agtmsg.NewStartPinningObjectBody(fileHash))
if err != nil {
return "", fmt.Errorf("request to agent %d failed, err: %w", nodeID, err)
}
if pinObjResp.IsFailed() {
return "", fmt.Errorf("agent %d PinObject failed, code: %s, message: %s", nodeID, pinObjResp.ErrorCode, pinObjResp.ErrorMessage)
}

for {
waitResp, err := agentClient.WaitPinningObject(agtmsg.NewWaitPinningObjectBody(pinObjResp.Body.TaskID, int64(time.Second)*5))
if err != nil {
return "", fmt.Errorf("request to agent %d failed, err: %w", nodeID, err)
}
if waitResp.IsFailed() {
return "", fmt.Errorf("agent %d WaitPinningObject failed, code: %s, message: %s", nodeID, waitResp.ErrorCode, waitResp.ErrorMessage)
}

if waitResp.Body.IsComplete {
if waitResp.Body.Error != "" {
return "", fmt.Errorf("agent pinning object: %s", waitResp.Body.Error)
}

break
}
}

return fileHash, nil
}

+ 5
- 2
main.go View File

@@ -9,6 +9,7 @@ import (
"gitlink.org.cn/cloudream/client/internal/cmdline"
"gitlink.org.cn/cloudream/client/internal/config"
"gitlink.org.cn/cloudream/client/internal/services"
"gitlink.org.cn/cloudream/client/internal/task"
distlocksvc "gitlink.org.cn/cloudream/common/pkg/distlock/service"
log "gitlink.org.cn/cloudream/common/pkg/logger"
"gitlink.org.cn/cloudream/common/utils/ipfs"
@@ -59,13 +60,15 @@ func main() {
}
go serveDistLock(distlockSvc)

svc, err := services.NewService(coorClient, ipfsCli, scanner, distlockSvc)
taskMgr := task.NewManager(ipfsCli, distlockSvc, coorClient)

svc, err := services.NewService(coorClient, ipfsCli, scanner, distlockSvc, &taskMgr)
if err != nil {
log.Warnf("new services failed, err: %s", err.Error())
os.Exit(1)
}

cmds, err := cmdline.NewCommandline(svc, distlockSvc)
cmds, err := cmdline.NewCommandline(svc, distlockSvc, ipfsCli)
if err != nil {
log.Warnf("new command line failed, err: %s", err.Error())
os.Exit(1)


Loading…
Cancel
Save