|
- package s3
-
- import (
- "bytes"
- "context"
- "crypto/sha256"
- "errors"
- "fmt"
- "io"
-
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/service/s3"
- s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
- "gitlink.org.cn/cloudream/common/pkgs/logger"
- "gitlink.org.cn/cloudream/common/utils/io2"
- clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
- "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
- )
-
- type BaseStore struct {
- Detail *clitypes.UserSpaceDetail
- Bucket string
- cli *s3.Client
- opt BaseStoreOption
- }
-
- type BaseStoreOption struct {
- UseAWSSha256 bool // 能否直接使用AWS提供的SHA256校验,如果不行,则使用本地计算。默认使用本地计算。
- }
-
- func NewBaseStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string, opt BaseStoreOption) (*BaseStore, error) {
- return &BaseStore{
- Detail: detail,
- Bucket: bkt,
- cli: cli,
- opt: opt,
- }, nil
- }
-
- func (s *BaseStore) Write(objPath string, stream io.Reader) (types.FileInfo, error) {
- key := objPath
-
- counter := io2.Counter(stream)
-
- if s.opt.UseAWSSha256 {
- resp, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{
- Bucket: aws.String(s.Bucket),
- Key: aws.String(key),
- Body: counter,
- ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256,
- })
- if err != nil {
- return types.FileInfo{}, err
- }
- if resp.ChecksumSHA256 == nil {
- return types.FileInfo{}, errors.New("SHA256 checksum not found in response")
- }
-
- hash, err := DecodeBase64Hash(*resp.ChecksumSHA256)
- if err != nil {
- return types.FileInfo{}, fmt.Errorf("decode SHA256 checksum: %v", err)
- }
-
- return types.FileInfo{
- Path: key,
- Hash: clitypes.NewFullHash(hash),
- Size: counter.Count(),
- }, nil
- }
-
- hashStr := io2.NewReadHasher(sha256.New(), counter)
- _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{
- Bucket: aws.String(s.Bucket),
- Key: aws.String(key),
- Body: counter,
- })
- if err != nil {
- return types.FileInfo{}, err
- }
-
- return types.FileInfo{
- Path: key,
- Hash: clitypes.NewFullHash(hashStr.Sum()),
- Size: counter.Count(),
- }, nil
- }
-
- func (s *BaseStore) Read(objPath string, opt types.OpenOption) (io.ReadCloser, error) {
- key := objPath
-
- rngStr := fmt.Sprintf("bytes=%d-", opt.Offset)
- if opt.Length >= 0 {
- rngStr += fmt.Sprintf("%d", opt.Offset+opt.Length-1)
- }
-
- resp, err := s.cli.GetObject(context.TODO(), &s3.GetObjectInput{
- Bucket: aws.String(s.Bucket),
- Key: aws.String(key),
- Range: aws.String(rngStr),
- })
-
- if err != nil {
- return nil, err
- }
-
- return resp.Body, nil
- }
-
- func (s *BaseStore) Mkdir(path string) error {
- _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{
- Bucket: aws.String(s.Bucket),
- Key: aws.String(path + "/"),
- Body: bytes.NewReader([]byte{}),
- })
- return err
- }
-
- func (s *BaseStore) ListAll(path string) ([]types.ListEntry, error) {
- key := path
- // TODO 待测试
-
- input := &s3.ListObjectsInput{
- Bucket: aws.String(s.Bucket),
- Prefix: aws.String(key),
- Delimiter: aws.String("/"),
- }
-
- var objs []types.ListEntry
-
- var marker *string
- for {
- input.Marker = marker
- resp, err := s.cli.ListObjects(context.Background(), input)
- if err != nil {
- return nil, err
- }
-
- for _, obj := range resp.Contents {
- objs = append(objs, types.ListEntry{
- Path: *obj.Key,
- Size: *obj.Size,
- IsDir: false,
- })
- }
-
- if !*resp.IsTruncated {
- break
- }
-
- marker = resp.NextMarker
- }
-
- return objs, nil
- }
-
- func (s *BaseStore) CleanTemps() {
- log := s.getLogger()
-
- var deletes []s3types.ObjectIdentifier
- deleteObjs := make(map[string]s3types.Object)
- var marker *string
- for {
- resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
- Bucket: aws.String(s.Bucket),
- Prefix: aws.String(types.PathJoin(s.Detail.UserSpace.WorkingDir, types.TempWorkingDir, "/")),
- Marker: marker,
- })
-
- if err != nil {
- log.Warnf("read temp dir: %v", err)
- return
- }
-
- for _, obj := range resp.Contents {
- deletes = append(deletes, s3types.ObjectIdentifier{
- Key: obj.Key,
- })
- deleteObjs[*obj.Key] = obj
- }
-
- if !*resp.IsTruncated {
- break
- }
-
- marker = resp.NextMarker
- }
-
- if len(deletes) == 0 {
- return
- }
-
- resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
- Bucket: aws.String(s.Bucket),
- Delete: &s3types.Delete{
- Objects: deletes,
- },
- })
- if err != nil {
- log.Warnf("delete temp files: %v", err)
- return
- }
-
- for _, del := range resp.Deleted {
- obj := deleteObjs[*del.Key]
- log.Infof("remove unused temp file %v, size: %v, last mod time: %v", *obj.Key, *obj.Size, *obj.LastModified)
- }
- }
-
- func (s *BaseStore) getLogger() logger.Logger {
- return logger.WithField("BaseStore", "S3").WithField("Storage", s.Detail.UserSpace.Storage.String())
- }
|