Browse Source

Merge remote-tracking branch 'origin/mv_cli'

gitlink
Sydonian 2 years ago
parent
commit
0257dca072
34 changed files with 2542 additions and 0 deletions
  1. +2
    -0
      client/README.md
  2. +101
    -0
      client/go.mod
  3. +251
    -0
      client/go.sum
  4. +60
    -0
      client/internal/cmdline/bucket.go
  5. +32
    -0
      client/internal/cmdline/cache.go
  6. +40
    -0
      client/internal/cmdline/commandline.go
  7. +72
    -0
      client/internal/cmdline/distlock.go
  8. +313
    -0
      client/internal/cmdline/package.go
  9. +41
    -0
      client/internal/cmdline/scanner.go
  10. +30
    -0
      client/internal/cmdline/serve.go
  11. +60
    -0
      client/internal/cmdline/storage.go
  12. +32
    -0
      client/internal/config/config.go
  13. +64
    -0
      client/internal/http/cacah.go
  14. +24
    -0
      client/internal/http/http.go
  15. +70
    -0
      client/internal/http/object.go
  16. +234
    -0
      client/internal/http/package.go
  17. +52
    -0
      client/internal/http/server.go
  18. +119
    -0
      client/internal/http/storage.go
  19. +9
    -0
      client/internal/services/agent.go
  20. +113
    -0
      client/internal/services/bucket.go
  21. +55
    -0
      client/internal/services/cacah.go
  22. +15
    -0
      client/internal/services/object.go
  23. +249
    -0
      client/internal/services/package.go
  24. +30
    -0
      client/internal/services/scanner.go
  25. +18
    -0
      client/internal/services/service.go
  26. +90
    -0
      client/internal/services/storage.go
  27. +34
    -0
      client/internal/task/create_ec_package.go
  28. +34
    -0
      client/internal/task/create_rep_package.go
  29. +108
    -0
      client/internal/task/storage_load_package.go
  30. +28
    -0
      client/internal/task/task.go
  31. +34
    -0
      client/internal/task/update_ec_package.go
  32. +34
    -0
      client/internal/task/update_rep_package.go
  33. +20
    -0
      client/magefiles/magefile.go
  34. +74
    -0
      client/main.go

+ 2
- 0
client/README.md View File

@@ -0,0 +1,2 @@
# storage-client


+ 101
- 0
client/go.mod View File

@@ -0,0 +1,101 @@
module gitlink.org.cn/cloudream/storage-client

go 1.20

require (
github.com/gin-gonic/gin v1.9.1
github.com/jedib0t/go-pretty/v6 v6.4.6
github.com/samber/lo v1.38.1
github.com/smartystreets/goconvey v1.8.0
gitlink.org.cn/cloudream/common v0.0.0
gitlink.org.cn/cloudream/storage-common v0.0.0
google.golang.org/grpc v1.54.0
)

require (
github.com/antonfisher/nested-logrus-formatter v1.3.1 // indirect
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 // indirect
github.com/beevik/etree v1.2.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ping/ping v1.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/ipfs/boxo v0.8.0 // indirect
github.com/ipfs/go-cid v0.4.0 // indirect
github.com/ipfs/go-ipfs-api v0.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.26.3 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.8.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
github.com/multiformats/go-multicodec v0.8.1 // indirect
github.com/multiformats/go-multihash v0.2.1 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/otiai10/copy v1.12.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/smartystreets/assertions v1.13.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/streadway/amqp v1.1.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c // indirect
github.com/zyedidia/generic v1.2.1 // indirect
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/v3 v3.5.9 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)

// 运行go mod tidy时需要将下面几行取消注释
replace gitlink.org.cn/cloudream/common => ../../common

replace gitlink.org.cn/cloudream/storage-common => ../storage-common

+ 251
- 0
client/go.sum View File

