Browse Source

Merge remote-tracking branch 'origin/mv_coor'

gitlink
Sydonian 2 years ago
parent
commit
ce6cc6d6d4
15 changed files with 798 additions and 0 deletions
  1. +2
    -0
      coordinator/README.md
  2. +35
    -0
      coordinator/go.mod
  3. +64
    -0
      coordinator/go.sum
  4. +24
    -0
      coordinator/internal/config/config.go
  5. +24
    -0
      coordinator/internal/services/agent.go
  6. +74
    -0
      coordinator/internal/services/bucket.go
  7. +29
    -0
      coordinator/internal/services/cache.go
  8. +30
    -0
      coordinator/internal/services/conmmon.go
  9. +36
    -0
      coordinator/internal/services/node.go
  10. +32
    -0
      coordinator/internal/services/object.go
  11. +309
    -0
      coordinator/internal/services/package.go
  12. +18
    -0
      coordinator/internal/services/service.go
  13. +38
    -0
      coordinator/internal/services/storage.go
  14. +20
    -0
      coordinator/magefiles/magefile.go
  15. +63
    -0
      coordinator/main.go

+ 2
- 0
coordinator/README.md View File

@@ -0,0 +1,2 @@
# storage-coordinator


+ 35
- 0
coordinator/go.mod View File

@@ -0,0 +1,35 @@
module gitlink.org.cn/cloudream/storage-coordinator

go 1.20

require (
github.com/jmoiron/sqlx v1.3.5
github.com/samber/lo v1.38.1
gitlink.org.cn/cloudream/common v0.0.0
gitlink.org.cn/cloudream/storage-common v0.0.0
)

require (
github.com/antonfisher/nested-logrus-formatter v1.3.1 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/otiai10/copy v1.12.0 // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/streadway/amqp v1.1.0 // indirect
github.com/zyedidia/generic v1.2.1 // indirect
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect
golang.org/x/sys v0.7.0 // indirect
)

// 运行go mod tidy时需要将下面几行取消注释
replace gitlink.org.cn/cloudream/common => ../../common

replace gitlink.org.cn/cloudream/storage-common => ../storage-common

+ 64
- 0
coordinator/go.sum View File

@@ -0,0 +1,64 @@
github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ=
github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY=
github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww=
github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=
github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU=
github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc=
github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis=
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 h1:3xJIFvzUFbu4ls0BTBYcgbCGhA63eAOEMxIHugyXJqA=
golang.org/x/exp v0.0.0-20230519143937-03e91628a987/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 24
- 0
coordinator/internal/config/config.go View File

@@ -0,0 +1,24 @@
package config

import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
db "gitlink.org.cn/cloudream/storage-common/pkgs/db/config"
stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
)

type Config struct {
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
}

var cfg Config

func Init() error {
return c.DefaultLoad("coordinator", &cfg)
}

func Cfg() *Config {
return &cfg
}

+ 24
- 0
coordinator/internal/services/agent.go View File

@@ -0,0 +1,24 @@
package services

import (
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

func (service *Service) TempCacheReport(msg *coormq.TempCacheReport) {
//service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID)
}

func (service *Service) AgentStatusReport(msg *coormq.AgentStatusReport) {
//jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus
//根据command中的Ip,插入节点延迟表

// TODO
/*
ips := utils.GetAgentIps()
Insert_NodeDelay(msg.Body.IP, ips, msg.Body.AgentDelay)

//从配置表里读取节点地域NodeLocation
//插入节点表的NodeStatus
Insert_Node(msg.Body.IP, msg.Body.IP, msg.Body.IPFSStatus, msg.Body.LocalDirStatus)
*/
}

+ 74
- 0
coordinator/internal/services/bucket.go View File

@@ -0,0 +1,74 @@
package services

import (
"database/sql"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) {
// TODO
panic("not implement yet")
}

func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserBucketsResp, *mq.CodeMessage) {
buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.UserID)

if err != nil {
logger.WithField("UserID", msg.UserID).
Warnf("get user buckets failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.GetUserBucketsResp](errorcode.OperationFailed, "get all buckets failed")
}

return mq.ReplyOK(coormq.NewGetUserBucketsResp(buckets))
}

