Browse Source

调整项目结构

gitlink
Sydonian 2 years ago
parent
commit
e3db987296
67 changed files with 4063 additions and 0 deletions
  1. +40
    -0
      go.mod
  2. +84
    -0
      go.sum
  3. +73
    -0
      models/models.go
  4. +120
    -0
      pkgs/db/bucket.go
  5. +97
    -0
      pkgs/db/cache.go
  6. +21
    -0
      pkgs/db/config/config.go
  7. +238
    -0
      pkgs/db/db.go
  8. +30
    -0
      pkgs/db/ec.go
  9. +103
    -0
      pkgs/db/model/model.go
  10. +48
    -0
      pkgs/db/node.go
  11. +316
    -0
      pkgs/db/object.go
  12. +72
    -0
      pkgs/db/object_block.go
  13. +66
    -0
      pkgs/db/object_rep.go
  14. +64
    -0
      pkgs/db/storage.go
  15. +116
    -0
      pkgs/db/storage_object.go
  16. +14
    -0
      pkgs/db/user_bucket.go
  17. +38
    -0
      pkgs/ec/rs.go
  18. +10
    -0
      pkgs/mq/client/agent/agent.go
  19. +28
    -0
      pkgs/mq/client/agent/client.go
  20. +10
    -0
      pkgs/mq/client/agent/ipfs.go
  21. +14
    -0
      pkgs/mq/client/agent/object.go
  22. +26
    -0
      pkgs/mq/client/agent/storage.go
  23. +14
    -0
      pkgs/mq/client/coordinator/agent.go
  24. +22
    -0
      pkgs/mq/client/coordinator/bucket.go
  25. +26
    -0
      pkgs/mq/client/coordinator/client.go
  26. +42
    -0
      pkgs/mq/client/coordinator/object.go
  27. +18
    -0
      pkgs/mq/client/coordinator/storage.go
  28. +26
    -0
      pkgs/mq/client/scanner/client.go
  29. +25
    -0
      pkgs/mq/client/scanner/event.go
  30. +14
    -0
      pkgs/mq/config/config.go
  31. +12
    -0
      pkgs/mq/consts.go
  32. +27
    -0
      pkgs/mq/message/agent/agent.go
  33. +50
    -0
      pkgs/mq/message/agent/ipfs.go
  34. +55
    -0
      pkgs/mq/message/agent/object.go
  35. +184
    -0
      pkgs/mq/message/agent/storage.go
  36. +41
    -0
      pkgs/mq/message/coordinator/agent.go
  37. +102
    -0
      pkgs/mq/message/coordinator/bucket.go
  38. +24
    -0
      pkgs/mq/message/coordinator/coordinator_test.go
  39. +295
    -0
      pkgs/mq/message/coordinator/object.go
  40. +100
    -0
      pkgs/mq/message/coordinator/storage.go
  41. +112
    -0
      pkgs/mq/message/publics.go
  42. +31
    -0
      pkgs/mq/message/scanner/event.go
  43. +17
    -0
      pkgs/mq/message/scanner/event/agent_check_cache.go
  44. +15
    -0
      pkgs/mq/message/scanner/event/agent_check_state.go
  45. +17
    -0
      pkgs/mq/message/scanner/event/agent_check_storage.go
  46. +15
    -0
      pkgs/mq/message/scanner/event/check_cache.go
  47. +15
    -0
      pkgs/mq/message/scanner/event/check_object.go
  48. +15
    -0
      pkgs/mq/message/scanner/event/check_rep_count.go
  49. +25
    -0
      pkgs/mq/message/scanner/event/event.go
  50. +14
    -0
      pkgs/mq/server/agent/agent.go
  51. +14
    -0
      pkgs/mq/server/agent/ipfs.go
  52. +16
    -0
      pkgs/mq/server/agent/object.go
  53. +67
    -0
      pkgs/mq/server/agent/server.go
  54. +30
    -0
      pkgs/mq/server/agent/storage.go
  55. +15
    -0
      pkgs/mq/server/coordinator/agent.go
  56. +26
    -0
      pkgs/mq/server/coordinator/bucket.go
  57. +15
    -0
      pkgs/mq/server/coordinator/coordinator_test.go
  58. +38
    -0
      pkgs/mq/server/coordinator/object.go
  59. +67
    -0
      pkgs/mq/server/coordinator/server.go
  60. +18
    -0
      pkgs/mq/server/coordinator/storage.go
  61. +11
    -0
      pkgs/mq/server/scanner/event.go
  62. +61
    -0
      pkgs/mq/server/scanner/server.go
  63. +2
    -0
      pkgs/proto/Makefile
  64. +343
    -0
      pkgs/proto/file_transport.pb.go
  65. +30
    -0
      pkgs/proto/file_transport.proto
  66. +209
    -0
      pkgs/proto/file_transport_grpc.pb.go
  67. +120
    -0
      utils/grpc/file_transport.go

+ 40
- 0
go.mod View File

@@ -0,0 +1,40 @@
module gitlink.org.cn/cloudream/storage-common

require (
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7
github.com/go-sql-driver/mysql v1.7.1
github.com/jmoiron/sqlx v1.3.5
github.com/samber/lo v1.36.0
github.com/smartystreets/goconvey v1.8.0
gitlink.org.cn/cloudream/common v0.0.0
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.30.0
)

require (
github.com/antonfisher/nested-logrus-formatter v1.3.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/smartystreets/assertions v1.13.1 // indirect
github.com/streadway/amqp v1.1.0 // indirect
github.com/zyedidia/generic v1.2.1 // indirect
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect
)

go 1.20

replace gitlink.org.cn/cloudream/common v0.0.0 => ../../common

+ 84
- 0
go.sum View File

@@ -0,0 +1,84 @@
github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ=
github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA=
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 h1:wcvD6enR///dFvb9cRodx5SGbPH4G4jPjw+aVIWkAKE=
github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7/go.mod h1:rAxMF6pVaFK/s6T4gGczvloccNbtwzuYaP2Y7W6flE8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU=
github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.36.0 h1:4LaOxH1mHnbDGhTVE0i1z8v/lWaQW8AIfOD3HU4mSaw=
github.com/samber/lo v1.36.0/go.mod h1:HLeWcJRRyLKp3+/XBJvOrerCQn9mhdKMHyd7IRlgeQ8=
github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=
github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU=
github.com/smartystreets/assertions v1.13.1/go.mod h1:cXr/IwVfSo/RbCSPhoAPv73p3hlSdrBH/b3SdnW/LMY=
github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w=
github.com/smartystreets/goconvey v1.8.0/go.mod h1:EdX8jtrTIj26jmjCOVNMVSIYAtgexqXKHOXW2Dx9JLg=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc=
github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis=
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 h1:3xJIFvzUFbu4ls0BTBYcgbCGhA63eAOEMxIHugyXJqA=
golang.org/x/exp v0.0.0-20230519143937-03e91628a987/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd h1:sLpv7bNL1AsX3fdnWh9WVh7ejIzXdOc1RRHGeAmeStU=
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

+ 73
- 0
models/models.go View File

@@ -0,0 +1,73 @@
package models

/// TODO 将分散在各处的公共结构体定义集中到这里来

const (
RedundancyRep = "rep"
RedundancyEC = "ec"
)

type RedundancyConfigTypes interface{}
type RedundancyConfigTypesConst interface {
RepRedundancyConfig | ECRedundancyConfig
}
type RepRedundancyConfig struct {
RepCount int `json:"repCount"`
}

type ECRedundancyConfig struct {
}

type RedundancyDataTypes interface{}
type RedundancyDataTypesConst interface {
RepRedundancyData | ECRedundancyData
}
type RepRedundancyData struct {
FileHash string `json:"fileHash"`
}

func NewRedundancyRepData(fileHash string) RepRedundancyData {
return RepRedundancyData{
FileHash: fileHash,
}
}

type ECRedundancyData struct {
Ec EC `json:"ec"`
Blocks []ObjectBlock `json:"blocks"`
}

func NewRedundancyEcData(ec EC, blocks []ObjectBlock) ECRedundancyData {
return ECRedundancyData{
Ec: ec,
Blocks: blocks,
}
}

type EC struct {
ID int `json:"id"`
Name string `json:"name"`
EcK int `json:"ecK"`
EcN int `json:"ecN"`
}

type ObjectBlock struct {
Index int `json:"index"`
FileHash string `json:"fileHash"`
}

func NewObjectBlock(index int, fileHash string) ObjectBlock {
return ObjectBlock{
Index: index,
FileHash: fileHash,
}
}

func NewEc(id int, name string, ecK int, ecN int) EC{
return EC{
ID: id,
Name: name,
EcK: ecK,
EcN: ecN,
}
}

+ 120
- 0
pkgs/db/bucket.go View File

@@ -0,0 +1,120 @@
package db

import (
"database/sql"
"errors"
"fmt"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type BucketDB struct {
*DB
}

func (db *DB) Bucket() *BucketDB {
return &BucketDB{DB: db}
}

// GetIDByName 根据BucketName查询BucketID
func (db *BucketDB) GetIDByName(bucketName string) (int64, error) {
//桶结构体
var result struct {
BucketID int64 `db:"BucketID"`
BucketName string `db:"BucketName"`
}

sql := "select BucketID, BucketName from Bucket where BucketName=? "
if err := db.d.Get(&result, sql, bucketName); err != nil {
return 0, err
}

return result.BucketID, nil
}

// IsAvailable 判断用户是否有指定Bucekt的权限
func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID int64, userID int64) (bool, error) {
_, err := db.GetUserBucket(ctx, userID, bucketID)
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find bucket failed, err: %w", err)
}

return true, nil
}

func (*BucketDB) GetUserBucket(ctx SQLContext, userID int64, bucketID int64) (model.Bucket, error) {
var ret model.Bucket
err := sqlx.Get(ctx, &ret,
"select Bucket.* from UserBucket, Bucket where UserID = ? and "+
"UserBucket.BucketID = Bucket.BucketID and "+
"Bucket.BucketID = ?", userID, bucketID)
return ret, err
}

func (*BucketDB) GetUserBuckets(ctx SQLContext, userID int64) ([]model.Bucket, error) {
var ret []model.Bucket
err := sqlx.Select(ctx, &ret, "select Bucket.* from UserBucket, Bucket where UserID = ? and UserBucket.BucketID = Bucket.BucketID", userID)
return ret, err
}

func (db *BucketDB) Create(ctx SQLContext, userID int64, bucketName string) (int64, error) {
var bucketID int64
err := sqlx.Get(ctx, &bucketID, "select Bucket.BucketID from UserBucket, Bucket where UserBucket.UserID = ? and UserBucket.BucketID = Bucket.BucketID and Bucket.Name = ?", userID, bucketName)
if err == nil {
return 0, fmt.Errorf("bucket name exsits")
}

if err != sql.ErrNoRows {
return 0, err
}

ret, err := ctx.Exec("insert into Bucket(Name,CreatorID) values(?,?)", bucketName, userID)
if err != nil {
return 0, fmt.Errorf("insert bucket failed, err: %w", err)
}

bucketID, err = ret.LastInsertId()
if err != nil {
return 0, fmt.Errorf("get inserted bucket id failed, err: %w", err)
}

_, err = ctx.Exec("insert into UserBucket(UserID,BucketID) values(?,?)", userID, bucketID)
if err != nil {
return 0, fmt.Errorf("insert into user bucket failed, err: %w", err)
}

return bucketID, err
}

func (db *BucketDB) Delete(ctx SQLContext, bucketID int64) error {
_, err := ctx.Exec("delete from UserBucket where BucketID = ?", bucketID)
if err != nil {
return fmt.Errorf("delete user bucket failed, err: %w", err)
}

_, err = ctx.Exec("delete from Bucket where BucketID = ?", bucketID)
if err != nil {
return fmt.Errorf("delete bucket failed, err: %w", err)
}

// 删除Bucket内的Object
var objIDs []int64
err = sqlx.Select(ctx, &objIDs, "select ObjectID from Object where BucketID = ?", bucketID)
if err != nil {
return fmt.Errorf("query object failed, err: %w", err)
}

for _, objID := range objIDs {
// TODO 不一定所有的错误都要中断后续过程
err = db.Object().SoftDelete(ctx, objID)
if err != nil {
return fmt.Errorf("set object seleted failed, err: %w", err)
}
}
return nil
}

+ 97
- 0
pkgs/db/cache.go View File

@@ -0,0 +1,97 @@
package db