@@ -0,0 +1,251 @@
github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ=
github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA=
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 h1:wcvD6enR///dFvb9cRodx5SGbPH4G4jPjw+aVIWkAKE=
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7/go.mod h1:rAxMF6pVaFK/s6T4gGczvloccNbtwzuYaP2Y7W6flE8=
github.com/beevik/etree v1.2.0 h1:l7WETslUG/T+xOPs47dtd6jov2Ii/8/OjCldk5fYfQw=
github.com/beevik/etree v1.2.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zVZWFc=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg=
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
github.com/go-ping/ping v1.1.0 h1:3MCGhVX4fyEUuhsfwPrsEdQw6xspHkv5zHsiSoDFZYw=
github.com/go-ping/ping v1.1.0/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js=
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/ipfs/boxo v0.8.0 h1:UdjAJmHzQHo/j3g3b1bAcAXCj/GM6iTwvSlBDvPBNBs=
github.com/ipfs/boxo v0.8.0/go.mod h1:RIsi4CnTyQ7AUsNn5gXljJYZlQrHBMnJp94p73liFiA=
github.com/ipfs/go-cid v0.4.0 h1:a4pdZq0sx6ZSxbCizebnKiMCx/xI/aBBFlB73IgH4rA=
github.com/ipfs/go-cid v0.4.0/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
github.com/ipfs/go-ipfs-api v0.6.0 h1:JARgG0VTbjyVhO5ZfesnbXv9wTcMvoKRBLF1SzJqzmg=
github.com/ipfs/go-ipfs-api v0.6.0/go.mod h1:iDC2VMwN9LUpQV/GzEeZ2zNqd8NUdRmWcFM+K/6odf0=
github.com/jedib0t/go-pretty/v6 v6.4.6 h1:v6aG9h6Uby3IusSSEjHaZNXpHFhzqMmjXcPq1Rjl9Jw=
github.com/jedib0t/go-pretty/v6 v6.4.6/go.mod h1:Ndk3ase2CkQbXLLNf5QDHoYb6J9WtVfmHZu9n8rk2xs=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.26.3 h1:6g/psubqwdaBqNNoidbRKSTBEYgaOuKBhHl8Q5tO+PM=
github.com/libp2p/go-libp2p v0.26.3/go.mod h1:x75BN32YbwuY0Awm2Uix4d4KOz+/4piInkp4Wr3yOo8=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE=
github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI=
github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0=
github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4=
github.com/multiformats/go-multiaddr v0.8.0 h1:aqjksEcqK+iD/Foe1RRFsGZh8+XFiGo7FgUCZlpv3LU=
github.com/multiformats/go-multiaddr v0.8.0/go.mod h1:Fs50eBDWvZu+l3/9S6xAE7ZYj6yhxlvaVZjakWN7xRs=
github.com/multiformats/go-multibase v0.1.1 h1:3ASCDsuLX8+j4kx58qnJ4YFq/JWTJpCyDW27ztsVTOI=
github.com/multiformats/go-multibase v0.1.1/go.mod h1:ZEjHE+IsUrgp5mhlEAYjMtZwK1k4haNkcaPg9aoe1a8=
github.com/multiformats/go-multicodec v0.8.1 h1:ycepHwavHafh3grIbR1jIXnKCsFm0fqsfEOsJ8NtKE8=
github.com/multiformats/go-multicodec v0.8.1/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k=
github.com/multiformats/go-multihash v0.2.1 h1:aem8ZT0VA2nCHHk7bPJ1BjUbHNciqZC/d16Vve9l108=
github.com/multiformats/go-multihash v0.2.1/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc=
github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3de4741sbiSdfo=
github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY=
github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww=
github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=
github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU=
github.com/smartystreets/assertions v1.13.1/go.mod h1:cXr/IwVfSo/RbCSPhoAPv73p3hlSdrBH/b3SdnW/LMY=
github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w=
github.com/smartystreets/goconvey v1.8.0/go.mod h1:EdX8jtrTIj26jmjCOVNMVSIYAtgexqXKHOXW2Dx9JLg=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c h1:GGsyl0dZ2jJgVT+VvWBf/cNijrHRhkrTjkmp5wg7li0=
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c/go.mod h1:xxcJeBb7SIUl/Wzkz1eVKJE/CB34YNrqX2TQI6jY9zs=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc=
github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis=
go.etcd.io/etcd/api/v3 v3.5.9 h1:4wSsluwyTbGGmyjJktOf3wFQoTBIURXHnq9n/G/JQHs=
go.etcd.io/etcd/api/v3 v3.5.9/go.mod h1:uyAal843mC8uUVSLWz6eHa/d971iDGnCRpmKd2Z+X8k=
go.etcd.io/etcd/client/pkg/v3 v3.5.9 h1:oidDC4+YEuSIQbsR94rY9gur91UPL6DnxDCIYd2IGsE=
go.etcd.io/etcd/client/pkg/v3 v3.5.9/go.mod h1:y+CzeSmkMpWN2Jyu1npecjB9BBnABxGM4pN8cGuJeL4=
go.etcd.io/etcd/client/v3 v3.5.9 h1:r5xghnU7CwbUxD/fbUtRyJGaYNfDun8sp/gTr1hew6E=
go.etcd.io/etcd/client/v3 v3.5.9/go.mod h1:i/Eo5LrZ5IKqpbtpPDuaUnDOUv471oDg8cjQaUr2MbA=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 h1:3xJIFvzUFbu4ls0BTBYcgbCGhA63eAOEMxIHugyXJqA=
golang.org/x/exp v0.0.0-20230519143937-03e91628a987/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd h1:sLpv7bNL1AsX3fdnWh9WVh7ejIzXdOc1RRHGeAmeStU=
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0=
lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

+ 60
- 0
client/internal/cmdline/bucket.go View File

@@ -0,0 +1,60 @@
package cmdline

import (
"fmt"

"github.com/jedib0t/go-pretty/v6/table"
)

func BucketListUserBuckets(ctx CommandContext) error {
userID := int64(0)

buckets, err := ctx.Cmdline.Svc.BucketSvc().GetUserBuckets(userID)
if err != nil {
return err
}

fmt.Printf("Find %d buckets for user %d:\n", len(buckets), userID)

tb := table.NewWriter()
tb.AppendHeader(table.Row{"ID", "Name", "CreatorID"})

for _, bucket := range buckets {
tb.AppendRow(table.Row{bucket.BucketID, bucket.Name, bucket.CreatorID})
}

fmt.Print(tb.Render())
return nil
}

func BucketCreateBucket(ctx CommandContext, bucketName string) error {
userID := int64(0)

bucketID, err := ctx.Cmdline.Svc.BucketSvc().CreateBucket(userID, bucketName)
if err != nil {
return err
}

fmt.Printf("Create bucket %s success, id: %d", bucketName, bucketID)
return nil
}

func BucketDeleteBucket(ctx CommandContext, bucketID int64) error {
userID := int64(0)

err := ctx.Cmdline.Svc.BucketSvc().DeleteBucket(userID, bucketID)
if err != nil {
return err
}

fmt.Printf("Delete bucket %d success ", bucketID)
return nil
}

func init() {
commands.MustAdd(BucketListUserBuckets, "bucket", "ls")

commands.MustAdd(BucketCreateBucket, "bucket", "new")

commands.MustAdd(BucketDeleteBucket, "bucket", "delete")
}

+ 32
- 0
client/internal/cmdline/cache.go View File

@@ -0,0 +1,32 @@
package cmdline

import (
"fmt"
"time"
)

func CacheMovePackage(ctx CommandContext, packageID int64, nodeID int64) error {
taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(0, packageID, nodeID)
if err != nil {
return fmt.Errorf("start cache moving package: %w", err)
}

for {
complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait moving: %w", err)
}
}
}

func init() {
commands.Add(CacheMovePackage, "cache", "move")
}

+ 40
- 0
client/internal/cmdline/commandline.go View File

@@ -0,0 +1,40 @@
package cmdline

import (
"fmt"
"os"

"gitlink.org.cn/cloudream/common/pkgs/cmdtrie"
"gitlink.org.cn/cloudream/storage-client/internal/services"
)

type CommandContext struct {
Cmdline *Commandline
}

var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]()

type Commandline struct {
Svc *services.Service
}

func NewCommandline(svc *services.Service) (*Commandline, error) {
return &Commandline{
Svc: svc,
}, nil
}