func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.GetBucketPackagesResp, *mq.CodeMessage) {
packages, err := svc.db.Package().GetBucketPackages(svc.db.SQLCtx(), msg.UserID, msg.BucketID)

if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("BucketID", msg.BucketID).
Warnf("get bucket packages failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.GetBucketPackagesResp](errorcode.OperationFailed, "get bucket packages failed")
}

return mq.ReplyOK(coormq.NewGetBucketPackagesResp(packages))
}

func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) {
var bucketID int64
var err error
svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
// 这里用的是外部的err
bucketID, err = svc.db.Bucket().Create(tx, msg.UserID, msg.BucketName)
return err
})
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("BucketName", msg.BucketName).
Warnf("create bucket failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.CreateBucketResp](errorcode.OperationFailed, "create bucket failed")
}

return mq.ReplyOK(coormq.NewCreateBucketResp(bucketID))
}

func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
return svc.db.Bucket().Delete(tx, msg.BucketID)
})
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("BucketID", msg.BucketID).
Warnf("delete bucket failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.DeleteBucketResp](errorcode.OperationFailed, "delete bucket failed")
}

return mq.ReplyOK(coormq.NewDeleteBucketResp())
}

+ 29
- 0
coordinator/internal/services/cache.go View File

@@ -0,0 +1,29 @@
package services

import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.CachePackageMovedResp, *mq.CodeMessage) {
pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("getting package: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
}

if pkg.Redundancy.IsRepInfo() {
// TODO 优先级
if err := svc.db.Cache().BatchCreatePinned(svc.db.SQLCtx(), msg.FileHashes, msg.NodeID, 0); err != nil {
logger.Warnf("batch creating pinned cache: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch create pinned cache failed")
}
}

// TODO EC的逻辑

return mq.ReplyOK(coormq.NewCachePackageMovedResp())
}

+ 30
- 0
coordinator/internal/services/conmmon.go View File

@@ -0,0 +1,30 @@
package services

import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

func (svc *Service) FindClientLocation(msg *coormq.FindClientLocation) (*coormq.FindClientLocationResp, *mq.CodeMessage) {
location, err := svc.db.Location().FindLocationByExternalIP(svc.db.SQLCtx(), msg.IP)
if err != nil {
logger.WithField("IP", msg.IP).
Warnf("query client location failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "query client location failed")
}

return mq.ReplyOK(coormq.NewFindClientLocationResp(location))
}

func (svc *Service) GetECConfig(msg *coormq.GetECConfig) (*coormq.GetECConfigResp, *mq.CodeMessage) {
ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), msg.ECName)
if err != nil {
logger.WithField("ECName", msg.ECName).
Warnf("query ec failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "query ec failed")
}

return mq.ReplyOK(coormq.NewGetECConfigResp(ec))
}

+ 36
- 0
coordinator/internal/services/node.go View File

@@ -0,0 +1,36 @@
package services

import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

func (svc *Service) GetUserNodes(msg *coormq.GetUserNodes) (*coormq.GetUserNodesResp, *mq.CodeMessage) {
nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
if err != nil {
logger.WithField("UserID", msg.UserID).
Warnf("query user nodes failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "query user nodes failed")
}

return mq.ReplyOK(coormq.NewGetUserNodesResp(nodes))
}

func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.CodeMessage) {
var nodes []model.Node
for _, id := range msg.NodeIDs {
node, err := svc.db.Node().GetByID(svc.db.SQLCtx(), id)
if err != nil {
logger.WithField("NodeID", id).
Warnf("query node failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "query node failed")
}

nodes = append(nodes, node)
}

return mq.ReplyOK(coormq.NewGetNodesResp(nodes))
}

+ 32
- 0
coordinator/internal/services/object.go View File

@@ -0,0 +1,32 @@
package services

import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) {
data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("query object rep and node id in package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "query object rep and node id in package failed")
}

return mq.ReplyOK(coormq.NewGetPackageObjectRepDataResp(data))
}

func (svc *Service) GetPackageObjectECData(msg *coormq.GetPackageObjectECData) (*coormq.GetPackageObjectECDataResp, *mq.CodeMessage) {
data, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("query object ec and node id in package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "query object ec and node id in package failed")
}

return mq.ReplyOK(coormq.NewGetPackageObjectECDataResp(data))
}

+ 309
- 0
coordinator/internal/services/package.go View File

@@ -0,0 +1,309 @@
package services

import (
"database/sql"
"fmt"
"sort"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event"
)

