From 972160ccaf6d4e75a7aa7bf27f356495617d0965 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 27 Jul 2023 17:01:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0agent=E5=9B=9E=E6=BA=90?= =?UTF-8?q?=E7=9A=84=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 13 ++ go.sum | 58 ++++++ internal/config/config.go | 20 +- internal/services/cmd/storage.go | 114 +++++++---- internal/task/ipfs_read.go | 4 +- internal/task/task.go | 12 +- internal/task/upload_rep_objects.go | 282 ++++++++++++++++++++++++++++ main.go | 30 ++- 8 files changed, 484 insertions(+), 49 deletions(-) create mode 100644 internal/task/upload_rep_objects.go diff --git a/go.mod b/go.mod index 36b5377..2958102 100644 --- a/go.mod +++ b/go.mod @@ -16,9 +16,12 @@ require ( github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 // indirect github.com/beevik/etree v1.2.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/go-ping/ping v1.1.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -26,6 +29,7 @@ require ( github.com/imdario/mergo v0.3.15 // indirect github.com/ipfs/boxo v0.8.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect @@ -34,6 +38,8 @@ require ( github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect @@ -48,7 +54,14 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/streadway/amqp v1.0.0 // indirect github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b // indirect + github.com/zyedidia/generic v1.2.1 // indirect gitlink.org.cn/cloudream/db v0.0.0 // indirect + go.etcd.io/etcd/api/v3 v3.5.9 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect + go.etcd.io/etcd/client/v3 v3.5.9 // indirect + go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.8.0 // indirect golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect golang.org/x/net v0.9.0 // indirect diff --git a/go.sum b/go.sum index ea6ad71..6e35240 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,10 @@ github.com/beevik/etree v1.2.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zV github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg= github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -17,11 +21,15 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= github.com/go-ping/ping v1.1.0 h1:3MCGhVX4fyEUuhsfwPrsEdQw6xspHkv5zHsiSoDFZYw= github.com/go-ping/ping v1.1.0/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -39,7 +47,11 @@ github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/ipfs/go-ipfs-api v0.6.0 h1:JARgG0VTbjyVhO5ZfesnbXv9wTcMvoKRBLF1SzJqzmg= github.com/ipfs/go-ipfs-api v0.6.0/go.mod h1:iDC2VMwN9LUpQV/GzEeZ2zNqd8NUdRmWcFM+K/6odf0= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= @@ -58,6 +70,10 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE= @@ -79,6 +95,7 @@ github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOEL github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY= github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww= github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= @@ -92,20 +109,53 @@ github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2 github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b h1:wA3QeTsaAXybLL2kb2cKhCAQTHgYTMwuI8lBlJSv5V8= github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b/go.mod h1:xT1Y5p2JR2PfSZihE0s4mjdJaRGp1waCTf5JzhQLBck= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc= +github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis= +go.etcd.io/etcd/api/v3 v3.5.9 h1:4wSsluwyTbGGmyjJktOf3wFQoTBIURXHnq9n/G/JQHs= +go.etcd.io/etcd/api/v3 v3.5.9/go.mod h1:uyAal843mC8uUVSLWz6eHa/d971iDGnCRpmKd2Z+X8k= +go.etcd.io/etcd/client/pkg/v3 v3.5.9 h1:oidDC4+YEuSIQbsR94rY9gur91UPL6DnxDCIYd2IGsE= +go.etcd.io/etcd/client/pkg/v3 v3.5.9/go.mod h1:y+CzeSmkMpWN2Jyu1npecjB9BBnABxGM4pN8cGuJeL4= +go.etcd.io/etcd/client/v3 v3.5.9 h1:r5xghnU7CwbUxD/fbUtRyJGaYNfDun8sp/gTr1hew6E= +go.etcd.io/etcd/client/v3 v3.5.9/go.mod h1:i/Eo5LrZ5IKqpbtpPDuaUnDOUv471oDg8cjQaUr2MbA= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/exp v0.0.0-20230519143937-03e91628a987 h1:3xJIFvzUFbu4ls0BTBYcgbCGhA63eAOEMxIHugyXJqA= golang.org/x/exp v0.0.0-20230519143937-03e91628a987/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -113,11 +163,18 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd h1:sLpv7bNL1AsX3fdnWh9WVh7ejIzXdOc1RRHGeAmeStU= google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= @@ -127,6 +184,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/config/config.go b/internal/config/config.go index 50888d6..5d05feb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + "gitlink.org.cn/cloudream/common/pkg/distlock" log "gitlink.org.cn/cloudream/common/pkg/logger" c "gitlink.org.cn/cloudream/common/utils/config" "gitlink.org.cn/cloudream/common/utils/ipfs" @@ -8,14 +9,17 @@ import ( ) type Config struct { - ID int64 `json:"id"` - GRPCListenAddress string `json:"grpcListenAddress"` - LocalIP string `json:"localIP"` - StorageBaseDir string `json:"storageBaseDir"` - TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 - Logger log.Config `json:"logger"` - RabbitMQ racfg.Config `json:"rabbitMQ"` - IPFS ipfs.Config `json:"ipfs"` + ID int64 `json:"id"` + GRPCListenAddress string `json:"grpcListenAddress"` + GRPCPort int `json:"grpcPort"` + LocalIP string `json:"localIP"` + ExternalIP string `json:"externalIP"` + StorageBaseDir string `json:"storageBaseDir"` + TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 + Logger log.Config `json:"logger"` + RabbitMQ racfg.Config `json:"rabbitMQ"` + IPFS ipfs.Config `json:"ipfs"` + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/internal/services/cmd/storage.go b/internal/services/cmd/storage.go index d1d71ca..201c970 100644 --- a/internal/services/cmd/storage.go +++ b/internal/services/cmd/storage.go @@ -14,9 +14,7 @@ import ( "gitlink.org.cn/cloudream/agent/internal/task" "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/pkg/logger" - log "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/utils" - "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/ec" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -24,43 +22,37 @@ import ( agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" ) -func (service *Service) StartMovingObjectToStorage(msg *agtmsg.StartMovingObjectToStorage) (*agtmsg.StartMovingObjectToStorageResp, *ramsg.CodeMessage) { +func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObject) (*agtmsg.StartStorageMoveObjectResp, *ramsg.CodeMessage) { outFileName := utils.MakeMoveOperationFileName(msg.ObjectID, msg.UserID) outFilePath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory, outFileName) - if msg.Redundancy == consts.REDUNDANCY_REP { - taskID, err := service.moveRepObject(msg, outFilePath) + if repRed, ok := msg.Redundancy.(ramsg.RepRedundancyData); ok { + taskID, err := service.moveRepObject(repRed, outFilePath) if err != nil { logger.Warnf("move rep object as %s failed, err: %s", outFilePath, err.Error()) - return ramsg.ReplyFailed[agtmsg.StartMovingObjectToStorageResp](errorcode.OPERATION_FAILED, "move rep object failed") + return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OPERATION_FAILED, "move rep object failed") } - return ramsg.ReplyOK(agtmsg.NewStartMovingObjectToStorageResp(taskID)) + return ramsg.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID)) } else { // TODO 处理其他备份类型 - return ramsg.ReplyFailed[agtmsg.StartMovingObjectToStorageResp](errorcode.OPERATION_FAILED, "not implement yet!") + return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OPERATION_FAILED, "not implement yet!") } } -func (svc *Service) moveRepObject(msg *agtmsg.StartMovingObjectToStorage, outFilePath string) (string, error) { - var repInfo ramsg.ObjectRepInfo - err := serder.AnyToAny(msg.RedundancyData, &repInfo) - if err != nil { - return "", fmt.Errorf("redundancy data to rep info failed, err: %w", err) - } - - tsk := svc.taskManager.StartComparable(task.NewIPFSRead(repInfo.FileHash, outFilePath)) +func (svc *Service) moveRepObject(repData ramsg.RepRedundancyData, outFilePath string) (string, error) { + tsk := svc.taskManager.StartComparable(task.NewIPFSRead(repData.FileHash, outFilePath)) return tsk.ID(), nil } -func (svc *Service) WaitMovingObject(msg *agtmsg.WaitMovingObject) (*agtmsg.WaitMovingObjectResp, *ramsg.CodeMessage) { - log.WithField("TaskID", msg.TaskID).Debugf("wait moving object") +func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*agtmsg.WaitStorageMoveObjectResp, *ramsg.CodeMessage) { + logger.WithField("TaskID", msg.TaskID).Debugf("wait moving object") tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return ramsg.ReplyFailed[agtmsg.WaitMovingObjectResp](errorcode.TASK_NOT_FOUND, "task not found") + return ramsg.ReplyFailed[agtmsg.WaitStorageMoveObjectResp](errorcode.TASK_NOT_FOUND, "task not found") } if msg.WaitTimeoutMs == 0 { @@ -71,7 +63,7 @@ func (svc *Service) WaitMovingObject(msg *agtmsg.WaitMovingObject) (*agtmsg.Wait errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(true, errMsg)) + return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { @@ -81,20 +73,20 @@ func (svc *Service) WaitMovingObject(msg *agtmsg.WaitMovingObject) (*agtmsg.Wait errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(true, errMsg)) + return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg)) } - return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(false, "")) + return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(false, "")) } } -func (svc *Service) CheckStorage(msg *agtmsg.CheckStorage) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) { +func (svc *Service) StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) { dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) infos, err := ioutil.ReadDir(dirFullPath) if err != nil { logger.Warnf("list storage directory failed, err: %s", err.Error()) - return ramsg.ReplyOK(agtmsg.NewCheckStorageResp( + return ramsg.ReplyOK(agtmsg.NewStorageCheckResp( err.Error(), nil, )) @@ -109,13 +101,13 @@ func (svc *Service) CheckStorage(msg *agtmsg.CheckStorage) (*agtmsg.CheckStorage } } -func (svc *Service) checkStorageIncrement(msg *agtmsg.CheckStorage, fileInfos []fs.FileInfo) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) { +func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) { infosMap := make(map[string]fs.FileInfo) for _, info := range fileInfos { infosMap[info.Name()] = info } - var entries []agtmsg.CheckStorageRespEntry + var entries []agtmsg.StorageCheckRespEntry for _, obj := range msg.Objects { fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID) _, ok := infosMap[fileName] @@ -127,23 +119,23 @@ func (svc *Service) checkStorageIncrement(msg *agtmsg.CheckStorage, fileInfos [] } else { // 只要文件不存在,就删除StorageObject表中的记录 - entries = append(entries, agtmsg.NewCheckStorageRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE)) + entries = append(entries, agtmsg.NewStorageCheckRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE)) } } // 增量情况下,不需要对infosMap中没检查的记录进行处理 - return ramsg.ReplyOK(agtmsg.NewCheckStorageResp(consts.STORAGE_DIRECTORY_STATE_OK, entries)) + return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.STORAGE_DIRECTORY_STATE_OK, entries)) } -func (svc *Service) checkStorageComplete(msg *agtmsg.CheckStorage, fileInfos []fs.FileInfo) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) { +func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) { infosMap := make(map[string]fs.FileInfo) for _, info := range fileInfos { infosMap[info.Name()] = info } - var entries []agtmsg.CheckStorageRespEntry + var entries []agtmsg.StorageCheckRespEntry for _, obj := range msg.Objects { fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID) _, ok := infosMap[fileName] @@ -155,17 +147,17 @@ func (svc *Service) checkStorageComplete(msg *agtmsg.CheckStorage, fileInfos []f } else { // 只要文件不存在,就删除StorageObject表中的记录 - entries = append(entries, agtmsg.NewCheckStorageRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE)) + entries = append(entries, agtmsg.NewStorageCheckRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE)) } } // Storage中多出来的文件不做处理 - return ramsg.ReplyOK(agtmsg.NewCheckStorageResp(consts.STORAGE_DIRECTORY_STATE_OK, entries)) + return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.STORAGE_DIRECTORY_STATE_OK, entries)) } /* -func (service *Service) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.StartMovingObjectToStorageResp { +func (service *Service) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.StartStorageMoveObjectResp { panic("not implement yet!") wg := sync.WaitGroup{} fmt.Println("EcMove") @@ -297,3 +289,59 @@ func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *syn file.Close() wg.Done() } + +func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRepObject) (*agtmsg.StartStorageUploadRepObjectResp, *ramsg.CodeMessage) { + fullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.StorageDirectory, msg.FilePath) + + file, err := os.Open(fullPath) + if err != nil { + logger.Warnf("opening file %s: %s", fullPath, err.Error()) + return nil, ramsg.Failed(errorcode.OPERATION_FAILED, "open file failed") + } + + fileInfo, err := file.Stat() + if err != nil { + file.Close() + logger.Warnf("getting file %s state: %s", fullPath, err.Error()) + return nil, ramsg.Failed(errorcode.OPERATION_FAILED, "get file info failed") + } + fileSize := fileInfo.Size() + + uploadObject := task.UploadObject{ + ObjectName: msg.ObjectName, + File: file, + FileSize: fileSize, + } + uploadObjects := []task.UploadObject{uploadObject} + + // Task会关闭文件流 + tsk := svc.taskManager.StartNew(task.NewUploadRepObjects(msg.UserID, msg.BucketID, uploadObjects, msg.RepCount)) + return ramsg.ReplyOK(agtmsg.NewStartStorageUploadRepObjectResp(tsk.ID())) +} + +func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepObject) (*agtmsg.WaitStorageUploadRepObjectResp, *ramsg.CodeMessage) { + tsk := svc.taskManager.FindByID(msg.TaskID) + if tsk == nil { + return nil, ramsg.Failed(errorcode.TASK_NOT_FOUND, "task not found") + } + + if msg.WaitTimeoutMs == 0 { + tsk.Wait() + } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + return ramsg.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(false, "", 0, "")) + } + + uploadTask := tsk.Body().(*task.UploadRepObjects) + uploadRet := uploadTask.Results[0] + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + if uploadRet.Error != nil { + errMsg = uploadRet.Error.Error() + } + + return ramsg.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(true, errMsg, uploadRet.ObjectID, uploadRet.FileHash)) +} diff --git a/internal/task/ipfs_read.go b/internal/task/ipfs_read.go index 6971914..a1ae7e6 100644 --- a/internal/task/ipfs_read.go +++ b/internal/task/ipfs_read.go @@ -7,7 +7,6 @@ import ( "path/filepath" "time" - "github.com/juju/ratelimit" "gitlink.org.cn/cloudream/common/pkg/logger" ) @@ -73,8 +72,7 @@ func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) { return } - bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024) - _, err = io.Copy(outputFile, ratelimit.Reader(rd, bkt)) + _, err = io.Copy(outputFile, rd) if err != nil { err := fmt.Errorf("copy ipfs file to local file failed, err: %w", err) log.WithField("LocalPath", t.LocalPath).Warn(err.Error()) diff --git a/internal/task/task.go b/internal/task/task.go index 5c76beb..47fb437 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -1,12 +1,16 @@ package task import ( + distsvc "gitlink.org.cn/cloudream/common/pkg/distlock/service" "gitlink.org.cn/cloudream/common/pkg/task" "gitlink.org.cn/cloudream/common/utils/ipfs" + coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" ) type TaskContext struct { - IPFS *ipfs.IPFS + IPFS *ipfs.IPFS + Coordinator *coorcli.Client + DistLock *distsvc.Service } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -21,8 +25,10 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(ipfs *ipfs.IPFS) Manager { +func NewManager(ipfs *ipfs.IPFS, coorCli *coorcli.Client, distLock *distsvc.Service) Manager { return task.NewManager(TaskContext{ - IPFS: ipfs, + IPFS: ipfs, + Coordinator: coorCli, + DistLock: distLock, }) } diff --git a/internal/task/upload_rep_objects.go b/internal/task/upload_rep_objects.go new file mode 100644 index 0000000..4a0a3d8 --- /dev/null +++ b/internal/task/upload_rep_objects.go @@ -0,0 +1,282 @@ +package task + +import ( + "fmt" + "io" + "math/rand" + "time" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/agent/internal/config" + "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" + "gitlink.org.cn/cloudream/common/pkg/logger" + mygrpc "gitlink.org.cn/cloudream/common/utils/grpc" + "gitlink.org.cn/cloudream/common/utils/ipfs" + + agentcaller "gitlink.org.cn/cloudream/proto" + ramsg "gitlink.org.cn/cloudream/rabbitmq/message" + coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// UploadObjects和UploadRepResults为一一对应关系 +type UploadRepObjects struct { + userID int64 + bucketID int64 + repCount int + Objects []UploadObject + Results []UploadSingleRepObjectResult + IsUploading bool +} + +type UploadRepObjectsResult struct { + Objects []UploadObject + Results []UploadSingleRepObjectResult + IsUploading bool +} + +type UploadObject struct { + ObjectName string + File io.ReadCloser + FileSize int64 +} + +type UploadSingleRepObjectResult struct { + Error error + FileHash string + ObjectID int64 +} + +func NewUploadRepObjects(userID int64, bucketID int64, uploadObjects []UploadObject, repCount int) *UploadRepObjects { + return &UploadRepObjects{ + userID: userID, + bucketID: bucketID, + Objects: uploadObjects, + repCount: repCount, + } +} + +func (t *UploadRepObjects) Execute(ctx TaskContext, complete CompleteFn) { + log := logger.WithType[UploadRepObjects]("Task") + log.Debugf("begin with %v", logger.FormatStruct(t)) + defer log.Debugf("end") + + err := t.do(ctx) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + for _, obj := range t.Objects { + obj.File.Close() + } +} + +func (t *UploadRepObjects) do(ctx TaskContext) error { + + reqBlder := reqbuilder.NewBuilder() + for _, uploadObject := range t.Objects { + reqBlder.Metadata(). + // 用于防止创建了多个同名对象 + Object().CreateOne(t.bucketID, uploadObject.ObjectName) + } + mutex, err := reqBlder. + Metadata(). + // 用于判断用户是否有桶的权限 + UserBucket().ReadOne(t.userID, t.bucketID). + // 用于查询可用的上传节点 + Node().ReadAny(). + // 用于设置Rep配置 + ObjectRep().CreateAny(). + // 用于创建Cache记录 + Cache().CreateAny(). + MutexLock(ctx.DistLock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + + var repWriteResps []*coormsg.PreUploadResp + + //判断是否所有文件都符合上传条件 + hasFailure := true + for i := 0; i < len(t.Objects); i++ { + repWriteResp, err := t.preUploadSingleObject(ctx, t.Objects[i]) + if err != nil { + hasFailure = false + t.Results = append(t.Results, + UploadSingleRepObjectResult{ + Error: err, + FileHash: "", + ObjectID: 0, + }) + continue + } + t.Results = append(t.Results, UploadSingleRepObjectResult{}) + repWriteResps = append(repWriteResps, repWriteResp) + } + + // 不满足上传条件,返回各文件检查结果 + if !hasFailure { + return nil + } + + //上传文件夹 + t.IsUploading = true + for i := 0; i < len(repWriteResps); i++ { + objectID, fileHash, err := t.uploadSingleObject(ctx, t.Objects[i], repWriteResps[i]) + // 记录文件上传结果 + t.Results[i] = UploadSingleRepObjectResult{ + Error: err, + FileHash: fileHash, + ObjectID: objectID, + } + } + return nil +} + +// 检查单个文件是否能够上传 +func (t *UploadRepObjects) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) (*coormsg.PreUploadResp, error) { + //发送写请求,请求Coor分配写入节点Ip + repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP)) + if err != nil { + return nil, fmt.Errorf("pre upload rep object: %w", err) + } + if len(repWriteResp.Nodes) == 0 { + return nil, fmt.Errorf("no node to upload file") + } + return repWriteResp, nil +} + +// 上传文件 +func (t *UploadRepObjects) uploadSingleObject(ctx TaskContext, uploadObject UploadObject, preResp *coormsg.PreUploadResp) (int64, string, error) { + + var fileHash string + uploadedNodeIDs := []int64{} + willUploadToNode := true + + // 因为本地的IPFS属于调度系统的一部分,所以需要加锁 + mutex, err := reqbuilder.NewBuilder(). + IPFS().CreateAnyRep(config.Cfg().ID). + MutexLock(ctx.DistLock) + if err != nil { + return 0, "", fmt.Errorf("acquiring locks: %w", err) + } + + fileHash, err = uploadToLocalIPFS(ctx.IPFS, uploadObject.File) + if err != nil { + // 上传失败,则立刻解锁 + mutex.Unlock() + + logger.Warnf("uploading to local IPFS: %s, will select a node to upload", err.Error()) + + } else { + willUploadToNode = false + uploadedNodeIDs = append(uploadedNodeIDs, config.Cfg().ID) + + // 上传成功,则等到所有操作结束后才能解锁 + defer mutex.Unlock() + } + + // 本地IPFS失败,则发送到agent上传 + if willUploadToNode { + // 本地IPFS已经失败,所以不要再选择当前节点了 + uploadNode := t.chooseUploadNode(lo.Reject(preResp.Nodes, func(item ramsg.RespNode, index int) bool { return item.ID == config.Cfg().ID })) + + // 如果客户端与节点在同一个地域,则使用内网地址连接节点 + nodeIP := uploadNode.ExternalIP + if uploadNode.IsSameLocation { + nodeIP = uploadNode.LocalIP + + logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID) + } + + mutex, err := reqbuilder.NewBuilder(). + // 防止上传的副本被清除 + IPFS().CreateAnyRep(uploadNode.ID). + MutexLock(ctx.DistLock) + if err != nil { + return 0, "", fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + + fileHash, err = uploadToNode(uploadObject.File, nodeIP) + if err != nil { + return 0, "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) + } + uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID) + } + + // 记录写入的文件的Hash + createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash)) + if err != nil { + return 0, "", fmt.Errorf("creating rep object: %w", err) + } + + return createResp.ObjectID, fileHash, nil +} + +// chooseUploadNode 选择一个上传文件的节点 +// 1. 从与当前客户端相同地域的节点中随机选一个 +// 2. 没有用的话从所有节点中随机选一个 +func (t *UploadRepObjects) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode { + sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation }) + if len(sameLocationNodes) > 0 { + return sameLocationNodes[rand.Intn(len(sameLocationNodes))] + } + + return nodes[rand.Intn(len(nodes))] +} + +func uploadToNode(file io.ReadCloser, nodeIP string) (string, error) { + // 建立grpc连接,发送请求 + grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) + grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) + } + defer grpcCon.Close() + + client := agentcaller.NewFileTransportClient(grpcCon) + upload, err := mygrpc.SendFileAsStream(client) + if err != nil { + return "", fmt.Errorf("request to send file failed, err: %w", err) + } + + // 发送文件数据 + _, err = io.Copy(upload, file) + if err != nil { + // 发生错误则关闭连接 + upload.Abort(io.ErrClosedPipe) + return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err) + } + + // 发送EOF消息,并获得FileHash + fileHash, err := upload.Finish() + if err != nil { + upload.Abort(io.ErrClosedPipe) + return "", fmt.Errorf("send EOF failed, err: %w", err) + } + + return fileHash, nil +} + +func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser) (string, error) { + // 从本地IPFS上传文件 + writer, err := ipfs.CreateFile() + if err != nil { + return "", fmt.Errorf("create IPFS file failed, err: %w", err) + } + + _, err = io.Copy(writer, file) + if err != nil { + return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err) + } + + fileHash, err := writer.Finish() + if err != nil { + return "", fmt.Errorf("finish writing IPFS failed, err: %w", err) + } + + return fileHash, nil +} diff --git a/main.go b/main.go index 491b5f2..4979110 100644 --- a/main.go +++ b/main.go @@ -8,12 +8,14 @@ import ( "gitlink.org.cn/cloudream/agent/internal/config" "gitlink.org.cn/cloudream/agent/internal/task" + distsvc "gitlink.org.cn/cloudream/common/pkg/distlock/service" log "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/utils/ipfs" agentserver "gitlink.org.cn/cloudream/proto" "google.golang.org/grpc" + "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" rasvr "gitlink.org.cn/cloudream/rabbitmq/server/agent" cmdsvc "gitlink.org.cn/cloudream/agent/internal/services/cmd" @@ -44,11 +46,21 @@ func main() { log.Fatalf("new ipfs failed, err: %s", err.Error()) } + coor, err := coordinator.NewClient(&config.Cfg().RabbitMQ) + if err != nil { + log.Fatalf("new ipfs failed, err: %s", err.Error()) + } + + distlock, err := distsvc.NewService(&config.Cfg().DistLock) + if err != nil { + log.Fatalf("new ipfs failed, err: %s", err.Error()) + } + //处置协调端、客户端命令(可多建几个) wg := sync.WaitGroup{} - wg.Add(4) + wg.Add(5) - taskMgr := task.NewManager(ipfs) + taskMgr := task.NewManager(ipfs, coor, distlock) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 @@ -74,6 +86,8 @@ func main() { agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs)) go serveGRPC(s, lis, &wg) + go serveDistLock(distlock) + wg.Wait() } @@ -104,3 +118,15 @@ func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) { wg.Done() } + +func serveDistLock(svc *distsvc.Service) { + log.Info("start serving distlock") + + err := svc.Serve() + + if err != nil { + log.Errorf("distlock stopped with error: %s", err.Error()) + } + + log.Info("distlock stopped") +}