func (c *Commandline) DispatchCommand(allArgs []string) {
cmdCtx := CommandContext{
Cmdline: c,
}
cmdErr, err := commands.Execute(cmdCtx, allArgs, cmdtrie.ExecuteOption{ReplaceEmptyArrayWithNil: true})
if err != nil {
fmt.Printf("execute command failed, err: %s", err.Error())
os.Exit(1)
}
if cmdErr != nil {
fmt.Printf("execute command failed, err: %s", cmdErr.Error())
os.Exit(1)
}
}

+ 72
- 0
client/internal/cmdline/distlock.go View File

@@ -0,0 +1,72 @@
package cmdline

import (
"fmt"
"strings"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

func DistLockLock(ctx CommandContext, lockData []string) error {
req := distlock.LockRequest{}

for _, lock := range lockData {
l, err := parseOneLock(lock)
if err != nil {
return fmt.Errorf("parse lock data %s failed, err: %w", lock, err)
}

req.Locks = append(req.Locks, l)
}

reqID, err := ctx.Cmdline.Svc.DistLock.Acquire(req, service.AcquireOption{
RetryTimeMs: 5000,
})
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}

fmt.Printf("%s\n", reqID)

return nil
}

func parseOneLock(lockData string) (distlock.Lock, error) {
var lock distlock.Lock

fullPathAndTarget := strings.Split(lockData, "@")
if len(fullPathAndTarget) != 2 {
return lock, fmt.Errorf("lock data must contains lock path, name and target")
}

pathAndName := strings.Split(fullPathAndTarget[0], "/")
if len(pathAndName) < 2 {
return lock, fmt.Errorf("lock data must contains lock path, name and target")
}

lock.Path = pathAndName[0 : len(pathAndName)-1]
lock.Name = pathAndName[len(pathAndName)-1]

target := lockprovider.NewStringLockTarget()
comps := strings.Split(fullPathAndTarget[1], "/")
for _, comp := range comps {
target.Add(lo.Map(strings.Split(comp, "."), func(str string, index int) any { return str })...)
}

lock.Target = *target

return lock, nil
}

func DistLockUnlock(ctx CommandContext, reqID string) error {
return ctx.Cmdline.Svc.DistLock.Release(reqID)
}

func init() {
commands.MustAdd(DistLockLock, "distlock", "lock")

commands.MustAdd(DistLockUnlock, "distlock", "unlock")
}

+ 313
- 0
client/internal/cmdline/package.go View File

@@ -0,0 +1,313 @@
package cmdline

import (
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

func PackageListBucketPackages(ctx CommandContext, bucketID int64) error {
userID := int64(0)

packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID)
if err != nil {
return err
}

fmt.Printf("Find %d packages in bucket %d for user %d:\n", len(packages), bucketID, userID)

tb := table.NewWriter()
tb.AppendHeader(table.Row{"ID", "Name", "BucketID", "State", "Redundancy"})

for _, obj := range packages {
tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID, obj.State, obj.Redundancy})
}

fmt.Print(tb.Render())
return nil
}

func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int64) error {
err := os.MkdirAll(outputDir, os.ModePerm)
if err != nil {
return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err)
}

// 下载文件
objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(0, packageID)
if err != nil {
return fmt.Errorf("download object failed, err: %w", err)
}
defer objIter.Close()

for {
objInfo, err := objIter.MoveNext()
if err == iterator.ErrNoMoreItem {
break
}
if err != nil {
return err
}
defer objInfo.File.Close()

fullPath := filepath.Join(outputDir, objInfo.Object.Path)

dirPath := filepath.Dir(fullPath)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("creating object dir: %w", err)
}

outputFile, err := os.Create(fullPath)
if err != nil {
return fmt.Errorf("creating object file: %w", err)
}
defer outputFile.Close()

_, err = io.Copy(outputFile, objInfo.File)
if err != nil {
return fmt.Errorf("copy object data to local file failed, err: %w", err)
}
}

return nil
}

func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int) error {
rootPath = filepath.Clean(rootPath)

var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount))

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

for {
complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading rep object: %w", err)
}

tb := table.NewWriter()

tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"})
for i := 0; i < len(uploadObjectResult.ObjectResults); i++ {
tb.AppendRow(table.Row{
uploadObjectResult.ObjectResults[i].Info.Path,
uploadObjectResult.ObjectResults[i].ObjectID,
uploadObjectResult.ObjectResults[i].FileHash,
})
}
fmt.Print(tb.Render())
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath string) error {
//userID := int64(0)

var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingRepPackage(0, packageID, objIter)
if err != nil {
return fmt.Errorf("update object %d failed, err: %w", packageID, err)
}

for {
complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("updating rep object: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait updating: %w", err)
}
}
}

func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string) error {
var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize))

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

for {
complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading ec package: %w", err)
}

tb := table.NewWriter()

tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"})
for i := 0; i < len(uploadObjectResult.ObjectResults); i++ {
tb.AppendRow(table.Row{
uploadObjectResult.ObjectResults[i].Info.Path,
uploadObjectResult.ObjectResults[i].ObjectID,
uploadObjectResult.ObjectResults[i].FileHash,
})
}
fmt.Print(tb.Render())
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string) error {
//userID := int64(0)

var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingECPackage(0, packageID, objIter)
if err != nil {
return fmt.Errorf("update package %d failed, err: %w", packageID, err)
}

for {
complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingECPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("updating ec package: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait updating: %w", err)
}
}
}

func PackageDeletePackage(ctx CommandContext, packageID int64) error {
userID := int64(0)
err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID)
if err != nil {
return fmt.Errorf("delete package %d failed, err: %w", packageID, err)
}
return nil
}

func PackageGetCachedNodes(ctx CommandContext, packageID int64, userID int64) error {
resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID)
fmt.Printf("resp: %v\n", resp)
if err != nil {
return fmt.Errorf("get package %d cached nodes failed, err: %w", packageID, err)
}
return nil
}

func PackageGetLoadedNodes(ctx CommandContext, packageID int64, userID int64) error {
nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID)
fmt.Printf("nodeIDs: %v\n", nodeIDs)
if err != nil {
return fmt.Errorf("get package %d loaded nodes failed, err: %w", packageID, err)
}
return nil
}