func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, *mq.CodeMessage) {
pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
}

return mq.ReplyOK(coormq.NewGetPackageResp(pkg))
}

func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {
// TODO 检查用户是否有权限
objs, err := svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get package objects: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed")
}

return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs))
}

func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) {
var pkgID int64
err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
var err error
pkgID, err = svc.db.Package().Create(svc.db.SQLCtx(), msg.BucketID, msg.Name, msg.Redundancy)
return err
})
if err != nil {
logger.WithField("BucketID", msg.BucketID).
WithField("Name", msg.Name).
Warnf("creating package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "creating package failed")
}

return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID))
}

func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.UpdateRepPackageResp, *mq.CodeMessage) {
_, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
}

err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
// 先执行删除操作
if len(msg.Deletes) > 0 {
if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil {
return fmt.Errorf("deleting objects: %w", err)
}
}

// 再执行添加操作
if len(msg.Adds) > 0 {
if _, err := svc.db.Object().BatchAddRep(tx, msg.PackageID, msg.Adds); err != nil {
return fmt.Errorf("adding objects: %w", err)
}
}

return nil
})
if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "update rep package failed")
}

// 紧急任务
var affectFileHashes []string
for _, add := range msg.Adds {
affectFileHashes = append(affectFileHashes, add.FileHash)
}

err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true)
if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
}

return mq.ReplyOK(coormq.NewUpdateRepPackageResp())
}

func (svc *Service) UpdateECPackage(msg *coormq.UpdateECPackage) (*coormq.UpdateECPackageResp, *mq.CodeMessage) {
_, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
}

err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
// 先执行删除操作
if len(msg.Deletes) > 0 {
if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil {
return fmt.Errorf("deleting objects: %w", err)
}
}

// 再执行添加操作
if len(msg.Adds) > 0 {
if _, err := svc.db.Object().BatchAddEC(tx, msg.PackageID, msg.Adds); err != nil {
return fmt.Errorf("adding objects: %w", err)
}
}

return nil
})
if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "update ec package failed")
}

return mq.ReplyOK(coormq.NewUpdateECPackageResp())
}

func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) {
isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID)
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf("check package available failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "check package available failed")
}
if !isAva {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf("package is not available to the user")
return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "package is not available to the user")
}

err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
return svc.db.Package().SoftDelete(tx, msg.PackageID)
})
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf("set package deleted failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "set package deleted failed")
}

stgs, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.Warnf("find package storages failed, but this will not affect the deleting, err: %s", err.Error())
return mq.ReplyOK(coormq.NewDeletePackageResp())
}

// 不追求及时、准确
if len(stgs) == 0 {
// 如果没有被引用,直接投递CheckPackage的任务
err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false)
if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
}
logger.Debugf("post check package event")

} else {
// 有引用则让Agent去检查StoragePackage
for _, stg := range stgs {
err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false)
if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
}
}
logger.Debugf("post agent check storage event")
}

return mq.ReplyOK(coormq.NewDeletePackageResp())
}

func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*coormq.GetPackageCachedNodesResp, *mq.CodeMessage) {
isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID)
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf("check package available failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "check package available failed")
}
if !isAva {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf("package is not available to the user")
return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "package is not available to the user")
}

pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
}

var packageSize int64
nodeInfoMap := make(map[int64]*models.NodePackageCachingInfo)
if pkg.Redundancy.IsRepInfo() {
// 备份方式为rep
objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get objectRepDatas by packageID failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed")
}

for _, data := range objectRepDatas {
packageSize += data.Object.Size
for _, nodeID := range data.NodeIDs {

nodeInfo, exists := nodeInfoMap[nodeID]
if !exists {
nodeInfo = &models.NodePackageCachingInfo{
NodeID: nodeID,
FileSize: data.Object.Size,
ObjectCount: 1,
}
} else {
nodeInfo.FileSize += data.Object.Size
nodeInfo.ObjectCount++
}
nodeInfoMap[nodeID] = nodeInfo
}
}
} else if pkg.Redundancy.IsECInfo() {
// 备份方式为ec
objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get objectECDatas by packageID failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectECDatas by packageID failed")
}

