Browse Source

简单增加serverless的feature

gitlink
Sydonian 1 year ago
parent
commit
1cb5c75b8f
6 changed files with 84 additions and 4 deletions
  1. +60
    -0
      common/pkgs/ioswitch2/ops2/faas.go
  2. +2
    -2
      common/pkgs/storage/local/temp_store.go
  3. +1
    -0
      common/pkgs/storage/obs/faas.go
  4. +8
    -0
      common/pkgs/storage/types/bypass.go
  5. +8
    -0
      common/pkgs/storage/types/faas.go
  6. +5
    -2
      common/pkgs/storage/types/temp_store.go

+ 60
- 0
common/pkgs/ioswitch2/ops2/faas.go View File

@@ -0,0 +1,60 @@
package ops2

import (
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type InternalFaaSGalMultiply struct {
Coefs [][]byte `json:"coefs"`
InputFilePathes []exec.VarID `json:"inputFilePathes"` // 输入的文件的路径
OutputFilePathes []exec.VarID `json:"outputFilePathes"` // 输出的文件的路径
ChunkSize int `json:"chunkSize"`
StorageID cdssdk.StorageID `json:"storageID"`
}

func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
if err != nil {
return err
}

fass, err := mgr.GetComponent[types.InternalFaaSCall](stgMgr, o.StorageID)
if err != nil {
return fmt.Errorf("getting faas component: %w", err)
}

tmp, err := mgr.GetComponent[types.TempStore](stgMgr, o.StorageID)
if err != nil {
return fmt.Errorf("getting temp store component: %w", err)
}

inputVars, err := exec.BindArray[*exec.StringValue](e, ctx.Context, o.InputFilePathes)
if err != nil {
return err
}

var outputs []string
for i := 0; i < len(o.OutputFilePathes); i++ {
outputs = append(outputs, tmp.CreateTemp())
}
var outputVars []*exec.StringValue
for _, output := range outputs {
outputVars = append(outputVars, &exec.StringValue{Value: output})
}

inputs := lo.Map(inputVars, func(v *exec.StringValue, idx int) string { return v.Value })

err = fass.GalMultiply(ctx.Context, o.Coefs, inputs, outputs, o.ChunkSize)
if err != nil {
return fmt.Errorf("faas gal multiply: %w", err)
}

exec.PutArray(e, o.OutputFilePathes, outputVars)
return nil
}

+ 2
- 2
common/pkgs/storage/local/temp_store.go View File

@@ -3,11 +3,11 @@ package local
import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

type LocalTempStore struct {
cfg cdssdk.BypassUploadFeature
cfg cdssdk.BypassWriteFeature
stg cdssdk.Storage
}

func NewLocalTempStore(stg cdssdk.Storage, cfg cdssdk.BypassUploadFeature) *LocalTempStore {
func NewLocalTempStore(stg cdssdk.Storage, cfg cdssdk.BypassWriteFeature) *LocalTempStore {
return &LocalTempStore{
cfg: cfg,
stg: stg,


+ 1
- 0
common/pkgs/storage/obs/faas.go View File

@@ -0,0 +1 @@
package obs

+ 8
- 0
common/pkgs/storage/types/bypass.go View File

@@ -0,0 +1,8 @@
package types

import "io"

type BypassWriter interface {
StorageComponent
Write(stream io.Reader) (string, error)
}

+ 8
- 0
common/pkgs/storage/types/faas.go View File

@@ -0,0 +1,8 @@
package types

import "context"

type InternalFaaSCall interface {
StorageComponent
GalMultiply(ctx context.Context, coef [][]byte, inputs []string, outputs []string, chunkSize int) error
}

+ 5
- 2
common/pkgs/storage/types/temp_store.go View File

@@ -2,7 +2,10 @@ package types

type TempStore interface {
StorageComponent
// 生成并注册一个临时文件名。在名字有效期间此临时文件不会被清理
CreateTemp() string
Commited(objectName string)
Drop(objectName string)
// 指示一个临时文件已经被移动作它用,不需要再关注它了(也不需要删除这个文件)。
Commited(filePath string)
// 临时文件被放弃,可以删除这个文件了
Drop(filePath string)
}

Loading…
Cancel
Save