func init() {
commands.MustAdd(PackageListBucketPackages, "pkg", "ls")

commands.MustAdd(PackageDownloadPackage, "pkg", "get")

commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "rep")

commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "rep")

commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "ec")

commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "ec")

commands.MustAdd(PackageDeletePackage, "pkg", "delete")

commands.MustAdd(PackageGetCachedNodes, "pkg", "cached")

commands.MustAdd(PackageGetLoadedNodes, "pkg", "loaded")
}

+ 41
- 0
client/internal/cmdline/scanner.go View File

@@ -0,0 +1,41 @@
package cmdline

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/cmdtrie"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event"
)

var parseScannerEventCmdTrie cmdtrie.StaticCommandTrie[any] = cmdtrie.NewStaticCommandTrie[any]()

func ScannerPostEvent(ctx CommandContext, args []string) error {
ret, err := parseScannerEventCmdTrie.Execute(args, cmdtrie.ExecuteOption{ReplaceEmptyArrayWithNil: true})
if err != nil {
return fmt.Errorf("execute parsing event command failed, err: %w", err)
}

err = ctx.Cmdline.Svc.ScannerSvc().PostEvent(ret, false, false)
if err != nil {
return fmt.Errorf("post event to scanner failed, err: %w", err)
}

return nil
}

func init() {
parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckCache, myreflect.TypeNameOf[scevt.AgentCheckCache]())

parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckState, myreflect.TypeNameOf[scevt.AgentCheckState]())

parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckStorage, myreflect.TypeNameOf[scevt.AgentCheckStorage]())

parseScannerEventCmdTrie.MustAdd(scevt.NewCheckCache, myreflect.TypeNameOf[scevt.CheckCache]())

parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackage, myreflect.TypeNameOf[scevt.CheckPackage]())

parseScannerEventCmdTrie.MustAdd(scevt.NewCheckRepCount, myreflect.TypeNameOf[scevt.CheckRepCount]())

commands.MustAdd(ScannerPostEvent, "scanner", "event")
}

+ 30
- 0
client/internal/cmdline/serve.go View File

@@ -0,0 +1,30 @@
package cmdline

import (
"fmt"

"gitlink.org.cn/cloudream/storage-client/internal/http"
)

func ServeHTTP(ctx CommandContext, args []string) error {
listenAddr := ":7890"
if len(args) > 0 {
listenAddr = args[0]
}

httpSvr, err := http.NewServer(listenAddr, ctx.Cmdline.Svc)
if err != nil {
return fmt.Errorf("new http server: %w", err)
}

err = httpSvr.Serve()
if err != nil {
return fmt.Errorf("serving http: %w", err)
}

return nil
}

func init() {
commands.MustAdd(ServeHTTP, "serve", "http")
}

+ 60
- 0
client/internal/cmdline/storage.go View File

@@ -0,0 +1,60 @@
package cmdline

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/models"
)

func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID)
if err != nil {
return fmt.Errorf("start loading package to storage: %w", err)
}

for {
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait moving: %w", err)
}
}
}

func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error {
nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path,
models.NewTypedRepRedundancyInfo(repCount))
if err != nil {
return fmt.Errorf("start storage uploading rep package: %w", err)
}

for {
complete, packageID, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageCreatePackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("uploading complete with: %w", err)
}

fmt.Printf("%d\n", packageID)
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func init() {
commands.MustAdd(StorageLoadPackage, "stg", "load", "pkg")

commands.MustAdd(StorageCreateRepPackage, "stg", "upload", "rep")
}

+ 32
- 0
client/internal/config/config.go View File

@@ -0,0 +1,32 @@
package config

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage-common/models"
agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
)

type Config struct {
Local stgmodels.LocalMachineInfo `json:"local"`
AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"`
ECPacketSize int64 `json:"ecPacketSize"`
MaxRepCount int `json:"maxRepCount"`
Logger logger.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
DistLock distlock.Config `json:"distlock"`
}

var cfg Config

func Init() error {
return config.DefaultLoad("client", &cfg)
}

func Cfg() *Config {
return &cfg
}

+ 64
- 0
client/internal/http/cacah.go View File

@@ -0,0 +1,64 @@
package http

import (
"net/http"
"time"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
)

type CacheService struct {
*Server
}

func (s *Server) CacheSvc() *CacheService {
return &CacheService{
Server: s,
}
}

type CacheMovePackageReq struct {
UserID *int64 `json:"userID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
NodeID *int64 `json:"nodeID" binding:"required"`
}

func (s *CacheService) MovePackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Cache.LoadPackage")

var req CacheMovePackageReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

taskID, err := s.svc.CacheSvc().StartCacheMovePackage(*req.UserID, *req.PackageID, *req.NodeID)
if err != nil {
log.Warnf("start cache move package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed"))
return
}

for {
complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("moving complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
return
}

if err != nil {
log.Warnf("wait moving: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed"))
return
}
}
}

+ 24
- 0
client/internal/http/http.go View File

@@ -0,0 +1,24 @@
package http

import "gitlink.org.cn/cloudream/common/consts/errorcode"

type Response struct {
Code string `json:"code"`
Message string `json:"message"`
Data any `json:"data"`
}

func OK(data any) Response {
return Response{
Code: errorcode.OK,
Message: "",
Data: data,
}
}

func Failed(code string, msg string) Response {
return Response{
Code: code,
Message: msg,
}
}

+ 70
- 0
client/internal/http/object.go View File

@@ -0,0 +1,70 @@
package http

import (
"io"
"net/http"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
)

type ObjectService struct {
*Server
}

func (s *Server) ObjectSvc() *ObjectService {
return &ObjectService{
Server: s,
}
}

type ObjectDownloadReq struct {
UserID *int64 `form:"userID" binding:"required"`
ObjectID *int64 `form:"objectID" binding:"required"`
}

func (s *ObjectService) Download(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.Download")

var req ObjectDownloadReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

file, err := s.svc.ObjectSvc().Download(*req.UserID, *req.ObjectID)
if err != nil {
log.Warnf("downloading object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed"))
return
}

ctx.Writer.WriteHeader(http.StatusOK)
// TODO 需要设置FileName
ctx.Header("Content-Disposition", "attachment; filename=filename")
ctx.Header("Content-Type", "application/octet-stream")

buf := make([]byte, 4096)
ctx.Stream(func(w io.Writer) bool {
rd, err := file.Read(buf)
if err == io.EOF {
return false
}

if err != nil {
log.Warnf("reading file data: %s", err.Error())
return false
}

err = myio.WriteAll(w, buf[:rd])
if err != nil {
log.Warnf("writing data to response: %s", err.Error())
return false
}

return true
})
}

+ 234
- 0
client/internal/http/package.go View File

@@ -0,0 +1,234 @@
package http

import (
"mime/multipart"
"net/http"
"time"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type PackageService struct {
*Server
}

func (s *Server) PackageSvc() *PackageService {
return &PackageService{
Server: s,
}
}

type PackageUploadReq struct {
Info PackageUploadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"`
}