for _, ecData := range objectECDatas {
packageSize += ecData.Object.Size
for _, block := range ecData.Blocks {
for _, nodeID := range block.NodeIDs {

nodeInfo, exists := nodeInfoMap[nodeID]
if !exists {
nodeInfo = &models.NodePackageCachingInfo{
NodeID: nodeID,
FileSize: ecData.Object.Size,
ObjectCount: 1,
}
} else {
nodeInfo.FileSize += ecData.Object.Size
nodeInfo.ObjectCount++
}
nodeInfoMap[nodeID] = nodeInfo
}
}
}
} else {
logger.WithField("PackageID", msg.PackageID).
Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type)
return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "redundancy type is wrong")
}

var nodeInfos []models.NodePackageCachingInfo
for _, nodeInfo := range nodeInfoMap {
nodeInfos = append(nodeInfos, *nodeInfo)
}

sort.Slice(nodeInfos, func(i, j int) bool {
return nodeInfos[i].NodeID < nodeInfos[j].NodeID
})
return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize, pkg.Redundancy.Type))
}

func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) {
storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get storages by packageID failed, err: %s", err.Error())
return mq.ReplyFailed[coormq.GetPackageLoadedNodesResp](errorcode.OperationFailed, "get storages by packageID failed")
}

uniqueNodeIDs := make(map[int64]bool)
var nodeIDs []int64
for _, stg := range storages {
if !uniqueNodeIDs[stg.NodeID] {
uniqueNodeIDs[stg.NodeID] = true
nodeIDs = append(nodeIDs, stg.NodeID)
}
}

return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs))
}

+ 18
- 0
coordinator/internal/services/service.go View File

@@ -0,0 +1,18 @@
package services

import (
mydb "gitlink.org.cn/cloudream/storage-common/pkgs/db"
scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner"
)

type Service struct {
db *mydb.DB
scanner *scmq.Client
}

func NewService(db *mydb.DB, scanner *scmq.Client) *Service {
return &Service{
db: db,
scanner: scanner,
}
}

+ 38
- 0
coordinator/internal/services/storage.go View File

@@ -0,0 +1,38 @@
package services

import (
"database/sql"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"

"gitlink.org.cn/cloudream/common/pkgs/mq"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStorageInfoResp, *mq.CodeMessage) {
stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID)
if err != nil {
logger.Warnf("getting user storage: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed")
}

return mq.ReplyOK(coormq.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State))
}

func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) {
// TODO: 对于的storage中已经存在的文件,直接覆盖已有文件
err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
return svc.db.StoragePackage().LoadPackage(tx, msg.PackageID, msg.StorageID, msg.UserID)
})
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
WithField("StorageID", msg.StorageID).
Warnf("user load package to storage failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "user load package to storage failed")
}

return mq.ReplyOK(coormq.NewStoragePackageLoadedResp())
}

+ 20
- 0
coordinator/magefiles/magefile.go View File

@@ -0,0 +1,20 @@
//go:build mage

package main

import (
"gitlink.org.cn/cloudream/common/magefiles"

//mage:import
_ "gitlink.org.cn/cloudream/common/magefiles/targets"
)

var Default = Build

func Build() error {
return magefiles.Build(magefiles.BuildArgs{
OutputName: "coordinator",
OutputDir: "coordinator",
AssetsDir: "assets",
})
}

+ 63
- 0
coordinator/main.go View File

@@ -0,0 +1,63 @@
package main

import (
"fmt"
"os"

"gitlink.org.cn/cloudream/common/pkgs/logger"
mydb "gitlink.org.cn/cloudream/storage-common/pkgs/db"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner"
"gitlink.org.cn/cloudream/storage-coordinator/internal/config"
"gitlink.org.cn/cloudream/storage-coordinator/internal/services"
)

func main() {
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

db, err := mydb.NewDB(&config.Cfg().DB)
if err != nil {
logger.Fatalf("new db failed, err: %s", err.Error())
}

scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new scanner client failed, err: %s", err.Error())
}

coorSvr, err := coormq.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new coordinator server failed, err: %s", err.Error())
}

coorSvr.OnError = func(err error) {
logger.Warnf("coordinator server err: %s", err.Error())
}

// 启动服务
go serveCoorServer(coorSvr)

forever := make(chan bool)
<-forever
}

func serveCoorServer(server *coormq.Server) {
logger.Info("start serving command server")

err := server.Serve()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
}

logger.Info("command server stopped")
}

Loading…
Cancel
Save