diff --git a/coordinator/README.md b/coordinator/README.md new file mode 100644 index 0000000..d09937e --- /dev/null +++ b/coordinator/README.md @@ -0,0 +1,2 @@ +# storage-coordinator + diff --git a/coordinator/go.mod b/coordinator/go.mod new file mode 100644 index 0000000..d051413 --- /dev/null +++ b/coordinator/go.mod @@ -0,0 +1,35 @@ +module gitlink.org.cn/cloudream/storage-coordinator + +go 1.20 + +require ( + github.com/jmoiron/sqlx v1.3.5 + github.com/samber/lo v1.38.1 + gitlink.org.cn/cloudream/common v0.0.0 + gitlink.org.cn/cloudream/storage-common v0.0.0 +) + +require ( + github.com/antonfisher/nested-logrus-formatter v1.3.1 // indirect + github.com/go-sql-driver/mysql v1.7.1 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/imdario/mergo v0.3.15 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/magefile/mage v1.15.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/otiai10/copy v1.12.0 // indirect + github.com/sirupsen/logrus v1.9.2 // indirect + github.com/streadway/amqp v1.1.0 // indirect + github.com/zyedidia/generic v1.2.1 // indirect + golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect + golang.org/x/sys v0.7.0 // indirect +) + +// 运行go mod tidy时需要将下面几行取消注释 +replace gitlink.org.cn/cloudream/common => ../../common + +replace gitlink.org.cn/cloudream/storage-common => ../storage-common diff --git a/coordinator/go.sum b/coordinator/go.sum new file mode 100644 index 0000000..c00f1b5 --- /dev/null +++ b/coordinator/go.sum @@ -0,0 +1,64 @@ +github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ= +github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= +github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM= +github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= +github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= +github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= +github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= +github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY= +github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww= +github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= +github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU= +github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc= +github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis= +golang.org/x/exp v0.0.0-20230519143937-03e91628a987 h1:3xJIFvzUFbu4ls0BTBYcgbCGhA63eAOEMxIHugyXJqA= +golang.org/x/exp v0.0.0-20230519143937-03e91628a987/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go new file mode 100644 index 0000000..864295f --- /dev/null +++ b/coordinator/internal/config/config.go @@ -0,0 +1,24 @@ +package config + +import ( + log "gitlink.org.cn/cloudream/common/pkgs/logger" + c "gitlink.org.cn/cloudream/common/utils/config" + db "gitlink.org.cn/cloudream/storage-common/pkgs/db/config" + stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" +) + +type Config struct { + Logger log.Config `json:"logger"` + DB db.Config `json:"db"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` +} + +var cfg Config + +func Init() error { + return c.DefaultLoad("coordinator", &cfg) +} + +func Cfg() *Config { + return &cfg +} diff --git a/coordinator/internal/services/agent.go b/coordinator/internal/services/agent.go new file mode 100644 index 0000000..8a31e1c --- /dev/null +++ b/coordinator/internal/services/agent.go @@ -0,0 +1,24 @@ +package services + +import ( + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (service *Service) TempCacheReport(msg *coormq.TempCacheReport) { + //service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) +} + +func (service *Service) AgentStatusReport(msg *coormq.AgentStatusReport) { + //jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus + //根据command中的Ip,插入节点延迟表 + + // TODO + /* + ips := utils.GetAgentIps() + Insert_NodeDelay(msg.Body.IP, ips, msg.Body.AgentDelay) + + //从配置表里读取节点地域NodeLocation + //插入节点表的NodeStatus + Insert_Node(msg.Body.IP, msg.Body.IP, msg.Body.IPFSStatus, msg.Body.LocalDirStatus) + */ +} diff --git a/coordinator/internal/services/bucket.go b/coordinator/internal/services/bucket.go new file mode 100644 index 0000000..5ba7526 --- /dev/null +++ b/coordinator/internal/services/bucket.go @@ -0,0 +1,74 @@ +package services + +import ( + "database/sql" + + "github.com/jmoiron/sqlx" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { + // TODO + panic("not implement yet") +} + +func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserBucketsResp, *mq.CodeMessage) { + buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.UserID) + + if err != nil { + logger.WithField("UserID", msg.UserID). + Warnf("get user buckets failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetUserBucketsResp](errorcode.OperationFailed, "get all buckets failed") + } + + return mq.ReplyOK(coormq.NewGetUserBucketsResp(buckets)) +} + +func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.GetBucketPackagesResp, *mq.CodeMessage) { + packages, err := svc.db.Package().GetBucketPackages(svc.db.SQLCtx(), msg.UserID, msg.BucketID) + + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("BucketID", msg.BucketID). + Warnf("get bucket packages failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetBucketPackagesResp](errorcode.OperationFailed, "get bucket packages failed") + } + + return mq.ReplyOK(coormq.NewGetBucketPackagesResp(packages)) +} + +func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { + var bucketID int64 + var err error + svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + // 这里用的是外部的err + bucketID, err = svc.db.Bucket().Create(tx, msg.UserID, msg.BucketName) + return err + }) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("BucketName", msg.BucketName). + Warnf("create bucket failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.CreateBucketResp](errorcode.OperationFailed, "create bucket failed") + } + + return mq.ReplyOK(coormq.NewCreateBucketResp(bucketID)) +} + +func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + return svc.db.Bucket().Delete(tx, msg.BucketID) + }) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("BucketID", msg.BucketID). + Warnf("delete bucket failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.DeleteBucketResp](errorcode.OperationFailed, "delete bucket failed") + } + + return mq.ReplyOK(coormq.NewDeleteBucketResp()) +} diff --git a/coordinator/internal/services/cache.go b/coordinator/internal/services/cache.go new file mode 100644 index 0000000..b7677f9 --- /dev/null +++ b/coordinator/internal/services/cache.go @@ -0,0 +1,29 @@ +package services + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.CachePackageMovedResp, *mq.CodeMessage) { + pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("getting package: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + if pkg.Redundancy.IsRepInfo() { + // TODO 优先级 + if err := svc.db.Cache().BatchCreatePinned(svc.db.SQLCtx(), msg.FileHashes, msg.NodeID, 0); err != nil { + logger.Warnf("batch creating pinned cache: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "batch create pinned cache failed") + } + } + + // TODO EC的逻辑 + + return mq.ReplyOK(coormq.NewCachePackageMovedResp()) +} diff --git a/coordinator/internal/services/conmmon.go b/coordinator/internal/services/conmmon.go new file mode 100644 index 0000000..928679e --- /dev/null +++ b/coordinator/internal/services/conmmon.go @@ -0,0 +1,30 @@ +package services + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (svc *Service) FindClientLocation(msg *coormq.FindClientLocation) (*coormq.FindClientLocationResp, *mq.CodeMessage) { + location, err := svc.db.Location().FindLocationByExternalIP(svc.db.SQLCtx(), msg.IP) + if err != nil { + logger.WithField("IP", msg.IP). + Warnf("query client location failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query client location failed") + } + + return mq.ReplyOK(coormq.NewFindClientLocationResp(location)) +} + +func (svc *Service) GetECConfig(msg *coormq.GetECConfig) (*coormq.GetECConfigResp, *mq.CodeMessage) { + ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), msg.ECName) + if err != nil { + logger.WithField("ECName", msg.ECName). + Warnf("query ec failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query ec failed") + } + + return mq.ReplyOK(coormq.NewGetECConfigResp(ec)) +} diff --git a/coordinator/internal/services/node.go b/coordinator/internal/services/node.go new file mode 100644 index 0000000..9a12708 --- /dev/null +++ b/coordinator/internal/services/node.go @@ -0,0 +1,36 @@ +package services + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (svc *Service) GetUserNodes(msg *coormq.GetUserNodes) (*coormq.GetUserNodesResp, *mq.CodeMessage) { + nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) + if err != nil { + logger.WithField("UserID", msg.UserID). + Warnf("query user nodes failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query user nodes failed") + } + + return mq.ReplyOK(coormq.NewGetUserNodesResp(nodes)) +} + +func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.CodeMessage) { + var nodes []model.Node + for _, id := range msg.NodeIDs { + node, err := svc.db.Node().GetByID(svc.db.SQLCtx(), id) + if err != nil { + logger.WithField("NodeID", id). + Warnf("query node failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query node failed") + } + + nodes = append(nodes, node) + } + + return mq.ReplyOK(coormq.NewGetNodesResp(nodes)) +} diff --git a/coordinator/internal/services/object.go b/coordinator/internal/services/object.go new file mode 100644 index 0000000..68272d3 --- /dev/null +++ b/coordinator/internal/services/object.go @@ -0,0 +1,32 @@ +package services + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) { + data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("query object rep and node id in package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "query object rep and node id in package failed") + } + + return mq.ReplyOK(coormq.NewGetPackageObjectRepDataResp(data)) +} + +func (svc *Service) GetPackageObjectECData(msg *coormq.GetPackageObjectECData) (*coormq.GetPackageObjectECDataResp, *mq.CodeMessage) { + data, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("query object ec and node id in package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "query object ec and node id in package failed") + } + + return mq.ReplyOK(coormq.NewGetPackageObjectECDataResp(data)) +} diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go new file mode 100644 index 0000000..afe5e4b --- /dev/null +++ b/coordinator/internal/services/package.go @@ -0,0 +1,309 @@ +package services + +import ( + "database/sql" + "fmt" + "sort" + + "github.com/jmoiron/sqlx" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" + scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" +) + +func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, *mq.CodeMessage) { + pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + return mq.ReplyOK(coormq.NewGetPackageResp(pkg)) +} + +func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { + // TODO 检查用户是否有权限 + objs, err := svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package objects: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed") + } + + return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs)) +} + +func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { + var pkgID int64 + err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + var err error + pkgID, err = svc.db.Package().Create(svc.db.SQLCtx(), msg.BucketID, msg.Name, msg.Redundancy) + return err + }) + if err != nil { + logger.WithField("BucketID", msg.BucketID). + WithField("Name", msg.Name). + Warnf("creating package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "creating package failed") + } + + return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID)) +} + +func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.UpdateRepPackageResp, *mq.CodeMessage) { + _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + // 先执行删除操作 + if len(msg.Deletes) > 0 { + if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil { + return fmt.Errorf("deleting objects: %w", err) + } + } + + // 再执行添加操作 + if len(msg.Adds) > 0 { + if _, err := svc.db.Object().BatchAddRep(tx, msg.PackageID, msg.Adds); err != nil { + return fmt.Errorf("adding objects: %w", err) + } + } + + return nil + }) + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "update rep package failed") + } + + // 紧急任务 + var affectFileHashes []string + for _, add := range msg.Adds { + affectFileHashes = append(affectFileHashes, add.FileHash) + } + + err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true) + if err != nil { + logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) + } + + return mq.ReplyOK(coormq.NewUpdateRepPackageResp()) +} + +func (svc *Service) UpdateECPackage(msg *coormq.UpdateECPackage) (*coormq.UpdateECPackageResp, *mq.CodeMessage) { + _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + // 先执行删除操作 + if len(msg.Deletes) > 0 { + if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil { + return fmt.Errorf("deleting objects: %w", err) + } + } + + // 再执行添加操作 + if len(msg.Adds) > 0 { + if _, err := svc.db.Object().BatchAddEC(tx, msg.PackageID, msg.Adds); err != nil { + return fmt.Errorf("adding objects: %w", err) + } + } + + return nil + }) + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "update ec package failed") + } + + return mq.ReplyOK(coormq.NewUpdateECPackageResp()) +} + +func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) { + isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("check package available failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "check package available failed") + } + if !isAva { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("package is not available to the user") + return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "package is not available to the user") + } + + err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + return svc.db.Package().SoftDelete(tx, msg.PackageID) + }) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("set package deleted failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "set package deleted failed") + } + + stgs, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.Warnf("find package storages failed, but this will not affect the deleting, err: %s", err.Error()) + return mq.ReplyOK(coormq.NewDeletePackageResp()) + } + + // 不追求及时、准确 + if len(stgs) == 0 { + // 如果没有被引用,直接投递CheckPackage的任务 + err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false) + if err != nil { + logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) + } + logger.Debugf("post check package event") + + } else { + // 有引用则让Agent去检查StoragePackage + for _, stg := range stgs { + err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false) + if err != nil { + logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) + } + } + logger.Debugf("post agent check storage event") + } + + return mq.ReplyOK(coormq.NewDeletePackageResp()) +} + +func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*coormq.GetPackageCachedNodesResp, *mq.CodeMessage) { + isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("check package available failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "check package available failed") + } + if !isAva { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("package is not available to the user") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "package is not available to the user") + } + + pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + var packageSize int64 + nodeInfoMap := make(map[int64]*models.NodePackageCachingInfo) + if pkg.Redundancy.IsRepInfo() { + // 备份方式为rep + objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get objectRepDatas by packageID failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") + } + + for _, data := range objectRepDatas { + packageSize += data.Object.Size + for _, nodeID := range data.NodeIDs { + + nodeInfo, exists := nodeInfoMap[nodeID] + if !exists { + nodeInfo = &models.NodePackageCachingInfo{ + NodeID: nodeID, + FileSize: data.Object.Size, + ObjectCount: 1, + } + } else { + nodeInfo.FileSize += data.Object.Size + nodeInfo.ObjectCount++ + } + nodeInfoMap[nodeID] = nodeInfo + } + } + } else if pkg.Redundancy.IsECInfo() { + // 备份方式为ec + objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get objectECDatas by packageID failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") + } + + for _, ecData := range objectECDatas { + packageSize += ecData.Object.Size + for _, block := range ecData.Blocks { + for _, nodeID := range block.NodeIDs { + + nodeInfo, exists := nodeInfoMap[nodeID] + if !exists { + nodeInfo = &models.NodePackageCachingInfo{ + NodeID: nodeID, + FileSize: ecData.Object.Size, + ObjectCount: 1, + } + } else { + nodeInfo.FileSize += ecData.Object.Size + nodeInfo.ObjectCount++ + } + nodeInfoMap[nodeID] = nodeInfo + } + } + } + } else { + logger.WithField("PackageID", msg.PackageID). + Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type) + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "redundancy type is wrong") + } + + var nodeInfos []models.NodePackageCachingInfo + for _, nodeInfo := range nodeInfoMap { + nodeInfos = append(nodeInfos, *nodeInfo) + } + + sort.Slice(nodeInfos, func(i, j int) bool { + return nodeInfos[i].NodeID < nodeInfos[j].NodeID + }) + return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize, pkg.Redundancy.Type)) +} + +func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) { + storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get storages by packageID failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetPackageLoadedNodesResp](errorcode.OperationFailed, "get storages by packageID failed") + } + + uniqueNodeIDs := make(map[int64]bool) + var nodeIDs []int64 + for _, stg := range storages { + if !uniqueNodeIDs[stg.NodeID] { + uniqueNodeIDs[stg.NodeID] = true + nodeIDs = append(nodeIDs, stg.NodeID) + } + } + + return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs)) +} diff --git a/coordinator/internal/services/service.go b/coordinator/internal/services/service.go new file mode 100644 index 0000000..dbc4f8a --- /dev/null +++ b/coordinator/internal/services/service.go @@ -0,0 +1,18 @@ +package services + +import ( + mydb "gitlink.org.cn/cloudream/storage-common/pkgs/db" + scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" +) + +type Service struct { + db *mydb.DB + scanner *scmq.Client +} + +func NewService(db *mydb.DB, scanner *scmq.Client) *Service { + return &Service{ + db: db, + scanner: scanner, + } +} diff --git a/coordinator/internal/services/storage.go b/coordinator/internal/services/storage.go new file mode 100644 index 0000000..82138c6 --- /dev/null +++ b/coordinator/internal/services/storage.go @@ -0,0 +1,38 @@ +package services + +import ( + "database/sql" + + "github.com/jmoiron/sqlx" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + + "gitlink.org.cn/cloudream/common/pkgs/mq" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStorageInfoResp, *mq.CodeMessage) { + stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID) + if err != nil { + logger.Warnf("getting user storage: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") + } + + return mq.ReplyOK(coormq.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State)) +} + +func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { + // TODO: 对于的storage中已经存在的文件,直接覆盖已有文件 + err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + return svc.db.StoragePackage().LoadPackage(tx, msg.PackageID, msg.StorageID, msg.UserID) + }) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + WithField("StorageID", msg.StorageID). + Warnf("user load package to storage failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "user load package to storage failed") + } + + return mq.ReplyOK(coormq.NewStoragePackageLoadedResp()) +} diff --git a/coordinator/magefiles/magefile.go b/coordinator/magefiles/magefile.go new file mode 100644 index 0000000..2ca1169 --- /dev/null +++ b/coordinator/magefiles/magefile.go @@ -0,0 +1,20 @@ +//go:build mage + +package main + +import ( + "gitlink.org.cn/cloudream/common/magefiles" + + //mage:import + _ "gitlink.org.cn/cloudream/common/magefiles/targets" +) + +var Default = Build + +func Build() error { + return magefiles.Build(magefiles.BuildArgs{ + OutputName: "coordinator", + OutputDir: "coordinator", + AssetsDir: "assets", + }) +} diff --git a/coordinator/main.go b/coordinator/main.go new file mode 100644 index 0000000..f08b964 --- /dev/null +++ b/coordinator/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "fmt" + "os" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + mydb "gitlink.org.cn/cloudream/storage-common/pkgs/db" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" + scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" + "gitlink.org.cn/cloudream/storage-coordinator/internal/config" + "gitlink.org.cn/cloudream/storage-coordinator/internal/services" +) + +func main() { + err := config.Init() + if err != nil { + fmt.Printf("init config failed, err: %s", err.Error()) + os.Exit(1) + } + + err = logger.Init(&config.Cfg().Logger) + if err != nil { + fmt.Printf("init logger failed, err: %s", err.Error()) + os.Exit(1) + } + + db, err := mydb.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db failed, err: %s", err.Error()) + } + + scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ) + if err != nil { + logger.Fatalf("new scanner client failed, err: %s", err.Error()) + } + + coorSvr, err := coormq.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ) + if err != nil { + logger.Fatalf("new coordinator server failed, err: %s", err.Error()) + } + + coorSvr.OnError = func(err error) { + logger.Warnf("coordinator server err: %s", err.Error()) + } + + // 启动服务 + go serveCoorServer(coorSvr) + + forever := make(chan bool) + <-forever +} + +func serveCoorServer(server *coormq.Server) { + logger.Info("start serving command server") + + err := server.Serve() + if err != nil { + logger.Errorf("command server stopped with error: %s", err.Error()) + } + + logger.Info("command server stopped") +}