type PackageUploadInfo struct {
UserID *int64 `json:"userID" binding:"required"`
BucketID *int64 `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"`
}

type PackageUploadResp struct {
PackageID int64 `json:"packageID,string"`
}

func (s *PackageService) Upload(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Upload")

var req PackageUploadReq
if err := ctx.ShouldBind(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

if req.Info.Redundancy.IsRepInfo() {
s.uploadRep(ctx, &req)
return
}

if req.Info.Redundancy.IsECInfo() {
s.uploadEC(ctx, &req)
return
}

ctx.JSON(http.StatusForbidden, Failed(errorcode.OperationFailed, "not supported yet"))
}

func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) {
log := logger.WithField("HTTP", "Package.Upload")

var err error
var repInfo models.RepRedundancyInfo
if repInfo, err = req.Info.Redundancy.ToRepInfo(); err != nil {
log.Warnf("parsing rep redundancy config: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
return
}

objIter := iterator.NewHTTPObjectIterator(req.Files)

taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo)

if err != nil {
log.Warnf("start uploading rep package task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed"))
return
}

for {
complete, createResult, err := s.svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
log.Warnf("uploading rep package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading rep package failed"))
return
}

ctx.JSON(http.StatusOK, OK(PackageUploadResp{
PackageID: createResult.PackageID,
}))
return
}

if err != nil {
log.Warnf("waiting task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed"))
return
}
}
}

func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) {
log := logger.WithField("HTTP", "Package.Upload")

var err error
var ecInfo models.ECRedundancyInfo
if ecInfo, err = req.Info.Redundancy.ToECInfo(); err != nil {
log.Warnf("parsing ec redundancy config: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
return
}

objIter := iterator.NewHTTPObjectIterator(req.Files)

taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo)

if err != nil {
log.Warnf("start uploading ec package task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed"))
return
}

for {
complete, createResult, err := s.svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5)
if complete {
if err != nil {
log.Warnf("uploading ec package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading ec package failed"))
return
}

ctx.JSON(http.StatusOK, OK(PackageUploadResp{
PackageID: createResult.PackageID,
}))
return
}

if err != nil {
log.Warnf("waiting task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed"))
return
}
}
}

type PackageDeleteReq struct {
UserID *int64 `json:"userID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
}

func (s *PackageService) Delete(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Delete")

var req PackageDeleteReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

err := s.svc.PackageSvc().DeletePackage(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("deleting package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete package failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
}

type GetCachedNodesReq struct {
UserID *int64 `json:"userID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
}
type GetCachedNodesResp struct {
models.PackageCachingInfo
}

func (s *PackageService) GetCachedNodes(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.GetCachedNodes")

var req GetCachedNodesReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

resp, err := s.svc.PackageSvc().GetCachedNodes(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("get package cached nodes failed: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package cached nodes failed"))
return
}

ctx.JSON(http.StatusOK, OK(GetCachedNodesResp{resp}))
}

type GetLoadedNodesReq struct {
UserID *int64 `json:"userID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
}

type GetLoadedNodesResp struct {
NodeIDs []int64 `json:"nodeIDs"`
}

func (s *PackageService) GetLoadedNodes(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.GetLoadedNodes")

var req GetLoadedNodesReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

nodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("get package loaded nodes failed: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package loaded nodes failed"))
return
}

ctx.JSON(http.StatusOK, OK(GetLoadedNodesResp{
NodeIDs: nodeIDs,
}))
}

+ 52
- 0
client/internal/http/server.go View File

@@ -0,0 +1,52 @@
package http

import (
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-client/internal/services"
)

type Server struct {
engine *gin.Engine
listenAddr string
svc *services.Service
}

func NewServer(listenAddr string, svc *services.Service) (*Server, error) {
engine := gin.New()

return &Server{
engine: engine,
listenAddr: listenAddr,
svc: svc,
}, nil
}

func (s *Server) Serve() error {
s.initRouters()

logger.Infof("start serving http at: %s", s.listenAddr)
err := s.engine.Run(s.listenAddr)

if err != nil {
logger.Infof("http stopped with error: %s", err.Error())
return err
}

logger.Infof("http stopped")
return nil
}

func (s *Server) initRouters() {
s.engine.GET("/object/download", s.ObjectSvc().Download)

s.engine.POST("/package/upload", s.PackageSvc().Upload)
s.engine.POST("/package/delete", s.PackageSvc().Delete)
s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes)
s.engine.GET("/package/getLoadedNodes", s.PackageSvc().GetLoadedNodes)

s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage)
s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage)

s.engine.POST("/cache/movePackage", s.CacheSvc().MovePackage)
}

+ 119
- 0
client/internal/http/storage.go View File

@@ -0,0 +1,119 @@
package http

import (
"net/http"
"time"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
)

type StorageService struct {
*Server
}

func (s *Server) StorageSvc() *StorageService {
return &StorageService{
Server: s,
}
}

type StorageLoadPackageReq struct {
UserID *int64 `json:"userID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
StorageID *int64 `json:"storageID" binding:"required"`
}