import (
"time"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type CacheDB struct {
*DB
}

func (db *DB) Cache() *CacheDB {
return &CacheDB{DB: db}
}

func (*CacheDB) Get(ctx SQLContext, fileHash string, nodeID int64) (model.Cache, error) {
var ret model.Cache
err := sqlx.Get(ctx, &ret, "select * from Cache where FileHash = ? and NodeID = ?", fileHash, nodeID)
return ret, err
}

func (*CacheDB) BatchGetAllFileHashes(ctx SQLContext, start int, count int) ([]string, error) {
var ret []string
err := sqlx.Select(ctx, &ret, "select distinct FileHash from Cache limit ?, ?", start, count)
return ret, err
}

func (*CacheDB) GetNodeCaches(ctx SQLContext, nodeID int64) ([]model.Cache, error) {
var ret []model.Cache
err := sqlx.Select(ctx, &ret, "select * from Cache where NodeID = ?", nodeID)
return ret, err
}

// CreateNew 创建一条新的缓存记录
func (*CacheDB) CreateNew(ctx SQLContext, fileHash string, nodeID int64) error {
_, err := ctx.Exec("insert into Cache values(?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, time.Now())
if err != nil {
return err
}

return nil
}

// CreatePinned 创建一条缓存记录,如果已存在,但不是pinned状态,则将其设置为pin状态
func (*CacheDB) CreatePinned(ctx SQLContext, fileHash string, nodeID int64, priority int) error {
_, err := ctx.Exec("replace into Cache values(?,?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, time.Now(), priority)
return err
}

// Create 创建一条Temp状态的缓存记录,如果已存在则不产生效果
func (*CacheDB) CreateTemp(ctx SQLContext, fileHash string, nodeID int64) error {
_, err := ctx.Exec("insert ignore into Cache values(?,?,?,?)", fileHash, nodeID, consts.CacheStateTemp, time.Now())
return err
}

// GetCachingFileNodes 查找缓存了指定文件的节点
func (*CacheDB) GetCachingFileNodes(ctx SQLContext, fileHash string) ([]model.Node, error) {
var x []model.Node
err := sqlx.Select(ctx, &x,
"select Node.* from Cache, Node where Cache.FileHash=? and Cache.NodeID = Node.NodeID", fileHash)
return x, err
}

// DeleteTemp 删除一条Temp状态的记录
func (*CacheDB) DeleteTemp(ctx SQLContext, fileHash string, nodeID int64) error {
_, err := ctx.Exec("delete from Cache where FileHash = ? and NodeID = ? and State = ?", fileHash, nodeID, consts.CacheStateTemp)
return err
}

// DeleteNodeAll 删除一个节点所有的记录
func (*CacheDB) DeleteNodeAll(ctx SQLContext, nodeID int64) error {
_, err := ctx.Exec("delete from Cache where NodeID = ?", nodeID)
return err
}

// FindCachingFileUserNodes 在缓存表中查询指定数据所在的节点
func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID int64, fileHash string) ([]model.Node, error) {
var x []model.Node
err := sqlx.Select(ctx, &x,
"select Node.* from Cache, UserNode, Node where "+
"Cache.FileHash=? and Cache.NodeID = UserNode.NodeID and "+
"UserNode.UserID = ? and UserNode.NodeID = Node.NodeID", fileHash, userID)
return x, err
}

func (*CacheDB) SetTemp(ctx SQLContext, fileHash string, nodeID int64) error {
_, err := ctx.Exec("update Cache set State = ?, CacheTime = ? where FileHash = ? and NodeID = ?",
consts.CacheStateTemp,
time.Now(),
fileHash,
nodeID,
)
return err
}

+ 21
- 0
pkgs/db/config/config.go View File

@@ -0,0 +1,21 @@
package config

import "fmt"

type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
DatabaseName string `json:"databaseName"`
}

func (cfg *Config) MakeSourceString() string {
return fmt.Sprintf(
"%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=%s",
cfg.Account,
cfg.Password,
cfg.Address,
cfg.DatabaseName,
"Asia%2FShanghai",
)
}

+ 238
- 0
pkgs/db/db.go View File

@@ -0,0 +1,238 @@
package db

import (
"context"
"database/sql"
"errors"
"fmt"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/config"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type DB struct {
d *sqlx.DB
}

type SQLContext interface {
sqlx.Queryer
sqlx.Execer
}

func NewDB(cfg *config.Config) (*DB, error) {
db, err := sqlx.Open("mysql", cfg.MakeSourceString())
if err != nil {
return nil, fmt.Errorf("open database connection failed, err: %w", err)
}

// 尝试连接一下数据库,如果数据库配置有错误在这里就能报出来
err = db.Ping()
if err != nil {
return nil, err
}

return &DB{
d: db,
}, nil
}

func (db *DB) DoTx(isolation sql.IsolationLevel, fn func(tx *sqlx.Tx) error) error {
tx, err := db.d.BeginTxx(context.Background(), &sql.TxOptions{Isolation: isolation})
if err != nil {
return err
}

if err := fn(tx); err != nil {
tx.Rollback()
return err
}

if err := tx.Commit(); err != nil {
tx.Rollback()
return err
}

return nil
}

func (db *DB) SQLCtx() SQLContext {
return db.d
}

// 纠删码对象表插入
// TODO 需要使用事务保证查询之后插入的正确性
func (db *DB) InsertECObject(objectName string, bucketID int, fileSize int64, ecName string) (int64, error) {
// TODO 参考CreateRepObject重写
// 根据objectname和bucketid查询,若不存在则插入,若存在则不操作
//查询
/*type Object struct {
ObjectID int64 `db:"ObjectID"`
Name string `db:"Name"`
BucketID int `db:"BucketID"`
}
var x Object
err := db.d.Get(&x, "select ObjectID, Name, BucketID from Object where Name=? AND BucketID=?", objectName, bucketID)
//不存在才插入
if errors.Is(err, sql.ErrNoRows) {
sql := "insert into Object(Name, BucketID, FileSize, Redundancy, NumRep, EcName) values(?,?,?,?,?,?)"
r, err := db.d.Exec(sql, objectName, bucketID, fileSize, false, "-1", ecName)
if err != nil {
return 0, err
}

id, err := r.LastInsertId()
if err != nil {
return 0, err
}

// TODO 需要考虑失败后的处理

return id, nil

} else if err == nil {
return x.ObjectID, nil
}
return 0, err*/
panic("not implement yet")
}

// QueryObjectBlock 查询对象编码块表
func (db *DB) QueryObjectBlock(objectID int64) ([]model.ObjectBlock, error) {
var x []model.ObjectBlock
sql := "select * from ObjectBlock where ObjectID=?"
err := db.d.Select(&x, sql, objectID)
return x, err
}

// 对象编码块表Echash插入
func (db *DB) InsertECHash(objectID int64, hashes []string) {
for i := 0; i < len(hashes); i++ {
sql := "update ObjectBlock set BlockHash =? where ObjectID = ? AND InnerID = ?"
// TODO 需要处理错误
db.d.Exec(sql, hashes[i], objectID, i)
}
}

// 对象编码块表插入
func (db *DB) InsertEcObjectBlock(objectID int64, innerID int) error {
// 根据objectID查询,若不存在则插入,若存在则不操作
_, err := db.d.Exec(
"insert into ObjectBlock(ObjectID, InnerID) select ?, ? where not exists (select ObjectID from ObjectBlock where ObjectID=? AND InnerID=?)",
objectID,
innerID,
objectID,
innerID,
)
return err
}

// BatchInsertOrUpdateCache 批量更新缓存表
func (db *DB) BatchInsertOrUpdateCache(blockHashes []string, nodeID int64) error {
//jh:将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳
//-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为false,则不做任何操作
//-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为true,则更新Time
tx, err := db.d.BeginTxx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return fmt.Errorf("start transaction failed, err: %w", err)
}

for _, blockHash := range blockHashes {
//根据hash和nodeip查询缓存表里是否存在此条记录
var cache model.Cache
err := tx.Get(
&cache,
"select NodeID, TempOrPin, Cachetime from Cache where FileHash=? AND NodeID=?",
blockHash,
nodeID,
)

// 不存在记录则创建新记录
if errors.Is(err, sql.ErrNoRows) {
_, err := tx.Exec("insert into Cache values(?,?,?,?)", blockHash, nodeID, true, time.Now())
if err != nil {
tx.Rollback()
return fmt.Errorf("insert cache failed, err: %w", err)
}

} else if err == nil && cache.State == consts.CacheStateTemp {
//若在表中已存在且所对应的TempOrPin字段为true,则更新Time
_, err := tx.Exec(
"update Cache set Cachetime=? where FileHash=? AND NodeID=?",
time.Now(),
blockHash,
nodeID,
)
if err != nil {
tx.Rollback()
return fmt.Errorf("update cache failed, err: %w", err)
}
}
}

err = tx.Commit()
if err != nil {
tx.Rollback()
return fmt.Errorf("commit transaction failed, err: %w", err)
}

return nil
}

// 查询节点延迟表
func (db *DB) QueryNodeDelay(inNodeIP string, outNodeIP string) (int, error) {
//节点延迟结构体
var x struct {
DelayInMs int `db:"DelayInMs"`
}
sql := "select DelayInMs from NodeDelay where InNodeIP=? AND OutNodeIP=?"
err := db.d.Get(&x, sql, inNodeIP, outNodeIP)
return x.DelayInMs, err
}

// 节点延迟表插入
// TODO 需要使用事务确保插入的记录完整
func (db *DB) InsertNodeDelay(srcNodeID int64, dstNodeIDs []int64, delay []int) {
insSql := "insert into NodeDelay values(?,?,?)"
updateSql := "UPDATE NodeDelay SET DelayInMs=? WHERE SourceNodeID=? AND DestinationNodeID=?"
for i := 0; i < len(dstNodeIDs); i++ {
_, err := db.d.Exec(insSql, srcNodeID, dstNodeIDs[i], delay[i])
if err != nil {
// TODO 处理错误
db.d.Exec(updateSql, delay[i], srcNodeID, dstNodeIDs[i])
}
}
}

// 节点表插入
// TODO 需要使用事务保证查询之后插入的正确性
func (db *DB) InsertNode(nodeip string, nodelocation string, ipfsstatus string, localdirstatus string) error {
// 根据NodeIP查询,若不存在则插入,若存在则更新
//查询
type Node struct {
NodeIP string `db:"NodeIP"`
}
var x Node
err := db.d.Get(&x, "select NodeIP from Node where NodeIP=?", nodeip)
//local和ipfs同时可达才可达
// TODO 将status字段改成字符串(枚举值)
NodeStatus := ipfsstatus == consts.IPFSStateOK && localdirstatus == consts.StorageDirectoryStateOK
//不存在才插入
if errors.Is(err, sql.ErrNoRows) {
sql := "insert into Node values(?,?,?)"
_, err := db.d.Exec(sql, nodeip, nodelocation, NodeStatus)

return err
}

//存在则更新
sql := "update Node set NodeStatus=? where NodeIP=?"
_, err = db.d.Exec(sql, NodeStatus, nodeip)

return err
}

+ 30
- 0
pkgs/db/ec.go View File

@@ -0,0 +1,30 @@
package db

import (
//"database/sql"

"github.com/jmoiron/sqlx"
//"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type EcDB struct {
*DB
}

func (db *DB) Ec() *EcDB {
return &EcDB{DB: db}
}

// GetEc 查询纠删码参数
func (db *EcDB) GetEc(ctx SQLContext, ecName string) (model.Ec, error) {
var ret model.Ec
err := sqlx.Get(ctx, &ret, "select * from Ec where Name = ?", ecName)
return ret, err
}

func (db *EcDB) GetEcName(ctx SQLContext, objectID int) (string, error) {
var ret string
err := sqlx.Get(ctx, &ret, "select Redundancy from Object where ObjectID = ?")
return ret, err
}

+ 103
- 0
pkgs/db/model/model.go View File

@@ -0,0 +1,103 @@
package model

import "time"

type Node struct {
NodeID int64 `db:"NodeID" json:"nodeID"`
Name string `db:"Name" json:"name"`
LocalIP string `db:"LocalIP" json:"localIP"`
ExternalIP string `db:"ExternalIP" json:"externalIP"`
LocationID int64 `db:"LocationID" json:"locationID"`
State string `db:"State" json:"state"`
LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"`
}

type Storage struct {
StorageID int64 `db:"StorageID" json:"storageID"`
Name string `db:"Name" json:"name"`
NodeID int64 `db:"NodeID" json:"nodeID"`
Directory string `db:"Directory" json:"directory"`
State string `db:"State" json:"state"`
}

type NodeDelay struct {
SourceNodeID int64 `db:"SourceNodeID"`
DestinationNodeID int64 `db:"DestinationNodeID"`
DelayInMs int `db:"DelayInMs"`
}

type User struct {
UserID int64 `db:"UserID" json:"userID"`
Password string `db:"PassWord" json:"password"`
}

type UserBucket struct {
UserID int64 `db:"UserID" json:"userID"`
BucketID int64 `db:"BucketID" json:"bucketID"`
}

type UserNode struct {
UserID int64 `db:"UserID" json:"userID"`
NodeID int64 `db:"NodeID" json:"nodeID"`
}

type UserStorage struct {
UserID int64 `db:"UserID" json:"userID"`
StorageID int64 `db:"StorageID" json:"storageID"`
}

type Bucket struct {
BucketID int64 `db:"BucketID" json:"bucketID"`
Name string `db:"Name" json:"name"`
CreatorID int64 `db:"CreatorID" json:"creatorID"`
}

type Object struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
Name string `db:"Name" json:"name"`
BucketID int64 `db:"BucketID" json:"bucketID"`
State string `db:"State" json:"state"`
FileSize int64 `db:"FileSize" json:"fileSize,string"`
Redundancy string `db:"Redundancy" json:"redundancy"`
DirName string `db:"DirName" json:"dirName"`
}

type ObjectRep struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
RepCount int `db:"RepCount" json:"repCount"`
FileHash string `db:"FileHash" json:"fileHash"`
}

type ObjectBlock struct {
BlockID int64 `db:"BlockID" json:"blockID"`
ObjectID int64 `db:"ObjectID" json:"objectID"`
InnerID int `db:"InnerID" json:"innerID"`
BlockHash string `db:"BlockHash" json:"blockHash"`
}

type Cache struct {
FileHash string `db:"FileHash" json:"fileHash"`
NodeID int64 `db:"NodeID" json:"nodeID"`
State string `db:"State" json:"state"`
CacheTime time.Time `db:"CacheTime" json:"cacheTime"`
Priority int `db:"Priority" json:"priority"`
}

type StorageObject struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
StorageID int64 `db:"StorageID" json:"storageID"`
UserID int64 `db:"UserID" json:"userID"`
State string `db:"State" json:"state"`
}

type Location struct {
LocationID int64 `db:"LocationID" json:"locationID"`
Name string `db:"Name" json:"name"`
}

type Ec struct {
EcID int `db:"EcID" json:"ecID"`
Name string `db:"Name" json:"name"`
EcK int `db:"EcK" json:"ecK"`
EcN int `db:"EcN" json:"ecN"`
}

+ 48
- 0
pkgs/db/node.go View File

@@ -0,0 +1,48 @@
package db

