Browse Source

支持通用的S3存储

gitlink
Sydonian 1 year ago
parent
commit
56b474a691
3 changed files with 245 additions and 1 deletions
  1. +52
    -0
      common/pkgs/storage/s3/agent.go
  2. +62
    -1
      common/pkgs/storage/s3/public_store.go
  3. +131
    -0
      common/pkgs/storage/s3/s3.go

+ 52
- 0
common/pkgs/storage/s3/agent.go View File

@@ -0,0 +1,52 @@
package s3

import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type Agent struct {
Detail stgmod.StorageDetail
ShardStore *ShardStore
PublicStore *PublicStore
}

func (s *Agent) Start(ch *types.StorageEventChan) {
if s.ShardStore != nil {
s.ShardStore.Start(ch)
}

if s.PublicStore != nil {
s.PublicStore.Start(ch)
}
}

func (a *Agent) Stop() {
if a.ShardStore != nil {
a.ShardStore.Stop()
}

if a.PublicStore != nil {
a.PublicStore.Stop()
}
}

func (a *Agent) Info() stgmod.StorageDetail {
return a.Detail
}

func (a *Agent) GetShardStore() (types.ShardStore, error) {
if a.ShardStore == nil {
return nil, types.ErrUnsupported
}

return a.ShardStore, nil
}

func (a *Agent) GetPublicStore() (types.PublicStore, error) {
if a.PublicStore == nil {
return nil, types.ErrUnsupported
}

return a.PublicStore, nil
}

+ 62
- 1
common/pkgs/storage/s3/public_store.go View File

@@ -1,7 +1,68 @@
package s3

import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
import (
"context"
"io"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type PublicStoreDesc struct {
types.EmptyPublicStoreDesc
Detail *stgmod.StorageDetail
}

func NewPublicStoreDesc(detail *stgmod.StorageDetail) PublicStoreDesc {
return PublicStoreDesc{
Detail: detail,
}
}

func (d *PublicStoreDesc) Enabled() bool {
return d.Detail.Storage.PublicStore != nil
}

type PublicStore struct {
Detail stgmod.StorageDetail
Bucket string
cli *s3.Client
cfg cdssdk.S3PublicStorage
}

func NewPublicStore(detail stgmod.StorageDetail, cli *s3.Client, bkt string, cfg cdssdk.S3PublicStorage) (*PublicStore, error) {
return &PublicStore{
Detail: detail,
Bucket: bkt,
cli: cli,
cfg: cfg,
}, nil
}

func (s *PublicStore) Start(ch *types.StorageEventChan) {
s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase)
}

func (s *PublicStore) Stop() {
s.getLogger().Infof("component stop")
}

func (s *PublicStore) Write(objPath string, stream io.Reader) error {
key := JoinKey(objPath, s.cfg.LoadBase)

_, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(key),
Body: stream,
})

return err
}

func (s *PublicStore) getLogger() logger.Logger {
return logger.WithField("PublicStore", "S3").WithField("Storage", s.Detail.Storage.String())
}

+ 131
- 0
common/pkgs/storage/s3/s3.go View File

@@ -0,0 +1,131 @@
package s3

import (
"fmt"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
)

func init() {
reg.RegisterBuilder[*cdssdk.S3Type](newBuilder)
}

type builder struct {
types.EmptyBuilder
detail stgmod.StorageDetail
}

func newBuilder(detail stgmod.StorageDetail) types.StorageBuilder {
return &builder{
detail: detail,
}
}

func (b *builder) CreateAgent() (types.StorageAgent, error) {
s3Type, ok := b.detail.Storage.Type.(*cdssdk.S3Type)
if !ok {
return nil, fmt.Errorf("invalid storage type %T for obs agent", b.detail.Storage.Type)
}

agt := &Agent{
Detail: b.detail,
}

if b.detail.Storage.ShardStore != nil {
cfg, ok := b.detail.Storage.ShardStore.(*cdssdk.S3ShardStorage)
if !ok {
return nil, fmt.Errorf("invalid shard store type %T for local storage", b.detail.Storage.ShardStore)
}

cli, bkt, err := createClient(s3Type)
if err != nil {
return nil, err
}

store, err := NewShardStore(b.detail, cli, bkt, *cfg, ShardStoreOption{UseAWSSha256: true})
if err != nil {
return nil, err
}

agt.ShardStore = store
}

if b.detail.Storage.PublicStore != nil {
cfg, ok := b.detail.Storage.PublicStore.(*cdssdk.S3PublicStorage)
if !ok {
return nil, fmt.Errorf("invalid public store type %T for local storage", b.detail.Storage.PublicStore)
}

cli, bkt, err := createClient(s3Type)
if err != nil {
return nil, err
}

store, err := NewPublicStore(b.detail, cli, bkt, *cfg)
if err != nil {
return nil, err
}

agt.PublicStore = store
}

return agt, nil
}

func (b *builder) ShardStoreDesc() types.ShardStoreDesc {
desc := NewShardStoreDesc(&b.detail)
return &desc
}

func (b *builder) PublicStoreDesc() types.PublicStoreDesc {
desc := NewPublicStoreDesc(&b.detail)
return &desc
}

func createClient(addr *cdssdk.S3Type) (*s3.Client, string, error) {
awsConfig := aws.Config{}

if addr.AK != "" && addr.SK != "" {
cre := aws.Credentials{
AccessKeyID: addr.AK,
SecretAccessKey: addr.SK,
}
awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre}
}

awsConfig.Region = addr.Region

options := []func(*s3.Options){}
options = append(options, func(s3Opt *s3.Options) {
s3Opt.BaseEndpoint = &addr.Endpoint
})

cli := s3.NewFromConfig(awsConfig, options...)
return cli, addr.Bucket, nil
}

func (b *builder) CreateMultiparter() (types.Multiparter, error) {
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

cli, bucket, err := createClient(b.detail.Storage.Type.(*cdssdk.S3Type))
if err != nil {
return nil, err
}

return NewMultiparter(
b.detail,
feat,
bucket,
cli,
), nil
}

Loading…
Cancel
Save