func (s *StorageService) LoadPackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.LoadPackage")

var req StorageLoadPackageReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID)
if err != nil {
log.Warnf("start storage load package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed"))
return
}

for {
complete, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("loading complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
return
}

if err != nil {
log.Warnf("wait loadding: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed"))
return
}
}
}

type StorageCreatePackageReq struct {
UserID *int64 `json:"userID" binding:"required"`
StorageID *int64 `json:"storageID" binding:"required"`
Path string `json:"path" binding:"required"`
BucketID *int64 `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"`
}

type StorageCreatePackageResp struct {
PackageID int64 `json:"packageID"`
}

func (s *StorageService) CreatePackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.CreatePackage")

var req StorageCreatePackageReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

nodeID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage(
*req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy)
if err != nil {
log.Warnf("start storage create package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
return
}

for {
complete, packageID, err := s.svc.StorageSvc().WaitStorageCreatePackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("creating complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
return
}

ctx.JSON(http.StatusOK, OK(StorageCreatePackageResp{
PackageID: packageID,
}))
return
}

if err != nil {
log.Warnf("wait creating: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
return
}
}
}

+ 9
- 0
client/internal/services/agent.go View File

@@ -0,0 +1,9 @@
package services

type AgentService struct {
*Service
}

func (svc *Service) AgentSvc() *AgentService {
return &AgentService{Service: svc}
}

+ 113
- 0
client/internal/services/bucket.go View File

@@ -0,0 +1,113 @@
package services

import (
"fmt"

"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type BucketService struct {
*Service
}

func (svc *Service) BucketSvc() *BucketService {
return &BucketService{Service: svc}
}

func (svc *BucketService) GetBucket(userID int64, bucketID int64) (model.Bucket, error) {
// TODO
panic("not implement yet")
}

func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

resp, err := coorCli.GetUserBuckets(coormq.NewGetUserBuckets(userID))
if err != nil {
return nil, fmt.Errorf("get user buckets failed, err: %w", err)
}

return resp.Buckets, nil
}

func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]model.Package, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

resp, err := coorCli.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID))
if err != nil {
return nil, fmt.Errorf("get bucket packages failed, err: %w", err)
}

return resp.Packages, nil
}

func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return 0, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

// TODO 只有阅读了系统操作的源码,才能知道要加哪些锁,但用户的命令可能会调用不止一个系统操作。
// 因此加锁的操作还是必须在用户命令里完成,但具体加锁的内容,则需要被封装起来与系统操作放到一起,方便管理,避免分散改动。

mutex, err := reqbuilder.NewBuilder().
Metadata().Bucket().CreateOne(userID, bucketName).
// TODO 可以考虑二次加锁,加的更精确
UserBucket().CreateAny().
MutexLock(svc.DistLock)
if err != nil {
return 0, fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

resp, err := coorCli.CreateBucket(coormq.NewCreateBucket(userID, bucketName))
if err != nil {
return 0, fmt.Errorf("creating bucket: %w", err)
}

return resp.BucketID, nil
}

func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

// TODO 检查用户是否有删除这个Bucket的权限。检查的时候可以只上UserBucket的Read锁

mutex, err := reqbuilder.NewBuilder().
Metadata().
UserBucket().WriteAny().
Bucket().WriteOne(bucketID).
Package().WriteAny().
Object().WriteAny().
ObjectRep().WriteAny().
ObjectBlock().WriteAny().
StoragePackage().WriteAny().
MutexLock(svc.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

_, err = coorCli.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}

return nil
}

+ 55
- 0
client/internal/services/cacah.go View File

@@ -0,0 +1,55 @@
package services

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
)

type CacheService struct {
*Service
}

func (svc *Service) CacheSvc() *CacheService {
return &CacheService{Service: svc}
}

func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, nodeID int64) (string, error) {
agentCli, err := globals.AgentMQPool.Acquire(nodeID)
if err != nil {
return "", fmt.Errorf("new agent client: %w", err)
}
defer agentCli.Close()

startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID))
if err != nil {
return "", fmt.Errorf("start cache move package: %w", err)
}

return startResp.TaskID, nil
}

func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, error) {
agentCli, err := globals.AgentMQPool.Acquire(nodeID)
if err != nil {
return true, fmt.Errorf("new agent client: %w", err)
}
defer agentCli.Close()

waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
return true, fmt.Errorf("wait cache move package: %w", err)
}

if !waitResp.IsComplete {
return false, nil
}

if waitResp.Error != "" {
return true, fmt.Errorf("%s", waitResp.Error)
}

return true, nil
}

+ 15
- 0
client/internal/services/object.go View File

@@ -0,0 +1,15 @@
package services

import "io"

type ObjectService struct {
*Service
}

func (svc *Service) ObjectSvc() *ObjectService {
return &ObjectService{Service: svc}
}

func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) {
panic("not implement yet!")
}

+ 249
- 0
client/internal/services/package.go View File

@@ -0,0 +1,249 @@
package services

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/models"
mytask "gitlink.org.cn/cloudream/storage-client/internal/task"
"gitlink.org.cn/cloudream/storage-common/globals"
agtcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type PackageService struct {
*Service
}

func (svc *Service) PackageSvc() *PackageService {
return &PackageService{Service: svc}
}

func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

mutex, err := reqbuilder.NewBuilder().
// 用于判断用户是否有对象权限
Metadata().UserBucket().ReadAny().
// 用于查询可用的下载节点
Node().ReadAny().
// 用于读取包信息
Package().ReadOne(packageID).
// 用于读取包内的文件信息
Object().ReadAny().
// 用于查询Rep配置
ObjectRep().ReadAny().
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于查询包含了副本的节点
Cache().ReadAny().
MutexLock(svc.DistLock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID))
if err != nil {
return nil, fmt.Errorf("getting package: %w", err)
}

getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID))
if err != nil {
return nil, fmt.Errorf("getting package objects: %w", err)
}