import (
"time"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type NodeDB struct {
*DB
}

func (db *DB) Node() *NodeDB {
return &NodeDB{DB: db}
}

func (db *NodeDB) GetByID(ctx SQLContext, nodeID int64) (model.Node, error) {
var ret model.Node
err := sqlx.Get(ctx, &ret, "select * from Node where NodeID = ?", nodeID)
return ret, err
}

func (db *NodeDB) GetAllNodes(ctx SQLContext) ([]model.Node, error) {
var ret []model.Node
err := sqlx.Select(ctx, &ret, "select * from Node")
return ret, err
}

// GetByExternalIP 根据外网IP查找节点
func (db *NodeDB) GetByExternalIP(ctx SQLContext, exterIP string) (model.Node, error) {
var ret model.Node
err := sqlx.Get(ctx, &ret, "select * from Node where ExternalIP = ?", exterIP)
return ret, err
}

// GetUserNodes 根据用户id查询可用node
func (db *NodeDB) GetUserNodes(ctx SQLContext, userID int64) ([]model.Node, error) {
var nodes []model.Node
err := sqlx.Select(ctx, &nodes, "select Node.* from UserNode, Node where UserNode.NodeID = Node.NodeID and UserNode.UserID=?", userID)
return nodes, err
}

// UpdateState 更新状态,并且设置上次上报时间为现在
func (db *NodeDB) UpdateState(ctx SQLContext, nodeID int64, state string) error {
_, err := ctx.Exec("update Node set State = ?, LastReportTime = ? where NodeID = ?", state, time.Now(), nodeID)
return err
}

+ 316
- 0
pkgs/db/object.go View File

@@ -0,0 +1,316 @@
package db

import (
"database/sql"
"errors"
"fmt"

"github.com/jmoiron/sqlx"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type ObjectDB struct {
*DB
}

func (db *DB) Object() *ObjectDB {
return &ObjectDB{DB: db}
}

func (db *ObjectDB) GetByID(ctx SQLContext, objectID int64) (model.Object, error) {
var ret model.Object
err := sqlx.Get(ctx, &ret, "select * from Object where ObjectID = ?", objectID)
return ret, err
}

func (db *ObjectDB) GetByName(ctx SQLContext, bucketID int64, name string) (model.Object, error) {
var ret model.Object
err := sqlx.Get(ctx, &ret, "select * from Object where BucketID = ? and Name = ?", bucketID, name)
return ret, err
}

func (db *ObjectDB) GetBucketObjects(ctx SQLContext, userID int64, bucketID int64) ([]model.Object, error) {
var ret []model.Object
err := sqlx.Select(ctx, &ret, "select Object.* from UserBucket, Object where UserID = ? and UserBucket.BucketID = ? and UserBucket.BucketID = Object.BucketID", userID, bucketID)
return ret, err
}

func (db *ObjectDB) GetByDirName(ctx SQLContext, dirName string) ([]model.Object, error) {
var ret []model.Object
err := sqlx.Select(ctx, &ret, "select * from Object where DirName = ? ", dirName)
return ret, err
}

// IsAvailable 判断一个用户是否拥有指定对象
func (db *ObjectDB) IsAvailable(ctx SQLContext, userID int64, objectID int64) (bool, error) {
var objID int64
// 先根据ObjectID找到Object,然后判断此Object所在的Bucket是不是归此用户所有
err := sqlx.Get(ctx, &objID,
"select Object.ObjectID from Object, UserBucket where "+
"Object.ObjectID = ? and "+
"Object.BucketID = UserBucket.BucketID and "+
"UserBucket.UserID = ?",
objectID, userID)

if err == sql.ErrNoRows {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find object failed, err: %w", err)
}

return true, nil
}

// GetUserObject 获得Object,如果用户没有权限访问,则不会获得结果
func (db *ObjectDB) GetUserObject(ctx SQLContext, userID int64, objectID int64) (model.Object, error) {
var ret model.Object
err := sqlx.Get(ctx, &ret,
"select Object.* from Object, UserBucket where "+
"Object.ObjectID = ? and "+
"Object.BucketID = UserBucket.BucketID and "+
"UserBucket.UserID = ?",
objectID, userID)
return ret, err
}

// CreateRepObject 创建多副本对象相关的记录
func (db *ObjectDB) CreateRepObject(ctx SQLContext, bucketID int64, objectName string, fileSize int64, repCount int, nodeIDs []int64, fileHash string, dirName string) (int64, error) {
// 根据objectname和bucketid查询,若不存在则插入,若存在则返回错误
var objectID int64
err := sqlx.Get(ctx, &objectID, "select ObjectID from Object where Name = ? AND BucketID = ?", objectName, bucketID)
// 无错误代表存在记录
if err == nil {
return 0, fmt.Errorf("object with given Name and BucketID already exists")
}
// 错误不是记录不存在
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return 0, fmt.Errorf("query Object by ObjectName and BucketID failed, err: %w", err)
}

// 创建对象的记录
sql := "insert into Object(Name, BucketID, State, FileSize, Redundancy, DirName) values(?,?,?,?,?,?)"
r, err := ctx.Exec(sql, objectName, bucketID, consts.ObjectStateNormal, fileSize, models.RedundancyRep, dirName)
if err != nil {
return 0, fmt.Errorf("insert object failed, err: %w", err)
}

objectID, err = r.LastInsertId()
if err != nil {
return 0, fmt.Errorf("get id of inserted object failed, err: %w", err)
}

// 创建对象副本的记录
_, err = ctx.Exec("insert into ObjectRep(ObjectID, RepCount, FileHash) values(?,?,?)", objectID, repCount, fileHash)
if err != nil {
return 0, fmt.Errorf("insert object rep failed, err: %w", err)
}

// 创建缓存记录
priority := 0 //优先级暂时设置为0
for _, nodeID := range nodeIDs {
err = db.Cache().CreatePinned(ctx, fileHash, nodeID, priority)
if err != nil {
return 0, fmt.Errorf("create cache failed, err: %w", err)
}
}

return objectID, nil
}

func (db *ObjectDB) CreateEcObject(ctx SQLContext, bucketID int64, objectName string, fileSize int64, userID int64, nodeIDs []int64, hashs []string, ecName string, dirName string) (int64, error) {
// 根据objectname和bucketid查询,若不存在则插入,若存在则返回错误
var objectID int64
err := sqlx.Get(ctx, &objectID, "select ObjectID from Object where Name = ? AND BucketID = ?", objectName, bucketID)
// 无错误代表存在记录
if err == nil {
return 0, fmt.Errorf("object with given Name and BucketID already exists")
}
// 错误不是记录不存在
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return 0, fmt.Errorf("query Object by ObjectName and BucketID failed, err: %w", err)
}

// 创建对象的记录
sql := "insert into Object(Name, BucketID, State, FileSize, Redundancy, DirName) values(?,?,?,?,?,?)"
r, err := ctx.Exec(sql, objectName, bucketID, consts.ObjectStateNormal, fileSize, ecName, dirName)
if err != nil {
return 0, fmt.Errorf("insert Ec object failed, err: %w", err)
}

objectID, err = r.LastInsertId()
if err != nil {
return 0, fmt.Errorf("get id of inserted object failed, err: %w", err)
}

// 创建编码块的记录
for i := 0; i < len(hashs); i++ {
_, err = ctx.Exec("insert into ObjectBlock(ObjectID, InnerID, BlockHash) values(?,?,?)", objectID, i, hashs[i])
if err != nil {
return 0, fmt.Errorf("insert object rep failed, err: %w", err)
}
}
// 创建缓存记录
priority := 0 //优先级暂时设置为0
i := 0
for _, nodeID := range nodeIDs {
err = db.Cache().CreatePinned(ctx, hashs[i], nodeID, priority)
i += 1
if err != nil {
return 0, fmt.Errorf("create cache failed, err: %w", err)
}
}

return objectID, nil
}

func (db *ObjectDB) UpdateRepObject(ctx SQLContext, objectID int64, fileSize int64, nodeIDs []int64, fileHash string) error {
obj, err := db.GetByID(ctx, objectID)
if err != nil {
return fmt.Errorf("get object failed, err: %w", err)
}
if obj.Redundancy != models.RedundancyRep {
return fmt.Errorf("object is not a rep object")
}

_, err = db.UpdateFileInfo(ctx, objectID, fileSize)
if err != nil {
if err != nil {
return fmt.Errorf("update rep object failed, err: %w", err)
}
}

objRep, err := db.ObjectRep().GetByID(ctx, objectID)
if err != nil {
return fmt.Errorf("get object rep failed, err: %w", err)
}

// 如果新文件与旧文件的Hash不同,则需要更新关联的FileHash,设置Storage中的文件已过期,重新插入Cache记录
if objRep.FileHash != fileHash {
_, err := db.ObjectRep().UpdateFileHash(ctx, objectID, fileHash)
if err != nil {
return fmt.Errorf("update rep object file hash failed, err: %w", err)
}

_, err = db.StorageObject().SetAllObjectOutdated(ctx, objectID)
if err != nil {
return fmt.Errorf("set storage object outdated failed, err: %w", err)
}

for _, nodeID := range nodeIDs {
err := db.Cache().CreatePinned(ctx, fileHash, nodeID, 0) //priority = 0
if err != nil {
return fmt.Errorf("create cache failed, err: %w", err)
}
}

} else {
// 如果相同,则只增加Cache中不存在的记录

cachedNodes, err := db.Cache().GetCachingFileNodes(ctx, fileHash)
if err != nil {
return fmt.Errorf("find caching file nodes failed, err: %w", err)
}

// 筛选出不在cachedNodes中的id
newNodeIDs := lo.Filter(nodeIDs, func(id int64, index int) bool {
return lo.NoneBy(cachedNodes, func(node model.Node) bool {
return node.NodeID == id
})
})
for _, nodeID := range newNodeIDs {
err := db.Cache().CreatePinned(ctx, fileHash, nodeID, 0) //priority
if err != nil {
return fmt.Errorf("create cache failed, err: %w", err)
}
}
}

return nil
}

// SoftDelete 设置一个对象被删除,并将相关数据删除
func (db *ObjectDB) SoftDelete(ctx SQLContext, objectID int64) error {
obj, err := db.GetByID(ctx, objectID)
if err != nil {
return fmt.Errorf("get object failed, err: %w", err)
}

// 不是正常状态的Object,则不删除
// TODO 未来可能有其他状态
if obj.State != consts.ObjectStateNormal {
return nil
}

err = db.ChangeState(ctx, objectID, consts.ObjectStateDeleted)
if err != nil {
return fmt.Errorf("change object state failed, err: %w", err)
}

if obj.Redundancy == models.RedundancyRep {
err = db.ObjectRep().Delete(ctx, objectID)
if err != nil {
return fmt.Errorf("delete from object rep failed, err: %w", err)
}
} else {
err = db.ObjectBlock().Delete(ctx, objectID)
if err != nil {
return fmt.Errorf("delete from object rep failed, err: %w", err)
}
}

_, err = db.StorageObject().SetAllObjectDeleted(ctx, objectID)
if err != nil {
return fmt.Errorf("set storage object deleted failed, err: %w", err)
}

return nil
}

// DeleteUnused 删除一个已经是Deleted状态,且不再被使用的对象。目前可能被使用的地方只有StorageObject
func (ObjectDB) DeleteUnused(ctx SQLContext, objectID int64) error {
_, err := ctx.Exec("delete from Object where ObjectID = ? and State = ? and "+
"not exists(select StorageID from StorageObject where ObjectID = ?)",
objectID,
consts.ObjectStateDeleted,
objectID,
)

return err
}

func (*ObjectDB) BatchGetAllObjectIDs(ctx SQLContext, start int, count int) ([]int64, error) {
var ret []int64
err := sqlx.Select(ctx, &ret, "select ObjectID from Object limit ?, ?", start, count)
return ret, err
}

func (*ObjectDB) BatchGetAllEcObjectIDs(ctx SQLContext, start int, count int) ([]int64, error) {
var ret []int64
rep := "rep"
err := sqlx.Select(ctx, &ret, "SELECT ObjectID FROM object where Redundancy != ? limit ?, ?", rep, start, count)
return ret, err
}

func (*ObjectDB) ChangeState(ctx SQLContext, objectID int64, state string) error {
_, err := ctx.Exec("update Object set State = ? where ObjectID = ?", state, objectID)
return err
}

func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID int64, fileSize int64) (bool, error) {
ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID)
if err != nil {
return false, err
}

cnt, err := ret.RowsAffected()
if err != nil {
return false, fmt.Errorf("get affected rows failed, err: %w", err)
}

return cnt > 0, nil
}

+ 72
- 0
pkgs/db/object_block.go View File

@@ -0,0 +1,72 @@
package db