if getPkgResp.Redundancy.IsRepInfo() {
iter, err := svc.downloadRepPackage(packageID, getObjsResp.Objects, coorCli)

if err != nil {
mutex.Unlock()
return nil, err
}

iter.OnClosing = func() {
mutex.Unlock()
}

return iter, nil
} else {
iter, err := svc.downloadECPackage(getPkgResp.Package, getObjsResp.Objects, coorCli)

if err != nil {
mutex.Unlock()
return nil, err
}

iter.OnClosing = func() {
mutex.Unlock()
}

return iter, nil
}
}

func (svc *PackageService) downloadRepPackage(packageID int64, objects []model.Object, coorCli *coormq.PoolClient) (*iterator.RepObjectIterator, error) {
getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID))
if err != nil {
return nil, fmt.Errorf("getting package object rep data: %w", err)
}

iter := iterator.NewRepObjectIterator(objects, getObjRepDataResp.Data, &iterator.DownloadContext{
Distlock: svc.DistLock,
})

return iter, nil
}
func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.Object, coorCli *coormq.PoolClient) (*iterator.ECObjectIterator, error) {
getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(pkg.PackageID))
if err != nil {
return nil, fmt.Errorf("getting package object ec data: %w", err)
}

var ecInfo models.ECRedundancyInfo
if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}

getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
}

iter := iterator.NewECObjectIterator(objects, getObjECDataResp.Data, ecInfo, getECResp.Config, &iterator.DownloadContext{
Distlock: svc.DistLock,
})

return iter, nil
}

func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo))
return tsk.ID(), nil
}

func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *mytask.CreateRepPackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage)
return true, cteatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartUpdatingRepPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewUpdateRepPackage(userID, packageID, objIter))
return tsk.ID(), nil
}

func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateRepPackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
updatePkgTask := tsk.Body().(*mytask.UpdateRepPackage)
return true, updatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo))
return tsk.ID(), nil
}

func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage)
return true, cteatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartUpdatingECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewUpdateECPackage(userID, packageID, objIter))
return tsk.ID(), nil
}

func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateECPackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
updatePkgTask := tsk.Body().(*mytask.UpdateECPackage)
return true, updatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) DeletePackage(userID int64, packageID int64) error {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有对象的权限
UserBucket().ReadAny().
// 用于读取、修改包信息
Package().WriteOne(packageID).
// 用于删除包内的所有文件
Object().WriteAny().
// 用于删除Rep配置
ObjectRep().WriteAny().
// 用于删除Block配置
ObjectBlock().WriteAny().
// 用于修改Move此Object的记录的状态
StoragePackage().WriteAny().
MutexLock(svc.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

_, err = coorCli.DeletePackage(coormq.NewDeletePackage(userID, packageID))
if err != nil {
return fmt.Errorf("deleting package: %w", err)
}

return nil
}

func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (models.PackageCachingInfo, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return models.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

resp, err := coorCli.GetPackageCachedNodes(coormq.NewGetPackageCachedNodes(userID, packageID))
if err != nil {
return models.PackageCachingInfo{}, fmt.Errorf("get package cached nodes: %w", err)
}

tmp := models.PackageCachingInfo{
NodeInfos: resp.NodeInfos,
PackageSize: resp.PackageSize,
RedunancyType: resp.RedunancyType,
}
return tmp, nil
}

func (svc *PackageService) GetLoadedNodes(userID int64, packageID int64) ([]int64, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

resp, err := coorCli.GetPackageLoadedNodes(coormq.NewGetPackageLoadedNodes(userID, packageID))
if err != nil {
return nil, fmt.Errorf("get package loaded nodes: %w", err)
}
return resp.NodeIDs, nil
}

+ 30
- 0
client/internal/services/scanner.go View File

@@ -0,0 +1,30 @@
package services

import (
"fmt"

"gitlink.org.cn/cloudream/storage-common/globals"
)

type ScannerService struct {
*Service
}

func (svc *Service) ScannerSvc() *ScannerService {
return &ScannerService{Service: svc}
}

func (svc *ScannerService) PostEvent(event any, isEmergency bool, dontMerge bool) error {
scCli, err := globals.ScannerMQPool.Acquire()
if err != nil {
return fmt.Errorf("new scacnner client: %w", err)
}
defer scCli.Close()

err = scCli.PostEvent(event, isEmergency, dontMerge)
if err != nil {
return fmt.Errorf("request to scanner failed, err: %w", err)
}

return nil
}

+ 18
- 0
client/internal/services/service.go View File

@@ -0,0 +1,18 @@
package services

import (
distlock "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/storage-client/internal/task"
)

type Service struct {
DistLock *distlock.Service
TaskMgr *task.Manager
}

func NewService(distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) {
return &Service{
DistLock: distlock,
TaskMgr: taskMgr,
}, nil
}

+ 90
- 0
client/internal/services/storage.go View File

@@ -0,0 +1,90 @@
package services

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-client/internal/task"
"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type StorageService struct {
*Service
}

func (svc *Service) StorageSvc() *StorageService {
return &StorageService{Service: svc}
}

func (svc *StorageService) StartStorageLoadPackage(userID int64, packageID int64, storageID int64) (string, error) {
tsk := svc.TaskMgr.StartNew(task.NewStorageLoadPackage(userID, packageID, storageID))
return tsk.ID(), nil
}

func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error()
}
return false, nil
}

func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {
// TODO
panic("not implement yet")
}

// 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID
func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return 0, "", fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
if err != nil {
return 0, "", fmt.Errorf("getting storage info: %w", err)
}

agentCli, err := globals.AgentMQPool.Acquire(stgResp.NodeID)
if err != nil {
return 0, "", fmt.Errorf("new agent client: %w", err)
}
defer agentCli.Close()

startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy))
if err != nil {
return 0, "", fmt.Errorf("start storage upload package: %w", err)
}

return stgResp.NodeID, startResp.TaskID, nil
}

func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, error) {
agentCli, err := globals.AgentMQPool.Acquire(nodeID)
if err != nil {
// TODO 失败是否要当做任务已经结束?
return true, 0, fmt.Errorf("new agent client: %w", err)
}
defer agentCli.Close()

waitResp, err := agentCli.WaitStorageCreatePackage(agtmq.NewWaitStorageCreatePackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
// TODO 请求失败是否要当做任务已经结束?
return true, 0, fmt.Errorf("wait storage upload package: %w", err)
}

if !waitResp.IsComplete {
return false, 0, nil
}

if waitResp.Error != "" {
return true, 0, fmt.Errorf("%s", waitResp.Error)
}

return true, waitResp.PackageID, nil
}

+ 34
- 0
client/internal/task/create_ec_package.go View File

@@ -0,0 +1,34 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type CreateECPackageResult = cmd.CreateECPackageResult

type CreateECPackage struct {
cmd cmd.CreateECPackage

Result *CreateECPackageResult
}

func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage {
return &CreateECPackage{
cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy),
}
}

func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})
t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 34
- 0
client/internal/task/create_rep_package.go View File

@@ -0,0 +1,34 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type CreateRepPackageResult = cmd.CreateRepPackageResult

type CreateRepPackage struct {
cmd cmd.CreateRepPackage

Result *CreateRepPackageResult
}

func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage {
return &CreateRepPackage{
cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy),
}
}

func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})
t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 108
- 0
client/internal/task/storage_load_package.go View File

@@ -0,0 +1,108 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type StorageLoadPackage struct {
userID int64
packageID int64
storageID int64
}

func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage {
return &StorageLoadPackage{
userID: userID,
packageID: packageID,
storageID: storageID,
}
}

func (t *StorageLoadPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *StorageLoadPackage) do(ctx TaskContext) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有Storage权限
UserStorage().ReadOne(t.packageID, t.storageID).
// 用于判断用户是否有对象权限
UserBucket().ReadAny().
// 用于读取包信息
Package().ReadOne(t.packageID).
// 用于读取对象信息
Object().ReadAny().
// 用于查询Rep配置
ObjectRep().ReadAny().
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于创建Move记录
StoragePackage().CreateOne(t.storageID, t.userID, t.packageID).
Storage().
// 用于创建对象文件
CreateOnePackage(t.storageID, t.userID, t.packageID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("getting storage info: %w", err)
}

// 然后向代理端发送移动文件的请求
agentClient, err := globals.AgentMQPool.Acquire(getStgResp.NodeID)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err)
}
defer agentClient.Close()

agentMoveResp, err := agentClient.StartStorageLoadPackage(
agtmq.NewStartStorageLoadPackage(
t.userID,
t.packageID,
t.storageID,
))
if err != nil {
return fmt.Errorf("start loading package to storage: %w", err)
}

for {
waitResp, err := agentClient.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("wait loading package: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent loading package: %s", waitResp.Error)
}

break
}
}

_, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.packageID, t.storageID))
if err != nil {
return fmt.Errorf("loading package to storage: %w", err)
}
return nil
}

+ 28
- 0
client/internal/task/task.go View File

@@ -0,0 +1,28 @@
package task

import (
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/pkgs/task"
)

type TaskContext struct {
distlock *distsvc.Service
}

// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
// 因此适合进行执行结果的设置
type CompleteFn = task.CompleteFn

type Manager = task.Manager[TaskContext]

type TaskBody = task.TaskBody[TaskContext]

type Task = task.Task[TaskContext]

type CompleteOption = task.CompleteOption

func NewManager(distlock *distsvc.Service) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
})
}

+ 34
- 0
client/internal/task/update_ec_package.go View File

@@ -0,0 +1,34 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type UpdateECPackageResult = cmd.UpdateECPackageResult

type UpdateECPackage struct {
cmd cmd.UpdateECPackage

Result *UpdateECPackageResult
}

func NewUpdateECPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateECPackage {
return &UpdateECPackage{
cmd: *cmd.NewUpdateECPackage(userID, packageID, objectIter),
}
}

func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})

t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 34
- 0
client/internal/task/update_rep_package.go View File

@@ -0,0 +1,34 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type UpdateRepPackageResult = cmd.UpdateRepPackageResult

type UpdateRepPackage struct {
cmd cmd.UpdateRepPackage

Result *UpdateRepPackageResult
}

func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage {
return &UpdateRepPackage{
cmd: *cmd.NewUpdateRepPackage(userID, packageID, objectIter),
}
}

func (t *UpdateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})

t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 20
- 0
client/magefiles/magefile.go View File

@@ -0,0 +1,20 @@
//go:build mage

package main

import (
"gitlink.org.cn/cloudream/common/magefiles"

//mage:import
_ "gitlink.org.cn/cloudream/common/magefiles/targets"
)

var Default = Build

func Build() error {
return magefiles.Build(magefiles.BuildArgs{
OutputName: "client",
OutputDir: "client",
AssetsDir: "assets",
})
}

+ 74
- 0
client/main.go View File

@@ -0,0 +1,74 @@
package main

import (
"fmt"
"os"

_ "google.golang.org/grpc/balancer/grpclb"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-client/internal/cmdline"
"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-client/internal/services"
"gitlink.org.cn/cloudream/storage-client/internal/task"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock"
)

func main() {
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

globals.InitLocal(&config.Cfg().Local)
globals.InitMQPool(&config.Cfg().RabbitMQ)
globals.InitAgentRPCPool(&config.Cfg().AgentGRPC)
if config.Cfg().IPFS != nil {
logger.Infof("IPFS config is not empty, so create a ipfs client")

globals.InitIPFSPool(config.Cfg().IPFS)
}

distlockSvc, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Warnf("new distlock service failed, err: %s", err.Error())
os.Exit(1)
}
go serveDistLock(distlockSvc)

taskMgr := task.NewManager(distlockSvc)

svc, err := services.NewService(distlockSvc, &taskMgr)
if err != nil {
logger.Warnf("new services failed, err: %s", err.Error())
os.Exit(1)
}

cmds, err := cmdline.NewCommandline(svc)
if err != nil {
logger.Warnf("new command line failed, err: %s", err.Error())
os.Exit(1)
}

cmds.DispatchCommand(os.Args[1:])
}

func serveDistLock(svc *distlock.Service) {
logger.Info("start serving distlock")

err := svc.Serve()

if err != nil {
logger.Errorf("distlock stopped with error: %s", err.Error())
}

logger.Info("distlock stopped")
}

Loading…
Cancel
Save