import (
"database/sql"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type ObjectBlockDB struct {
*DB
}

func (db *DB) ObjectBlock() *ObjectBlockDB {
return &ObjectBlockDB{DB: db}
}

func (db *ObjectBlockDB) Delete(ctx SQLContext, objectID int64) error {
_, err := ctx.Exec("delete from ObjectBlock where ObjectID = ?", objectID)
return err
}

func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (int, error) {
var cnt int
err := sqlx.Get(ctx, &cnt,
"select count(BlockHash) from ObjectBlock, Object where BlockHash = ? and "+
"ObjectBlock.ObjectID = Object.ObjectID and "+
"Object.State = ?", fileHash, consts.ObjectStateNormal)
if err == sql.ErrNoRows {
return 0, nil
}

return cnt, err
}

func (db *ObjectBlockDB) GetBatchObjectBlocks(ctx SQLContext, objectIDs []int64) ([][]string, error) {
blocks := make([][]string, len(objectIDs))
var err error
for i, objectID := range objectIDs {
var x []model.ObjectBlock
sql := "select * from ObjectBlock where ObjectID=?"
err = db.d.Select(&x, sql, objectID)
xx := make([]string, len(x))
for ii := 0; ii < len(x); ii++ {
xx[x[ii].InnerID] = x[ii].BlockHash
}
blocks[i] = xx
}
return blocks, err
}

func (db *ObjectBlockDB) GetBatchBlocksNodes(ctx SQLContext, hashs [][]string) ([][][]int64, error) {
nodes := make([][][]int64, len(hashs))
var err error
for i, hs := range hashs {
fileNodes := make([][]int64, len(hs))
for j, h := range hs {
var x []model.Node
err = sqlx.Select(ctx, &x,
"select Node.* from Cache, Node where "+
"Cache.FileHash=? and Cache.NodeID = Node.NodeID and Cache.State=?", h, consts.CacheStatePinned)
xx := make([]int64, len(x))
for ii := 0; ii < len(x); ii++ {
xx[ii] = x[ii].NodeID
}
fileNodes[j] = xx
}
nodes[i] = fileNodes
}
return nodes, err
}

+ 66
- 0
pkgs/db/object_rep.go View File

@@ -0,0 +1,66 @@
package db

import (
"database/sql"
"fmt"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type ObjectRepDB struct {
*DB
}

func (db *DB) ObjectRep() *ObjectRepDB {
return &ObjectRepDB{DB: db}
}

// GetObjectRep 查询对象副本表
func (db *ObjectRepDB) GetByID(ctx SQLContext, objectID int64) (model.ObjectRep, error) {
var ret model.ObjectRep
err := sqlx.Get(ctx, &ret, "select * from ObjectRep where ObjectID = ?", objectID)
return ret, err
}

func (db *ObjectRepDB) UpdateFileHash(ctx SQLContext, objectID int64, fileHash string) (int64, error) {
ret, err := ctx.Exec("update ObjectRep set FileHash = ? where ObjectID = ?", fileHash, objectID)
if err != nil {
return 0, err
}

cnt, err := ret.RowsAffected()
if err != nil {
return 0, fmt.Errorf("get affected rows failed, err: %w", err)
}

return cnt, nil
}

func (db *ObjectRepDB) Delete(ctx SQLContext, objectID int64) error {
_, err := ctx.Exec("delete from ObjectRep where ObjectID = ?", objectID)
return err
}

func (db *ObjectRepDB) GetFileMaxRepCount(ctx SQLContext, fileHash string) (int, error) {
var maxRepCnt *int
err := sqlx.Get(ctx, &maxRepCnt,
"select max(RepCount) from ObjectRep, Object where FileHash = ? and "+
"ObjectRep.ObjectID = Object.ObjectID and "+
"Object.State = ?", fileHash, consts.ObjectStateNormal)

if err == sql.ErrNoRows {
return 0, nil
}

if err != nil {
return 0, err
}

if maxRepCnt == nil {
return 0, nil
}

return *maxRepCnt, err
}

+ 64
- 0
pkgs/db/storage.go View File

@@ -0,0 +1,64 @@
package db

import (
"database/sql"
"fmt"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type StorageDB struct {
*DB
}

func (db *DB) Storage() *StorageDB {
return &StorageDB{DB: db}
}

func (db *StorageDB) GetByID(ctx SQLContext, stgID int64) (model.Storage, error) {
var stg model.Storage
err := sqlx.Get(ctx, &stg, "select * from Storage where StorageID = ?", stgID)
return stg, err
}

func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]int64, error) {
var ret []int64
err := sqlx.Select(ctx, &ret, "select StorageID from Storage limit ?, ?", start, count)
return ret, err
}

func (db *StorageDB) IsAvailable(ctx SQLContext, userID int64, storageID int64) (bool, error) {
var stgID int64
err := sqlx.Get(ctx, &stgID,
"select Storage.StorageID from Storage, UserStorage where "+
"Storage.StorageID = ? and "+
"Storage.StorageID = UserStorage.StorageID and "+
"UserStorage.UserID = ?",
storageID, userID)

if err == sql.ErrNoRows {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find storage failed, err: %w", err)
}

return true, nil
}

func (db *StorageDB) GetUserStorage(ctx SQLContext, userID int64, storageID int64) (model.Storage, error) {
var stg model.Storage
err := sqlx.Get(ctx, &stg,
"select Storage.* from UserStorage, Storage where UserID = ? and UserStorage.StorageID = ? and UserStorage.StorageID = Storage.StorageID",
userID,
storageID)

return stg, err
}

func (db *StorageDB) ChangeState(ctx SQLContext, storageID int64, state string) error {
_, err := ctx.Exec("update Storage set State = ? where StorageID = ?", state, storageID)
return err
}

+ 116
- 0
pkgs/db/storage_object.go View File

@@ -0,0 +1,116 @@
package db

import (
"fmt"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type StorageObjectDB struct {
*DB
}

func (db *DB) StorageObject() *StorageObjectDB {
return &StorageObjectDB{DB: db}
}

func (*StorageObjectDB) Get(ctx SQLContext, storageID int64, objectID int64, userID int64) (model.StorageObject, error) {
var ret model.StorageObject
err := sqlx.Get(ctx, &ret, "select * from StorageObject where StorageID = ? and ObjectID = ? and UserID = ?", storageID, objectID, userID)
return ret, err
}

func (*StorageObjectDB) GetAllByStorageAndObjectID(ctx SQLContext, storageID int64, objectID int64) ([]model.StorageObject, error) {
var ret []model.StorageObject
err := sqlx.Select(ctx, &ret, "select * from StorageObject where StorageID = ? and ObjectID = ?", storageID, objectID)
return ret, err
}

func (*StorageObjectDB) GetAllByStorageID(ctx SQLContext, storageID int64) ([]model.StorageObject, error) {
var ret []model.StorageObject
err := sqlx.Select(ctx, &ret, "select * from StorageObject where StorageID = ?", storageID)
return ret, err
}

func (*StorageObjectDB) MoveObjectTo(ctx SQLContext, objectID int64, storageID int64, userID int64) error {
_, err := ctx.Exec("insert into StorageObject values(?,?,?,?)", objectID, storageID, userID, consts.StorageObjectStateNormal)
return err
}

func (*StorageObjectDB) ChangeState(ctx SQLContext, storageID int64, objectID int64, userID int64, state string) error {
_, err := ctx.Exec("update StorageObject set State = ? where StorageID = ? and ObjectID = ? and UserID = ?", state, storageID, objectID, userID)
return err
}

// SetStateNormal 将状态设置为Normal,如果记录状态是Deleted,则不进行操作
func (*StorageObjectDB) SetStateNormal(ctx SQLContext, storageID int64, objectID int64, userID int64) error {
_, err := ctx.Exec("update StorageObject set State = ? where StorageID = ? and ObjectID = ? and UserID = ? and State <> ?",
consts.StorageObjectStateNormal,
storageID,
objectID,
userID,
consts.StorageObjectStateDeleted,
)
return err
}

func (*StorageObjectDB) SetAllObjectState(ctx SQLContext, objectID int64, state string) (int64, error) {
ret, err := ctx.Exec(
"update StorageObject set State = ? where ObjectID = ?",
state,
objectID,
)
if err != nil {
return 0, err
}

cnt, err := ret.RowsAffected()
if err != nil {
return 0, fmt.Errorf("get affected rows failed, err: %w", err)
}

return cnt, nil
}

// SetAllObjectOutdated 将Storage中指定对象设置为已过期。
// 注:只会设置Normal状态的对象
func (*StorageObjectDB) SetAllObjectOutdated(ctx SQLContext, objectID int64) (int64, error) {
ret, err := ctx.Exec(
"update StorageObject set State = ? where State = ? and ObjectID = ?",
consts.StorageObjectStateOutdated,
consts.StorageObjectStateNormal,
objectID,
)
if err != nil {
return 0, err
}

cnt, err := ret.RowsAffected()
if err != nil {
return 0, fmt.Errorf("get affected rows failed, err: %w", err)
}

return cnt, nil
}

func (db *StorageObjectDB) SetAllObjectDeleted(ctx SQLContext, objectID int64) (int64, error) {
return db.SetAllObjectState(ctx, objectID, consts.StorageObjectStateDeleted)
}

func (*StorageObjectDB) Delete(ctx SQLContext, storageID int64, objectID int64, userID int64) error {
_, err := ctx.Exec("delete from StorageObject where StorageID = ? and ObjectID = ? and UserID = ?", storageID, objectID, userID)
return err
}

// FindObjectStorages 查询存储了指定对象的Storage
func (*StorageObjectDB) FindObjectStorages(ctx SQLContext, objectID int64) ([]model.Storage, error) {
var ret []model.Storage
err := sqlx.Select(ctx, &ret,
"select Storage.* from StorageObject, Storage where ObjectID = ? and "+
"StorageObject.StorageID = Storage.StorageID",
objectID,
)
return ret, err
}

+ 14
- 0
pkgs/db/user_bucket.go View File

@@ -0,0 +1,14 @@
package db

type UserBucketDB struct {
*DB
}

func (db *DB) UserBucket() *UserBucketDB {
return &UserBucketDB{DB: db}
}

func (*UserBucketDB) Create(ctx SQLContext, userID int64, bucketID int64) error {
_, err := ctx.Exec("insert into UserBucket(UserID,BucketID) values(?,?)", userID, bucketID)
return err
}

+ 38
- 0
pkgs/ec/rs.go View File

@@ -0,0 +1,38 @@
package ec

import (
"fmt"
"os"

"github.com/baohan10/reedsolomon"
)

type rs struct {
r *(reedsolomon.ReedSolomon)
ecN int
ecK int
ecP int
}

func NewRsEnc(ecK int, ecN int) *rs {
enc := rs{
ecN: ecN,
ecK: ecK,
ecP: ecN - ecK,
}
enc.r = reedsolomon.GetReedSolomonIns(ecK, ecN)
return &enc
}
func (r *rs) Encode(all [][]byte) {
r.r.Encode(all)
}

func (r *rs) Repair(all [][]byte) error {
return r.r.Reconstruct(all)
}

func checkErr(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %s", err.Error())
}
}

+ 10
- 0
pkgs/mq/client/agent/agent.go View File

@@ -0,0 +1,10 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
)

func (client *Client) GetState(msg agtmsg.GetState, opts ...mq.RequestOption) (*agtmsg.GetStateResp, error) {
return mq.Request[agtmsg.GetStateResp](client.rabbitCli, msg, opts...)
}

+ 28
- 0
pkgs/mq/client/agent/client.go View File

@@ -0,0 +1,28 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/mq/config"
)

type Client struct {
rabbitCli *mq.RabbitMQClient
id int64
}

func NewClient(id int64, cfg *config.Config) (*Client, error) {
rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.MakeAgentQueueName(id), "")
if err != nil {
return nil, err
}

return &Client{
rabbitCli: rabbitCli,
id: id,
}, nil
}

func (c *Client) Close() {
c.rabbitCli.Close()
}

+ 10
- 0
pkgs/mq/client/agent/ipfs.go View File

@@ -0,0 +1,10 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
)

func (client *Client) CheckIPFS(msg agtmsg.CheckIPFS, opts ...mq.RequestOption) (*agtmsg.CheckIPFSResp, error) {
return mq.Request[agtmsg.CheckIPFSResp](client.rabbitCli, msg, opts...)
}

+ 14
- 0
pkgs/mq/client/agent/object.go View File

@@ -0,0 +1,14 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
)

func (client *Client) StartPinningObject(msg agtmsg.StartPinningObject, opts ...mq.RequestOption) (*agtmsg.StartPinningObjectResp, error) {
return mq.Request[agtmsg.StartPinningObjectResp](client.rabbitCli, msg, opts...)
}

func (client *Client) WaitPinningObject(msg agtmsg.WaitPinningObject, opts ...mq.RequestOption) (*agtmsg.WaitPinningObjectResp, error) {
return mq.Request[agtmsg.WaitPinningObjectResp](client.rabbitCli, msg, opts...)
}

+ 26
- 0
pkgs/mq/client/agent/storage.go View File

@@ -0,0 +1,26 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
)

func (client *Client) StartStorageMoveObject(msg agtmsg.StartStorageMoveObject, opts ...mq.RequestOption) (*agtmsg.StartStorageMoveObjectResp, error) {
return mq.Request[agtmsg.StartStorageMoveObjectResp](client.rabbitCli, msg, opts...)
}

func (client *Client) WaitStorageMoveObject(msg agtmsg.WaitStorageMoveObject, opts ...mq.RequestOption) (*agtmsg.WaitStorageMoveObjectResp, error) {
return mq.Request[agtmsg.WaitStorageMoveObjectResp](client.rabbitCli, msg, opts...)
}

func (client *Client) StorageCheck(msg agtmsg.StorageCheck, opts ...mq.RequestOption) (*agtmsg.StorageCheckResp, error) {
return mq.Request[agtmsg.StorageCheckResp](client.rabbitCli, msg, opts...)
}

func (client *Client) StartStorageUploadRepObject(msg agtmsg.StartStorageUploadRepObject, opts ...mq.RequestOption) (*agtmsg.StartStorageUploadRepObjectResp, error) {
return mq.Request[agtmsg.StartStorageUploadRepObjectResp](client.rabbitCli, msg, opts...)
}

func (client *Client) WaitStorageUploadRepObject(msg agtmsg.WaitStorageUploadRepObject, opts ...mq.RequestOption) (*agtmsg.WaitStorageUploadRepObjectResp, error) {
return mq.Request[agtmsg.WaitStorageUploadRepObjectResp](client.rabbitCli, msg, opts...)
}

+ 14
- 0
pkgs/mq/client/coordinator/agent.go View File

@@ -0,0 +1,14 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

func (client *Client) TempCacheReport(msg coormsg.TempCacheReport) error {
return mq.Send(client.rabbitCli, msg)
}

func (client *Client) AgentStatusReport(msg coormsg.AgentStatusReport) error {
return mq.Send(client.rabbitCli, msg)
}

+ 22
- 0
pkgs/mq/client/coordinator/bucket.go View File

@@ -0,0 +1,22 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

func (client *Client) GetUserBuckets(msg coormsg.GetUserBuckets) (*coormsg.GetUserBucketsResp, error) {
return mq.Request[coormsg.GetUserBucketsResp](client.rabbitCli, msg)
}

func (client *Client) GetBucketObjects(msg coormsg.GetBucketObjects) (*coormsg.GetBucketObjectsResp, error) {
return mq.Request[coormsg.GetBucketObjectsResp](client.rabbitCli, msg)
}

func (client *Client) CreateBucket(msg coormsg.CreateBucket) (*coormsg.CreateBucketResp, error) {
return mq.Request[coormsg.CreateBucketResp](client.rabbitCli, msg)
}

func (client *Client) DeleteBucket(msg coormsg.DeleteBucket) (*coormsg.DeleteBucketResp, error) {
return mq.Request[coormsg.DeleteBucketResp](client.rabbitCli, msg)
}

+ 26
- 0
pkgs/mq/client/coordinator/client.go View File

@@ -0,0 +1,26 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/mq/config"
)

type Client struct {
rabbitCli *mq.RabbitMQClient
}

func NewClient(cfg *config.Config) (*Client, error) {
rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.COORDINATOR_QUEUE_NAME, "")
if err != nil {
return nil, err
}

return &Client{
rabbitCli: rabbitCli,
}, nil
}

func (c *Client) Close() {
c.rabbitCli.Close()
}

+ 42
- 0
pkgs/mq/client/coordinator/object.go View File

@@ -0,0 +1,42 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

func (client *Client) GetObjectsByDirName(msg coormsg.GetObjectsByDirName) (*coormsg.GetObjectsResp, error) {
return mq.Request[coormsg.GetObjectsResp](client.rabbitCli, msg)
}

func (client *Client) PreDownloadObject(msg coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, error) {
return mq.Request[coormsg.PreDownloadObjectResp](client.rabbitCli, msg)
}

func (client *Client) PreUploadRepObject(msg coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, error) {
return mq.Request[coormsg.PreUploadResp](client.rabbitCli, msg)
}

func (client *Client) CreateRepObject(msg coormsg.CreateRepObject) (*coormsg.CreateObjectResp, error) {
return mq.Request[coormsg.CreateObjectResp](client.rabbitCli, msg)
}

func (client *Client) PreUploadEcObject(msg coormsg.PreUploadEcObject) (*coormsg.PreUploadEcResp, error) {
return mq.Request[coormsg.PreUploadEcResp](client.rabbitCli, msg)
}

func (client *Client) CreateEcObject(msg coormsg.CreateEcObject) (*coormsg.CreateObjectResp, error) {
return mq.Request[coormsg.CreateObjectResp](client.rabbitCli, msg)
}

func (client *Client) PreUpdateRepObject(msg coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, error) {
return mq.Request[coormsg.PreUpdateRepObjectResp](client.rabbitCli, msg)
}

func (client *Client) UpdateRepObject(msg coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, error) {
return mq.Request[coormsg.UpdateRepObjectResp](client.rabbitCli, msg)
}

func (client *Client) DeleteObject(msg coormsg.DeleteObject) (*coormsg.DeleteObjectResp, error) {
return mq.Request[coormsg.DeleteObjectResp](client.rabbitCli, msg)
}

+ 18
- 0
pkgs/mq/client/coordinator/storage.go View File

@@ -0,0 +1,18 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

func (client *Client) GetStorageInfo(msg coormsg.GetStorageInfo) (*coormsg.GetStorageInfoResp, error) {
return mq.Request[coormsg.GetStorageInfoResp](client.rabbitCli, msg)
}

func (client *Client) PreMoveObjectToStorage(msg coormsg.PreMoveObjectToStorage) (*coormsg.PreMoveObjectToStorageResp, error) {
return mq.Request[coormsg.PreMoveObjectToStorageResp](client.rabbitCli, msg)
}

func (client *Client) MoveObjectToStorage(msg coormsg.MoveObjectToStorage) (*coormsg.MoveObjectToStorageResp, error) {
return mq.Request[coormsg.MoveObjectToStorageResp](client.rabbitCli, msg)
}

+ 26
- 0
pkgs/mq/client/scanner/client.go View File

@@ -0,0 +1,26 @@
package scanner

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/mq/config"
)

type Client struct {
rabbitCli *mq.RabbitMQClient
}

func NewClient(cfg *config.Config) (*Client, error) {
rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.SCANNER_QUEUE_NAME, "")
if err != nil {
return nil, err
}

return &Client{
rabbitCli: rabbitCli,
}, nil
}

func (c *Client) Close() {
c.rabbitCli.Close()
}

+ 25
- 0
pkgs/mq/client/scanner/event.go View File

@@ -0,0 +1,25 @@
package scanner

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkg/mq"
scmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner"
)

func (cli *Client) PostEvent(event any, isEmergency bool, dontMerge bool, opts ...mq.SendOption) error {
opt := mq.SendOption{
Timeout: time.Second * 30,
}
if len(opts) > 0 {
opt = opts[0]
}

body, err := scmsg.NewPostEvent(event, isEmergency, dontMerge)
if err != nil {
return fmt.Errorf("new post event body failed, err: %w", err)
}

return mq.Send(cli.rabbitCli, body, opt)
}

+ 14
- 0
pkgs/mq/config/config.go View File

@@ -0,0 +1,14 @@
package config

import "fmt"

type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
VHost string `json:"vhost"`
}

func (cfg *Config) MakeConnectingURL() string {
return fmt.Sprintf("amqp://%s:%s@%s%s", cfg.Account, cfg.Password, cfg.Address, cfg.VHost)
}

+ 12
- 0
pkgs/mq/consts.go View File

@@ -0,0 +1,12 @@
package mq

import "fmt"

const (
COORDINATOR_QUEUE_NAME = "Coordinator"
SCANNER_QUEUE_NAME = "Scanner"
)

func MakeAgentQueueName(id int64) string {
return fmt.Sprintf("Agent@%d", id)
}

+ 27
- 0
pkgs/mq/message/agent/agent.go View File

@@ -0,0 +1,27 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
)

type GetState struct {
}

func NewGetState() GetState {
return GetState{}
}

type GetStateResp struct {
IPFSState string `json:"ipfsState"`
}

func NewGetStateRespBody(ipfsState string) GetStateResp {
return GetStateResp{
IPFSState: ipfsState,
}
}

func init() {
mq.RegisterMessage[GetState]()
mq.RegisterMessage[GetStateResp]()
}

+ 50
- 0
pkgs/mq/message/agent/ipfs.go View File

@@ -0,0 +1,50 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type CheckIPFS struct {
IsComplete bool `json:"isComplete"`
Caches []model.Cache `json:"caches"`
}

func NewCheckIPFS(isComplete bool, caches []model.Cache) CheckIPFS {
return CheckIPFS{
IsComplete: isComplete,
Caches: caches,
}
}

type CheckIPFSResp struct {
Entries []CheckIPFSRespEntry `json:"entries"`
}

const (
CHECK_IPFS_RESP_OP_DELETE_TEMP = "DeleteTemp"
CHECK_IPFS_RESP_OP_CREATE_TEMP = "CreateTemp"
)

type CheckIPFSRespEntry struct {
FileHash string `json:"fileHash"`
Operation string `json:"operation"`
}

func NewCheckIPFSRespEntry(fileHash string, op string) CheckIPFSRespEntry {
return CheckIPFSRespEntry{
FileHash: fileHash,
Operation: op,
}
}

func NewCheckIPFSResp(entries []CheckIPFSRespEntry) CheckIPFSResp {
return CheckIPFSResp{
Entries: entries,
}
}

func init() {
mq.RegisterMessage[CheckIPFS]()
mq.RegisterMessage[CheckIPFSResp]()
}

+ 55
- 0
pkgs/mq/message/agent/object.go View File

@@ -0,0 +1,55 @@
package agent

import "gitlink.org.cn/cloudream/common/pkg/mq"

type StartPinningObject struct {
FileHash string `json:"fileHash"`
}

func NewStartPinningObject(fileHash string) StartPinningObject {
return StartPinningObject{
FileHash: fileHash,
}
}

type StartPinningObjectResp struct {
TaskID string `json:"taskID"`
}

func NewStartPinningObjectResp(taskID string) StartPinningObjectResp {
return StartPinningObjectResp{
TaskID: taskID,
}
}

type WaitPinningObject struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeout"`
}

func NewWaitPinningObject(taskID string, waitTimeoutMs int64) WaitPinningObject {
return WaitPinningObject{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}

type WaitPinningObjectResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
}

func NewWaitPinningObjectResp(isComplete bool, err string) WaitPinningObjectResp {
return WaitPinningObjectResp{
IsComplete: isComplete,
Error: err,
}
}

func init() {
mq.RegisterMessage[StartPinningObject]()
mq.RegisterMessage[StartPinningObjectResp]()

mq.RegisterMessage[WaitPinningObject]()
mq.RegisterMessage[WaitPinningObjectResp]()
}

+ 184
- 0
pkgs/mq/message/agent/storage.go View File

@@ -0,0 +1,184 @@
package agent

import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkg/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

// 客户端发给代理端,告知要调度多副本冗余的数据,以及要调度数据的详情
type StartStorageMoveObject struct {
UserID int64 `json:"userID"`
ObjectID int64 `json:"objectID"`
ObjectName string `json:"objectName"`
Directory string `json:"directory"`
FileSize int64 `json:"fileSize,string"`
Redundancy models.RedundancyDataTypes `json:"redundancy"`
}

func NewStartStorageMoveObject[T models.RedundancyDataTypes](dir string, objectID int64, objectName string, userID int64, fileSize int64, redundancy T) StartStorageMoveObject {
return StartStorageMoveObject{
Directory: dir,
ObjectID: objectID,
ObjectName: objectName,
UserID: userID,
FileSize: fileSize,
Redundancy: redundancy,
}
}

// 代理端发给客户端,告知调度的结果
type StartStorageMoveObjectResp struct {
TaskID string `json:"taskID"`
}

func NewStartStorageMoveObjectResp(taskID string) StartStorageMoveObjectResp {
return StartStorageMoveObjectResp{
TaskID: taskID,
}
}

type WaitStorageMoveObject struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeout"`
}

func NewWaitStorageMoveObject(taskID string, waitTimeoutMs int64) WaitStorageMoveObject {
return WaitStorageMoveObject{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}

type WaitStorageMoveObjectResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
}

func NewWaitStorageMoveObjectResp(isComplete bool, err string) WaitStorageMoveObjectResp {
return WaitStorageMoveObjectResp{
IsComplete: isComplete,
Error: err,
}
}

type StorageCheck struct {
StorageID int64 `json:"storageID"`
Directory string `json:"directory"`
IsComplete bool `json:"isComplete"`
Objects []model.StorageObject `json:"objects"`
}

func NewStorageCheck(storageID int64, directory string, isComplete bool, objects []model.StorageObject) StorageCheck {
return StorageCheck{
StorageID: storageID,
Directory: directory,
IsComplete: isComplete,
Objects: objects,
}
}

type StorageCheckResp struct {
DirectoryState string `json:"directoryState"`
Entries []StorageCheckRespEntry `json:"entries"`
}

const (
CHECK_STORAGE_RESP_OP_DELETE = "Delete"
CHECK_STORAGE_RESP_OP_SET_NORMAL = "SetNormal"
)

type StorageCheckRespEntry struct {
ObjectID int64 `json:"objectID"`
UserID int64 `json:"userID"`
Operation string `json:"operation"`
}

func NewStorageCheckRespEntry(objectID int64, userID int64, op string) StorageCheckRespEntry {
return StorageCheckRespEntry{
ObjectID: objectID,
UserID: userID,
Operation: op,
}
}
func NewStorageCheckResp(dirState string, entries []StorageCheckRespEntry) StorageCheckResp {
return StorageCheckResp{
DirectoryState: dirState,
Entries: entries,
}
}

type StartStorageUploadRepObject struct {
UserID int64 `json:"userID"`
FilePath string `json:"filePath"`
BucketID int64 `json:"bucketID"`
ObjectName string `json:"objectName"`
RepCount int `json:"repCount"`
StorageDirectory string `json:"storageDirectory"`
}

func NewStartStorageUploadRepObject(userID int64, filePath string, bucketID int64, objectName string, repCount int, storageDirectory string) StartStorageUploadRepObject {
return StartStorageUploadRepObject{
UserID: userID,
FilePath: filePath,
BucketID: bucketID,
ObjectName: objectName,
RepCount: repCount,
StorageDirectory: storageDirectory,
}
}

type StartStorageUploadRepObjectResp struct {
TaskID string `json:"taskID"`
}

func NewStartStorageUploadRepObjectResp(taskID string) StartStorageUploadRepObjectResp {
return StartStorageUploadRepObjectResp{
TaskID: taskID,
}
}

type WaitStorageUploadRepObject struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeout"`
}

func NewWaitStorageUploadRepObject(taskID string, waitTimeoutMs int64) WaitStorageUploadRepObject {
return WaitStorageUploadRepObject{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}

type WaitStorageUploadRepObjectResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
ObjectID int64 `json:"objectID"`
FileHash string `json:"fileHash"`
}

func NewWaitStorageUploadRepObjectResp(isComplete bool, err string, objectID int64, fileHash string) WaitStorageUploadRepObjectResp {
return WaitStorageUploadRepObjectResp{
IsComplete: isComplete,
Error: err,
ObjectID: objectID,
FileHash: fileHash,
}
}

func init() {
mq.RegisterMessage[StartStorageMoveObject]()
mq.RegisterMessage[StartStorageMoveObjectResp]()

mq.RegisterMessage[WaitStorageMoveObject]()
mq.RegisterMessage[WaitStorageMoveObjectResp]()

mq.RegisterMessage[StorageCheck]()
mq.RegisterMessage[StorageCheckResp]()

mq.RegisterMessage[StartStorageUploadRepObject]()
mq.RegisterMessage[StartStorageUploadRepObjectResp]()

mq.RegisterMessage[WaitStorageUploadRepObject]()
mq.RegisterMessage[WaitStorageUploadRepObjectResp]()
}

+ 41
- 0
pkgs/mq/message/coordinator/agent.go View File

@@ -0,0 +1,41 @@
package coordinator

import "gitlink.org.cn/cloudream/common/pkg/mq"

// 代理端发给协调端,告知临时缓存的数据
type TempCacheReport struct {
NodeID int64 `json:"nodeID"`
Hashes []string `json:"hashes"`
}

func NewTempCacheReportBody(nodeID int64, hashes []string) TempCacheReport {
return TempCacheReport{
NodeID: nodeID,
Hashes: hashes,
}
}

// 代理端发给协调端,告知延迟、ipfs和资源目录的可达性
type AgentStatusReport struct {
NodeID int64 `json:"nodeID"`
NodeDelayIDs []int64 `json:"nodeDelayIDs"`
NodeDelays []int `json:"nodeDelays"`
IPFSStatus string `json:"ipfsStatus"`
LocalDirStatus string `json:"localDirStatus"`
}

func NewAgentStatusReportBody(nodeID int64, nodeDelayIDs []int64, nodeDelays []int, ipfsStatus string, localDirStatus string) AgentStatusReport {
return AgentStatusReport{
NodeID: nodeID,
NodeDelayIDs: nodeDelayIDs,
NodeDelays: nodeDelays,
IPFSStatus: ipfsStatus,
LocalDirStatus: localDirStatus,
}
}

func init() {
mq.RegisterMessage[TempCacheReport]()

mq.RegisterMessage[AgentStatusReport]()
}

+ 102
- 0
pkgs/mq/message/coordinator/bucket.go View File

@@ -0,0 +1,102 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type GetUserBuckets struct {
UserID int64 `json:"userID"`
}

func NewGetUserBuckets(userID int64) GetUserBuckets {
return GetUserBuckets{
UserID: userID,
}
}

type GetUserBucketsResp struct {
Buckets []model.Bucket `json:"buckets"`
}

func NewGetUserBucketsResp(buckets []model.Bucket) GetUserBucketsResp {
return GetUserBucketsResp{
Buckets: buckets,
}
}

type GetBucketObjects struct {
UserID int64 `json:"userID"`
BucketID int64 `json:"bucketID"`
}

func NewGetBucketObjects(userID int64, bucketID int64) GetBucketObjects {
return GetBucketObjects{
UserID: userID,
BucketID: bucketID,
}
}

type GetBucketObjectsResp struct {
Objects []model.Object `json:"objects"`
}

func NewGetBucketObjectsResp(objects []model.Object) GetBucketObjectsResp {
return GetBucketObjectsResp{
Objects: objects,
}
}

type CreateBucket struct {
UserID int64 `json:"userID"`
BucketName string `json:"bucketName"`
}

func NewCreateBucket(userID int64, bucketName string) CreateBucket {
return CreateBucket{
UserID: userID,
BucketName: bucketName,
}
}

type CreateBucketResp struct {
BucketID int64 `json:"bucketID"`
}

func NewCreateBucketResp(bucketID int64) CreateBucketResp {
return CreateBucketResp{
BucketID: bucketID,
}
}

type DeleteBucket struct {
UserID int64 `json:"userID"`
BucketID int64 `json:"bucketID"`
}

func NewDeleteBucket(userID int64, bucketID int64) DeleteBucket {
return DeleteBucket{
UserID: userID,
BucketID: bucketID,
}
}

type DeleteBucketResp struct{}

func NewDeleteBucketResp() DeleteBucketResp {
return DeleteBucketResp{}
}

func init() {
mq.RegisterMessage[GetUserBuckets]()
mq.RegisterMessage[GetUserBucketsResp]()

mq.RegisterMessage[GetBucketObjects]()
mq.RegisterMessage[GetBucketObjectsResp]()

mq.RegisterMessage[CreateBucket]()
mq.RegisterMessage[CreateBucketResp]()

mq.RegisterMessage[DeleteBucket]()
mq.RegisterMessage[DeleteBucketResp]()
}

+ 24
- 0
pkgs/mq/message/coordinator/coordinator_test.go View File

@@ -0,0 +1,24 @@
package coordinator

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/pkg/mq"
)

func TestSerder(t *testing.T) {
Convey("序列化ReadCmd", t, func() {
msg := mq.MakeMessage(NewPreDownloadObject(1, 123, ""))

data, err := mq.Serialize(msg)

So(err, ShouldBeNil)

deMsg, err := mq.Deserialize(data)

So(err, ShouldBeNil)

So(*deMsg, ShouldResemble, msg)
})
}

+ 295
- 0
pkgs/mq/message/coordinator/object.go View File

@@ -0,0 +1,295 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message"
)

// 客户端发给协调端,告知要查询的文件夹名称
type GetObjectsByDirName struct {
UserID int64 `json:"userID"`
DirName string `json:"dirName"`
}

func NewGetObjectsByDirName(userID int64, dirName string) GetObjectsByDirName {
return GetObjectsByDirName{
UserID: userID,
DirName: dirName,
}
}

// 协调端告知客户端,查询到的object信息
type GetObjectsResp struct {
Objects []model.Object `json:"objects"`
}

func NewGetObjectsResp(objects []model.Object) GetObjectsResp {
return GetObjectsResp{
Objects: objects,
}
}

// 客户端发给协调端,告知要读取数据
type PreDownloadObject struct {
ObjectID int64 `json:"objectID"`
UserID int64 `json:"userID"`
ClientExternalIP string `json:"clientExternalIP"` // 客户端的外网IP
}

func NewPreDownloadObject(objectID int64, userID int64, clientExternalIP string) PreDownloadObject {
return PreDownloadObject{
ObjectID: objectID,
UserID: userID,
ClientExternalIP: clientExternalIP,
}
}

// 协调端告知客户端,待读取数据的元数据
type PreDownloadObjectResp struct {
FileSize int64 `json:"fileSize,string"`
Redundancy ramsg.RespRedundancyDataTypes `json:"redundancy"`
}

func NewPreDownloadObjectResp[T ramsg.RespRedundancyDataTypesConst](fileSize int64, redundancy T) PreDownloadObjectResp {
return PreDownloadObjectResp{
Redundancy: redundancy,
FileSize: fileSize,
}
}

// 客户端发给协调端,告知要以多副本方式执行写入操作
type PreUploadRepObject struct {
BucketID int64 `json:"bucketID"`
ObjectName string `json:"objectName"`
FileSize int64 `json:"fileSize,string"`
UserID int64 `json:"userID"`
ClientExternalIP string `json:"clientExternalIP"` // 客户端的外网IP
}

func NewPreUploadRepObjectBody(bucketID int64, objectName string, fileSize int64, userID int64, clientExterIP string) PreUploadRepObject {
return PreUploadRepObject{
BucketID: bucketID,
ObjectName: objectName,
FileSize: fileSize,
UserID: userID,
ClientExternalIP: clientExterIP,
}
}

// 协调端发给客户端,返回副本的写入目的地节点IP
type PreUploadResp struct {
Nodes []ramsg.RespNode `json:"nodes"`
}

func NewPreUploadResp(nodes []ramsg.RespNode) PreUploadResp {
return PreUploadResp{
Nodes: nodes,
}
}

// 客户端发给协调端,告知要以纠删码方式执行写入操作
type PreUploadEcObject struct {
BucketID int64 `json:"bucketID"`
ObjectName string `json:"objectName"`
FileSize int64 `json:"fileSize,string"`
EcName string `json:"ecName"`
UserID int64 `json:"userID"`
ClientExternalIP string `json:"clientExternalIP"` // 读取方的外网IP
}

func NewPreUploadEcObject(bucketID int64, objectName string, fileSize int64, ecName string, userID int64, writerExterIP string) PreUploadEcObject {
return PreUploadEcObject{
BucketID: bucketID,
ObjectName: objectName,
FileSize: fileSize,
EcName: ecName,
UserID: userID,
ClientExternalIP: writerExterIP,
}
}

// 协调端发给客户端,返回编码块的写入目的地节点IP
type PreUploadEcResp struct {
Nodes []ramsg.RespNode `json:"nodes"`
Ec ramsg.Ec `json:"ec"`
}

func NewPreUploadEcResp(nodes []ramsg.RespNode, ec ramsg.Ec) PreUploadEcResp {
return PreUploadEcResp{
Nodes: nodes,
Ec: ec,
}
}

type CreateRepObject struct {
BucketID int64 `json:"bucketID"`
ObjectName string `json:"objectName"`
NodeIDs []int64 `json:"nodeIDs"`
FileHash string `json:"fileHash"`
FileSize int64 `json:"fileSize,string"`
RepCount int `json:"repCount"`
UserID int64 `json:"userID"`
DirName string `json:"dirName"`
}

func NewCreateRepObject(bucketID int64, objectName string, fileSize int64, repCount int, userID int64, nodeIDs []int64, fileHash string, dirName string) CreateRepObject {
return CreateRepObject{
BucketID: bucketID,
ObjectName: objectName,
FileSize: fileSize,
RepCount: repCount,
UserID: userID,
NodeIDs: nodeIDs,
FileHash: fileHash,
}
}

type CreateEcObject struct {
BucketID int64 `json:"bucketID"`
ObjectName string `json:"objectName"`
NodeIDs []int64 `json:"nodeIDs"`
Hashes []string `json:"hashes"`
FileSize int64 `json:"fileSize,string"`
UserID int64 `json:"userID"`
EcName string `json:"ecName"`
DirName string `json:"dirName"`
}

func NewCreateEcObject(bucketID int64, objectName string, fileSize int64, userID int64, nodeIDs []int64, hashes []string, ecName string, dirName string) CreateEcObject {
return CreateEcObject{
BucketID: bucketID,
ObjectName: objectName,
FileSize: fileSize,
UserID: userID,
NodeIDs: nodeIDs,
Hashes: hashes,
EcName: ecName,
DirName: dirName,
}
}

// 协调端发给客户端,告知哈希写入结果
type CreateObjectResp struct {
ObjectID int64 `json:"objectID"`
}

func NewCreateObjectResp(objectID int64) CreateObjectResp {
return CreateObjectResp{
ObjectID: objectID,
}
}

// PreUpdateRepObject 更新Rep对象
type PreUpdateRepObject struct {
ObjectID int64 `json:"objectID"`
FileSize int64 `json:"fileSize,string"`
UserID int64 `json:"userID"`
ClientExternalIP string `json:"clientExternalIP"`
}

func NewPreUpdateRepObject(objectID int64, fileSize int64, userID int64, clientExternalIP string) PreUpdateRepObject {
return PreUpdateRepObject{
ObjectID: objectID,
FileSize: fileSize,
UserID: userID,
ClientExternalIP: clientExternalIP,
}
}

type PreUpdateRepObjectResp struct {
Nodes []PreUpdateRepObjectRespNode `json:"nodes"`
}
type PreUpdateRepObjectRespNode struct {
ID int64 `json:"id"`
ExternalIP string `json:"externalIP"`
LocalIP string `json:"localIP"`
IsSameLocation bool `json:"isSameLocation"` // 客户端是否与此节点在同一个地域
HasOldObject bool `json:"hasOldObject"` // 节点是否存有旧的对象文件
}

func NewPreUpdateRepObjectRespNode(id int64, exterIP string, localIP string, isSameLocation bool, hasOldObject bool) PreUpdateRepObjectRespNode {
return PreUpdateRepObjectRespNode{
ID: id,
ExternalIP: exterIP,
LocalIP: localIP,
IsSameLocation: isSameLocation,
HasOldObject: hasOldObject,
}
}

func NewPreUpdateRepObjectResp(nodes []PreUpdateRepObjectRespNode) PreUpdateRepObjectResp {
return PreUpdateRepObjectResp{
Nodes: nodes,
}
}

// UpdateRepObject 更新Rep对象
type UpdateRepObject struct {
ObjectID int64 `json:"objectID"`
FileHash string `json:"fileHash"`
FileSize int64 `json:"fileSize,string"`
NodeIDs []int64 `json:"nodeIDs"`
UserID int64 `json:"userID"`
}

func NewUpdateRepObject(objectID int64, fileHash string, fileSize int64, nodeIDs []int64, userID int64) UpdateRepObject {
return UpdateRepObject{
ObjectID: objectID,
FileHash: fileHash,
FileSize: fileSize,
NodeIDs: nodeIDs,
UserID: userID,
}
}

type UpdateRepObjectResp struct{}

func NewUpdateRepObjectResp() UpdateRepObjectResp {
return UpdateRepObjectResp{}
}

// DeleteObjectBody 删除对象
type DeleteObject struct {
UserID int64 `db:"userID"`
ObjectID int64 `db:"objectID"`
}

func NewDeleteObject(userID int64, objectID int64) DeleteObject {
return DeleteObject{
UserID: userID,
ObjectID: objectID,
}
}

type DeleteObjectResp struct{}

func NewDeleteObjectResp() DeleteObjectResp {
return DeleteObjectResp{}
}

func init() {
mq.RegisterMessage[GetObjectsByDirName]()
mq.RegisterMessage[GetObjectsResp]()

mq.RegisterMessage[PreDownloadObject]()
mq.RegisterMessage[PreDownloadObjectResp]()

mq.RegisterMessage[PreUploadRepObject]()
mq.RegisterMessage[PreUploadResp]()

mq.RegisterMessage[PreUploadEcObject]()
mq.RegisterMessage[PreUploadEcResp]()

mq.RegisterMessage[CreateRepObject]()
mq.RegisterMessage[CreateEcObject]()
mq.RegisterMessage[CreateObjectResp]()

mq.RegisterMessage[PreUpdateRepObject]()
mq.RegisterMessage[PreUpdateRepObjectResp]()
mq.RegisterMessage[UpdateRepObject]()
mq.RegisterMessage[UpdateRepObjectResp]()

mq.RegisterMessage[DeleteObject]()
mq.RegisterMessage[DeleteObjectResp]()
}

+ 100
- 0
pkgs/mq/message/coordinator/storage.go View File

@@ -0,0 +1,100 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkg/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type GetStorageInfo struct {
UserID int64 `json:"userID"`
StorageID int64 `json:"storageID"`
}

func NewGetStorageInfo(userID int64, storageID int64) GetStorageInfo {
return GetStorageInfo{
UserID: userID,
StorageID: storageID,
}
}

type GetStorageInfoResp struct {
model.Storage
}

func NewGetStorageInfoResp(storageID int64, name string, nodeID int64, dir string, state string) GetStorageInfoResp {
return GetStorageInfoResp{
model.Storage{
StorageID: storageID,
Name: name,
NodeID: nodeID,
Directory: dir,
State: state,
},
}
}

// 客户端发给协调端,告知要调度数据
type PreMoveObjectToStorage struct {
ObjectID int64 `json:"objectID"`
StorageID int64 `json:"storageID"`
UserID int64 `json:"userID"`
}

func NewPreMoveObjectToStorage(objectID int64, stgID int64, userID int64) PreMoveObjectToStorage {
return PreMoveObjectToStorage{
ObjectID: objectID,
StorageID: stgID,
UserID: userID,
}
}

// 协调端发给客户端,告知要调度数据的详情
type PreMoveObjectToStorageResp struct {
NodeID int64 `json:"nodeID"`
Directory string `json:"directory"`
Object model.Object `json:"object"`
Redundancy models.RedundancyDataTypes `json:"redundancy"`
}

func NewPreMoveObjectToStorageRespBody[T models.RedundancyDataTypes](nodeID int64, dir string, object model.Object, redundancy T) PreMoveObjectToStorageResp {
return PreMoveObjectToStorageResp{
NodeID: nodeID,
Directory: dir,
Object: object,
Redundancy: redundancy,
}
}

// 调度完成
type MoveObjectToStorage struct {
ObjectID int64 `json:"objectID"`
StorageID int64 `json:"storageID"`
UserID int64 `json:"userID"`
}

func NewMoveObjectToStorage(objectID int64, stgID int64, userID int64) MoveObjectToStorage {
return MoveObjectToStorage{
ObjectID: objectID,
StorageID: stgID,
UserID: userID,
}
}

// 协调端发给客户端,告知要调度数据的详情
type MoveObjectToStorageResp struct{}

func NewMoveObjectToStorageResp() MoveObjectToStorageResp {
return MoveObjectToStorageResp{}
}

func init() {
mq.RegisterMessage[GetStorageInfo]()
mq.RegisterMessage[GetStorageInfoResp]()

mq.RegisterMessage[PreMoveObjectToStorage]()
mq.RegisterMessage[PreMoveObjectToStorageResp]()

mq.RegisterMessage[MoveObjectToStorage]()
mq.RegisterMessage[MoveObjectToStorageResp]()
}

+ 112
- 0
pkgs/mq/message/publics.go View File

@@ -0,0 +1,112 @@
package message

import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkg/mq"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)

type Node struct {
ID int64 `json:"id"`
ExternalIP string `json:"externalIP"`
LocalIP string `json:"localIP"`
}

func NewNode(id int64, externalIP string, localIP string) Node {
return Node{
ID: id,
ExternalIP: externalIP,
LocalIP: localIP,
}
}

type RespNode struct {
Node
IsSameLocation bool `json:"isSameLocation"` // 客户端是否与此节点在同一个地域
}

func NewRespNode(id int64, externalIP string, localIP string, isSameLocation bool) RespNode {
return RespNode{
Node: Node{
ID: id,
ExternalIP: externalIP,
LocalIP: localIP,
},
IsSameLocation: isSameLocation,
}
}

// Resp开头的RedundancyData与RedundancyData的区别在于,多了Nodes等字段。需要一个更好的名称。
type RespRedundancyDataTypesConst interface {
RespRepRedundancyData | RespEcRedundancyData
}

type RespRedundancyDataTypes interface{}

type RespRepRedundancyData struct {
models.RepRedundancyData
Nodes []RespNode `json:"nodes"`
}

func NewRespRepRedundancyData(fileHash string, nodes []RespNode) RespRepRedundancyData {
return RespRepRedundancyData{
RepRedundancyData: models.RepRedundancyData{
FileHash: fileHash,
},
Nodes: nodes,
}
}

type RespEcRedundancyData struct {
Ec Ec `json:"ec"`
Nodes [][]RespNode `json:"nodes"`
Blocks []RespObjectBlock `json:"blocks"`
}

func NewRespEcRedundancyData(ec Ec, blocks []RespObjectBlock, nodes [][]RespNode) RespEcRedundancyData {
return RespEcRedundancyData{
Ec: ec,
Nodes: nodes,
Blocks: blocks,
}
}

type RespObjectBlock struct {
models.ObjectBlock
//Node RespNode `json:"node"`
}

// func NewRespObjectBlock(index int, fileHash string, node RespNode) RespObjectBlock {
func NewRespObjectBlock(index int, fileHash string) RespObjectBlock {
return RespObjectBlock{
ObjectBlock: models.ObjectBlock{
Index: index,
FileHash: fileHash,
},
//Node: node,
}
}

type Ec struct {
ID int `json:"id"`
Name string `json:"name"`
EcK int `json:"ecK"`
EcN int `json:"ecN"`
}

func NewEc(id int, name string, k int, n int) Ec {
return Ec{
ID: id,
Name: name,
EcK: k,
EcN: n,
}
}

func init() {
mq.RegisterTypeSet[models.RedundancyConfigTypes](myreflect.TypeOf[models.RepRedundancyConfig](), myreflect.TypeOf[models.ECRedundancyConfig]())

mq.RegisterTypeSet[models.RedundancyDataTypes](myreflect.TypeOf[models.RepRedundancyData](), myreflect.TypeOf[models.ECRedundancyData]())

mq.RegisterTypeSet[RespRedundancyDataTypes](myreflect.TypeOf[RespRepRedundancyData](), myreflect.TypeOf[RespEcRedundancyData]())
}

+ 31
- 0
pkgs/mq/message/scanner/event.go View File

@@ -0,0 +1,31 @@
package scanner

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkg/mq"
scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event"
)

type PostEvent struct {
Event map[string]any `json:"event"`
IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理
DontMerge bool `json:"dontMerge"` // 不可合并此消息
}

func NewPostEvent(event any, isEmergency bool, dontMerge bool) (PostEvent, error) {
mp, err := scevt.MessageToMap(event)
if err != nil {
return PostEvent{}, fmt.Errorf("message to map failed, err: %w", err)
}

return PostEvent{
Event: mp,
IsEmergency: isEmergency,
DontMerge: dontMerge,
}, nil
}

func init() {
mq.RegisterMessage[PostEvent]()
}

+ 17
- 0
pkgs/mq/message/scanner/event/agent_check_cache.go View File

@@ -0,0 +1,17 @@
package event

type AgentCheckCache struct {
NodeID int64 `json:"nodeID"`
FileHashes []string `json:"fileHashes"` // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查
}

func NewAgentCheckCache(nodeID int64, fileHashes []string) AgentCheckCache {
return AgentCheckCache{
NodeID: nodeID,
FileHashes: fileHashes,
}
}

func init() {
Register[AgentCheckCache]()
}

+ 15
- 0
pkgs/mq/message/scanner/event/agent_check_state.go View File

@@ -0,0 +1,15 @@
package event

type AgentCheckState struct {
NodeID int64 `json:"nodeID"`
}

func NewAgentCheckState(nodeID int64) AgentCheckState {
return AgentCheckState{
NodeID: nodeID,
}
}

func init() {
Register[AgentCheckState]()
}

+ 17
- 0
pkgs/mq/message/scanner/event/agent_check_storage.go View File

@@ -0,0 +1,17 @@
package event

type AgentCheckStorage struct {
StorageID int64 `json:"storageID"`
ObjectIDs []int64 `json:"objectIDs"` // 需要检查的Object文件列表,如果为nil(不是为空),则代表进行全量检查
}

func NewAgentCheckStorage(storageID int64, objectIDs []int64) AgentCheckStorage {
return AgentCheckStorage{
StorageID: storageID,
ObjectIDs: objectIDs,
}
}

func init() {
Register[AgentCheckStorage]()
}

+ 15
- 0
pkgs/mq/message/scanner/event/check_cache.go View File

@@ -0,0 +1,15 @@
package event

type CheckCache struct {
NodeID int64 `json:"nodeID"`
}

func NewCheckCache(nodeID int64) CheckCache {
return CheckCache{
NodeID: nodeID,
}
}

func init() {
Register[CheckCache]()
}

+ 15
- 0
pkgs/mq/message/scanner/event/check_object.go View File

@@ -0,0 +1,15 @@
package event

type CheckObject struct {
ObjectIDs []int64 `json:"objectIDs"`
}

func NewCheckObject(objectIDs []int64) CheckObject {
return CheckObject{
ObjectIDs: objectIDs,
}
}

func init() {
Register[CheckObject]()
}

+ 15
- 0
pkgs/mq/message/scanner/event/check_rep_count.go View File

@@ -0,0 +1,15 @@
package event

type CheckRepCount struct {
FileHashes []string `json:"fileHashes"`
}

func NewCheckRepCount(fileHashes []string) CheckRepCount {
return CheckRepCount{
FileHashes: fileHashes,
}
}

func init() {
Register[CheckRepCount]()
}

+ 25
- 0
pkgs/mq/message/scanner/event/event.go View File

@@ -0,0 +1,25 @@
package event

import (
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/serder"
)

var typeResolver = serder.NewTypeNameResolver(true)

var serderOption = serder.TypedSerderOption{
TypeResolver: &typeResolver,
TypeFieldName: "@type",
}

func MapToMessage(m map[string]any) (any, error) {
return serder.TypedMapToObject(m, serderOption)
}

func MessageToMap(msg any) (map[string]any, error) {
return serder.ObjectToTypedMap(msg, serderOption)
}

func Register[T any]() {
typeResolver.Register(myreflect.TypeOf[T]())
}

+ 14
- 0
pkgs/mq/server/agent/agent.go View File

@@ -0,0 +1,14 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
)

type AgentService interface {
GetState(msg *agtmsg.GetState) (*agtmsg.GetStateResp, *mq.CodeMessage)
}

func init() {
Register(AgentService.GetState)
}

+ 14
- 0
pkgs/mq/server/agent/ipfs.go View File

@@ -0,0 +1,14 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
)

type IPFSService interface {
CheckIPFS(msg *agtmsg.CheckIPFS) (*agtmsg.CheckIPFSResp, *mq.CodeMessage)
}

func init() {
Register(IPFSService.CheckIPFS)
}

+ 16
- 0
pkgs/mq/server/agent/object.go View File

@@ -0,0 +1,16 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
)

type ObjectService interface {
StartPinningObject(msg *agtmsg.StartPinningObject) (*agtmsg.StartPinningObjectResp, *mq.CodeMessage)
WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.WaitPinningObjectResp, *mq.CodeMessage)
}

func init() {
Register(ObjectService.StartPinningObject)
Register(ObjectService.WaitPinningObject)
}

+ 67
- 0
pkgs/mq/server/agent/server.go View File

@@ -0,0 +1,67 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/mq/config"
)

type Service interface {
ObjectService

StorageService

IPFSService

AgentService
}

type Server struct {
service Service
rabbitSvr mq.RabbitMQServer

OnError func(err error)
}

func NewServer(svc Service, id int64, cfg *config.Config) (*Server, error) {
srv := &Server{
service: svc,
}

rabbitSvr, err := mq.NewRabbitMQServer(
cfg.MakeConnectingURL(),
mymq.MakeAgentQueueName(id),
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
)
if err != nil {
return nil, err
}

srv.rabbitSvr = *rabbitSvr

return srv, nil
}

func (s *Server) Stop() {
s.rabbitSvr.Close()
}

func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
}

var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()

// Register 将Service中的一个接口函数作为指定类型消息的处理函数
// TODO 需要约束:Service实现了TSvc接口
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) {
mq.AddServiceFn(&msgDispatcher, svcFn)
}

// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数
// TODO 需要约束:Service实现了TSvc接口
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
}

+ 30
- 0
pkgs/mq/server/agent/storage.go View File

@@ -0,0 +1,30 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
)

type StorageService interface {
StartStorageMoveObject(msg *agtmsg.StartStorageMoveObject) (*agtmsg.StartStorageMoveObjectResp, *mq.CodeMessage)

WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*agtmsg.WaitStorageMoveObjectResp, *mq.CodeMessage)

StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheckResp, *mq.CodeMessage)

StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRepObject) (*agtmsg.StartStorageUploadRepObjectResp, *mq.CodeMessage)

WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepObject) (*agtmsg.WaitStorageUploadRepObjectResp, *mq.CodeMessage)
}

func init() {
Register(StorageService.StartStorageMoveObject)

Register(StorageService.WaitStorageMoveObject)

Register(StorageService.StorageCheck)

Register(StorageService.StartStorageUploadRepObject)

Register(StorageService.WaitStorageUploadRepObject)
}

+ 15
- 0
pkgs/mq/server/coordinator/agent.go View File

@@ -0,0 +1,15 @@
package coordinator

import coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"

type AgentService interface {
TempCacheReport(msg *coormsg.TempCacheReport)

AgentStatusReport(msg *coormsg.AgentStatusReport)
}

func init() {
RegisterNoReply(AgentService.TempCacheReport)

RegisterNoReply(AgentService.AgentStatusReport)
}

+ 26
- 0
pkgs/mq/server/coordinator/bucket.go View File

@@ -0,0 +1,26 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

type BucketService interface {
GetUserBuckets(msg *coormsg.GetUserBuckets) (*coormsg.GetUserBucketsResp, *mq.CodeMessage)

GetBucketObjects(msg *coormsg.GetBucketObjects) (*coormsg.GetBucketObjectsResp, *mq.CodeMessage)

CreateBucket(msg *coormsg.CreateBucket) (*coormsg.CreateBucketResp, *mq.CodeMessage)

DeleteBucket(msg *coormsg.DeleteBucket) (*coormsg.DeleteBucketResp, *mq.CodeMessage)
}

func init() {
Register(BucketService.GetUserBuckets)

Register(BucketService.GetBucketObjects)

Register(BucketService.CreateBucket)

Register(BucketService.DeleteBucket)
}

+ 15
- 0
pkgs/mq/server/coordinator/coordinator_test.go View File

@@ -0,0 +1,15 @@
package coordinator

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func TestSerder(t *testing.T) {
Convey("输出注册的Handler", t, func() {
for k, _ := range msgDispatcher.Handlers {
t.Logf("(%s)", k)
}
})
}

+ 38
- 0
pkgs/mq/server/coordinator/object.go View File

@@ -0,0 +1,38 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

type ObjectService interface {
GetObjectsByDirName(msg *coormsg.GetObjectsByDirName) (*coormsg.GetObjectsResp, *mq.CodeMessage)
PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *mq.CodeMessage)

PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *mq.CodeMessage)
CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *mq.CodeMessage)

PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *mq.CodeMessage)
UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *mq.CodeMessage)

PreUploadEcObject(msg *coormsg.PreUploadEcObject) (*coormsg.PreUploadEcResp, *mq.CodeMessage)
CreateEcObject(msg *coormsg.CreateEcObject) (*coormsg.CreateObjectResp, *mq.CodeMessage)

DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *mq.CodeMessage)
}

func init() {
Register(ObjectService.GetObjectsByDirName)
Register(ObjectService.PreDownloadObject)

Register(ObjectService.PreUploadRepObject)
Register(ObjectService.CreateRepObject)

Register(ObjectService.PreUpdateRepObject)
Register(ObjectService.UpdateRepObject)

Register(ObjectService.PreUploadEcObject)
Register(ObjectService.CreateEcObject)

Register(ObjectService.DeleteObject)
}

+ 67
- 0
pkgs/mq/server/coordinator/server.go View File

@@ -0,0 +1,67 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/mq/config"
)

// Service 协调端接口
type Service interface {
ObjectService

BucketService

StorageService

AgentService
}

type Server struct {
service Service
rabbitSvr mq.RabbitMQServer

OnError func(err error)
}

func NewServer(svc Service, cfg *config.Config) (*Server, error) {
srv := &Server{
service: svc,
}

rabbitSvr, err := mq.NewRabbitMQServer(
cfg.MakeConnectingURL(),
mymq.COORDINATOR_QUEUE_NAME,
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
)
if err != nil {
return nil, err
}

srv.rabbitSvr = *rabbitSvr

return srv, nil
}
func (s *Server) Stop() {
s.rabbitSvr.Close()
}

func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
}

var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()

// Register 将Service中的一个接口函数作为指定类型消息的处理函数
// TODO 需要约束:Service实现了TSvc接口
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) {
mq.AddServiceFn(&msgDispatcher, svcFn)
}

// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数
// TODO 需要约束:Service实现了TSvc接口
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
}

+ 18
- 0
pkgs/mq/server/coordinator/storage.go View File

@@ -0,0 +1,18 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

type StorageService interface {
GetStorageInfo(msg *coormsg.GetStorageInfo) (*coormsg.GetStorageInfoResp, *mq.CodeMessage)
PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) (*coormsg.PreMoveObjectToStorageResp, *mq.CodeMessage)
MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) (*coormsg.MoveObjectToStorageResp, *mq.CodeMessage)
}

func init() {
Register(StorageService.GetStorageInfo)
Register(StorageService.PreMoveObjectToStorage)
Register(StorageService.MoveObjectToStorage)
}

+ 11
- 0
pkgs/mq/server/scanner/event.go View File

@@ -0,0 +1,11 @@
package scanner

import scmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner"

type EventService interface {
PostEvent(event *scmsg.PostEvent)
}

func init() {
RegisterNoReply(EventService.PostEvent)
}

+ 61
- 0
pkgs/mq/server/scanner/server.go View File

@@ -0,0 +1,61 @@
package scanner

import (
"gitlink.org.cn/cloudream/common/pkg/mq"
mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/mq/config"
)

// Service 协调端接口
type Service interface {
EventService
}
type Server struct {
service Service
rabbitSvr mq.RabbitMQServer

OnError func(err error)
}

func NewServer(svc Service, cfg *config.Config) (*Server, error) {
srv := &Server{
service: svc,
}

rabbitSvr, err := mq.NewRabbitMQServer(
cfg.MakeConnectingURL(),
mymq.SCANNER_QUEUE_NAME,
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
)
if err != nil {
return nil, err
}

srv.rabbitSvr = *rabbitSvr

return srv, nil
}

func (s *Server) Stop() {
s.rabbitSvr.Close()
}

func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
}

var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()

// Register 将Service中的一个接口函数作为指定类型消息的处理函数
// TODO 需要约束:Service实现了TSvc接口
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) {
mq.AddServiceFn(&msgDispatcher, svcFn)
}

// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数
// TODO 需要约束:Service实现了TSvc接口
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
}

+ 2
- 0
pkgs/proto/Makefile View File

@@ -0,0 +1,2 @@
protoc:
protoc --go_out=. --go-grpc_out=. .\file_transport.proto

+ 343
- 0
pkgs/proto/file_transport.pb.go View File

@@ -0,0 +1,343 @@
// 使用的语法版本

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc v4.22.3
// source: file_transport.proto

package proto

import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)

const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type FileDataPacketType int32

const (
FileDataPacketType_Data FileDataPacketType = 0
FileDataPacketType_EOF FileDataPacketType = 1
)

// Enum value maps for FileDataPacketType.
var (
FileDataPacketType_name = map[int32]string{
0: "Data",
1: "EOF",
}
FileDataPacketType_value = map[string]int32{
"Data": 0,
"EOF": 1,
}
)

func (x FileDataPacketType) Enum() *FileDataPacketType {
p := new(FileDataPacketType)
*p = x
return p
}

func (x FileDataPacketType) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}

func (FileDataPacketType) Descriptor() protoreflect.EnumDescriptor {
return file_file_transport_proto_enumTypes[0].Descriptor()
}

func (FileDataPacketType) Type() protoreflect.EnumType {
return &file_file_transport_proto_enumTypes[0]
}

func (x FileDataPacketType) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}

// Deprecated: Use FileDataPacketType.Descriptor instead.
func (FileDataPacketType) EnumDescriptor() ([]byte, []int) {
return file_file_transport_proto_rawDescGZIP(), []int{0}
}

// 文件数据。注意:只在Type为Data的时候,Data字段才能有数据
type FileDataPacket struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Type FileDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=FileDataPacketType" json:"Type,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"`
}

func (x *FileDataPacket) Reset() {
*x = FileDataPacket{}
if protoimpl.UnsafeEnabled {
mi := &file_file_transport_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *FileDataPacket) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*FileDataPacket) ProtoMessage() {}

func (x *FileDataPacket) ProtoReflect() protoreflect.Message {
mi := &file_file_transport_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use FileDataPacket.ProtoReflect.Descriptor instead.
func (*FileDataPacket) Descriptor() ([]byte, []int) {
return file_file_transport_proto_rawDescGZIP(), []int{0}
}

func (x *FileDataPacket) GetType() FileDataPacketType {
if x != nil {
return x.Type
}
return FileDataPacketType_Data
}

func (x *FileDataPacket) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}

type SendResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"`
}

func (x *SendResp) Reset() {
*x = SendResp{}
if protoimpl.UnsafeEnabled {
mi := &file_file_transport_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *SendResp) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*SendResp) ProtoMessage() {}

func (x *SendResp) ProtoReflect() protoreflect.Message {
mi := &file_file_transport_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use SendResp.ProtoReflect.Descriptor instead.
func (*SendResp) Descriptor() ([]byte, []int) {
return file_file_transport_proto_rawDescGZIP(), []int{1}
}

func (x *SendResp) GetFileHash() string {
if x != nil {
return x.FileHash
}
return ""
}

type GetReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"`
}

func (x *GetReq) Reset() {
*x = GetReq{}
if protoimpl.UnsafeEnabled {
mi := &file_file_transport_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *GetReq) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*GetReq) ProtoMessage() {}

func (x *GetReq) ProtoReflect() protoreflect.Message {
mi := &file_file_transport_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use GetReq.ProtoReflect.Descriptor instead.
func (*GetReq) Descriptor() ([]byte, []int) {
return file_file_transport_proto_rawDescGZIP(), []int{2}
}

func (x *GetReq) GetFileHash() string {
if x != nil {
return x.FileHash
}
return ""
}

var File_file_transport_proto protoreflect.FileDescriptor

var file_file_transport_proto_rawDesc = []byte{
0x0a, 0x14, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4d, 0x0a, 0x0e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61,
0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x27, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74,
0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70,
0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x26, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x73,
0x70, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x24, 0x0a,
0x06, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48,
0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48,
0x61, 0x73, 0x68, 0x2a, 0x27, 0x0a, 0x12, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50,
0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74,
0x61, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x01, 0x32, 0x64, 0x0a, 0x0d,
0x46, 0x69, 0x6c, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x2a, 0x0a,
0x08, 0x53, 0x65, 0x6e, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65,
0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x09, 0x2e, 0x53, 0x65, 0x6e,
0x64, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x27, 0x0a, 0x07, 0x47, 0x65, 0x74,
0x46, 0x69, 0x6c, 0x65, 0x12, 0x07, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e,
0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00,
0x30, 0x01, 0x42, 0x10, 0x5a, 0x0e, 0x2e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
file_file_transport_proto_rawDescOnce sync.Once
file_file_transport_proto_rawDescData = file_file_transport_proto_rawDesc
)

func file_file_transport_proto_rawDescGZIP() []byte {
file_file_transport_proto_rawDescOnce.Do(func() {
file_file_transport_proto_rawDescData = protoimpl.X.CompressGZIP(file_file_transport_proto_rawDescData)
})
return file_file_transport_proto_rawDescData
}

var file_file_transport_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_file_transport_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_file_transport_proto_goTypes = []interface{}{
(FileDataPacketType)(0), // 0: FileDataPacketType
(*FileDataPacket)(nil), // 1: FileDataPacket
(*SendResp)(nil), // 2: SendResp
(*GetReq)(nil), // 3: GetReq
}
var file_file_transport_proto_depIdxs = []int32{
0, // 0: FileDataPacket.Type:type_name -> FileDataPacketType
1, // 1: FileTransport.SendFile:input_type -> FileDataPacket
3, // 2: FileTransport.GetFile:input_type -> GetReq
2, // 3: FileTransport.SendFile:output_type -> SendResp
1, // 4: FileTransport.GetFile:output_type -> FileDataPacket
3, // [3:5] is the sub-list for method output_type
1, // [1:3] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}

func init() { file_file_transport_proto_init() }
func file_file_transport_proto_init() {
if File_file_transport_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_file_transport_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FileDataPacket); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_file_transport_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SendResp); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_file_transport_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetReq); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_file_transport_proto_rawDesc,
NumEnums: 1,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_file_transport_proto_goTypes,
DependencyIndexes: file_file_transport_proto_depIdxs,
EnumInfos: file_file_transport_proto_enumTypes,
MessageInfos: file_file_transport_proto_msgTypes,
}.Build()
File_file_transport_proto = out.File
file_file_transport_proto_rawDesc = nil
file_file_transport_proto_goTypes = nil
file_file_transport_proto_depIdxs = nil
}

+ 30
- 0
pkgs/proto/file_transport.proto View File

@@ -0,0 +1,30 @@
// 使用的语法版本
syntax = "proto3";

// 生成的go文件包
option go_package = "../proto;proto";//grpc这里生效了


enum FileDataPacketType {
Data = 0;
EOF = 1;
}
// 文件数据。注意:只在Type为Data的时候,Data字段才能有数据
message FileDataPacket {
FileDataPacketType Type = 1;
bytes Data = 2;
}

message SendResp {
string FileHash = 1;
}

message GetReq {
string FileHash = 1;
}

service FileTransport {
rpc SendFile(stream FileDataPacket)returns(SendResp){}
rpc GetFile(GetReq)returns(stream FileDataPacket){}
}


+ 209
- 0
pkgs/proto/file_transport_grpc.pb.go View File

@@ -0,0 +1,209 @@
// 使用的语法版本

// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.22.3
// source: file_transport.proto

package proto

import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

const (
FileTransport_SendFile_FullMethodName = "/FileTransport/SendFile"
FileTransport_GetFile_FullMethodName = "/FileTransport/GetFile"
)

// FileTransportClient is the client API for FileTransport service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type FileTransportClient interface {
SendFile(ctx context.Context, opts ...grpc.CallOption) (FileTransport_SendFileClient, error)
GetFile(ctx context.Context, in *GetReq, opts ...grpc.CallOption) (FileTransport_GetFileClient, error)
}

type fileTransportClient struct {
cc grpc.ClientConnInterface
}

func NewFileTransportClient(cc grpc.ClientConnInterface) FileTransportClient {
return &fileTransportClient{cc}
}

func (c *fileTransportClient) SendFile(ctx context.Context, opts ...grpc.CallOption) (FileTransport_SendFileClient, error) {
stream, err := c.cc.NewStream(ctx, &FileTransport_ServiceDesc.Streams[0], FileTransport_SendFile_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &fileTransportSendFileClient{stream}
return x, nil
}

type FileTransport_SendFileClient interface {
Send(*FileDataPacket) error
CloseAndRecv() (*SendResp, error)
grpc.ClientStream
}

type fileTransportSendFileClient struct {
grpc.ClientStream
}

func (x *fileTransportSendFileClient) Send(m *FileDataPacket) error {
return x.ClientStream.SendMsg(m)
}

func (x *fileTransportSendFileClient) CloseAndRecv() (*SendResp, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(SendResp)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

func (c *fileTransportClient) GetFile(ctx context.Context, in *GetReq, opts ...grpc.CallOption) (FileTransport_GetFileClient, error) {
stream, err := c.cc.NewStream(ctx, &FileTransport_ServiceDesc.Streams[1], FileTransport_GetFile_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &fileTransportGetFileClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}

type FileTransport_GetFileClient interface {
Recv() (*FileDataPacket, error)
grpc.ClientStream
}

type fileTransportGetFileClient struct {
grpc.ClientStream
}

func (x *fileTransportGetFileClient) Recv() (*FileDataPacket, error) {
m := new(FileDataPacket)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

// FileTransportServer is the server API for FileTransport service.
// All implementations must embed UnimplementedFileTransportServer
// for forward compatibility
type FileTransportServer interface {
SendFile(FileTransport_SendFileServer) error
GetFile(*GetReq, FileTransport_GetFileServer) error
mustEmbedUnimplementedFileTransportServer()
}

// UnimplementedFileTransportServer must be embedded to have forward compatible implementations.
type UnimplementedFileTransportServer struct {
}

func (UnimplementedFileTransportServer) SendFile(FileTransport_SendFileServer) error {
return status.Errorf(codes.Unimplemented, "method SendFile not implemented")
}
func (UnimplementedFileTransportServer) GetFile(*GetReq, FileTransport_GetFileServer) error {
return status.Errorf(codes.Unimplemented, "method GetFile not implemented")
}
func (UnimplementedFileTransportServer) mustEmbedUnimplementedFileTransportServer() {}

// UnsafeFileTransportServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to FileTransportServer will
// result in compilation errors.
type UnsafeFileTransportServer interface {
mustEmbedUnimplementedFileTransportServer()
}

func RegisterFileTransportServer(s grpc.ServiceRegistrar, srv FileTransportServer) {
s.RegisterService(&FileTransport_ServiceDesc, srv)
}

func _FileTransport_SendFile_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(FileTransportServer).SendFile(&fileTransportSendFileServer{stream})
}

type FileTransport_SendFileServer interface {
SendAndClose(*SendResp) error
Recv() (*FileDataPacket, error)
grpc.ServerStream
}

type fileTransportSendFileServer struct {
grpc.ServerStream
}

func (x *fileTransportSendFileServer) SendAndClose(m *SendResp) error {
return x.ServerStream.SendMsg(m)
}

func (x *fileTransportSendFileServer) Recv() (*FileDataPacket, error) {
m := new(FileDataPacket)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

func _FileTransport_GetFile_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(GetReq)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(FileTransportServer).GetFile(m, &fileTransportGetFileServer{stream})
}

type FileTransport_GetFileServer interface {
Send(*FileDataPacket) error
grpc.ServerStream
}

type fileTransportGetFileServer struct {
grpc.ServerStream
}

func (x *fileTransportGetFileServer) Send(m *FileDataPacket) error {
return x.ServerStream.SendMsg(m)
}

// FileTransport_ServiceDesc is the grpc.ServiceDesc for FileTransport service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var FileTransport_ServiceDesc = grpc.ServiceDesc{
ServiceName: "FileTransport",
HandlerType: (*FileTransportServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SendFile",
Handler: _FileTransport_SendFile_Handler,
ClientStreams: true,
},
{
StreamName: "GetFile",
Handler: _FileTransport_GetFile_Handler,
ServerStreams: true,
},
},
Metadata: "file_transport.proto",
}

+ 120
- 0
utils/grpc/file_transport.go View File

@@ -0,0 +1,120 @@
package grpc

import (
"context"
"fmt"
"io"

myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/storage-common/pkgs/proto"
)

type fileReadCloser struct {
io.ReadCloser
stream proto.FileTransport_GetFileClient
cancelFn context.CancelFunc
readData []byte
}

func (s *fileReadCloser) Read(p []byte) (int, error) {

if s.readData == nil {
resp, err := s.stream.Recv()
if err != nil {
return 0, err
}

if resp.Type == proto.FileDataPacketType_Data {
s.readData = resp.Data

} else if resp.Type == proto.FileDataPacketType_EOF {
return 0, io.EOF

} else {
return 0, fmt.Errorf("unsuppoted packt type: %v", resp.Type)
}
}

cnt := copy(p, s.readData)

if len(s.readData) == cnt {
s.readData = nil
} else {
s.readData = s.readData[cnt:]
}

return cnt, nil
}

func (s *fileReadCloser) Close() error {
s.cancelFn()

return nil
}

func GetFileAsStream(client proto.FileTransportClient, fileHash string) (io.ReadCloser, error) {
ctx, cancel := context.WithCancel(context.Background())

stream, err := client.GetFile(ctx, &proto.GetReq{
FileHash: fileHash,
})
if err != nil {
cancel()
return nil, fmt.Errorf("request grpc failed, err: %w", err)
}

return &fileReadCloser{
stream: stream,
cancelFn: cancel,
}, nil
}

type fileWriteCloser struct {
myio.PromiseWriteCloser[string]
stream proto.FileTransport_SendFileClient
}

func (s *fileWriteCloser) Write(p []byte) (int, error) {
err := s.stream.Send(&proto.FileDataPacket{
Type: proto.FileDataPacketType_Data,
Data: p,
})

if err != nil {
return 0, err
}

return len(p), nil
}

func (s *fileWriteCloser) Abort(err error) {
s.stream.CloseSend()
}

func (s *fileWriteCloser) Finish() (string, error) {
err := s.stream.Send(&proto.FileDataPacket{
Type: proto.FileDataPacketType_EOF,
})

if err != nil {
return "", fmt.Errorf("send EOF packet failed, err: %w", err)
}

resp, err := s.stream.CloseAndRecv()
if err != nil {
return "", fmt.Errorf("receive response failed, err: %w", err)
}

return resp.FileHash, nil
}

func SendFileAsStream(client proto.FileTransportClient) (myio.PromiseWriteCloser[string], error) {
stream, err := client.SendFile(context.Background())
if err != nil {
return nil, err
}

return &fileWriteCloser{
stream: stream,
}, nil
}

Loading…
Cancel
Save