From 102b7da935711552c2de646851e67ddc4456c475 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 30 Aug 2023 16:48:57 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BC=98=E5=8C=96serder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/services/scanner.go | 6 ++-- common/pkgs/mq/scanner/event.go | 38 +++++++----------------- common/pkgs/mq/scanner/event/event.go | 18 +++-------- coordinator/internal/services/package.go | 7 +++-- magefiles/main.go | 11 ++----- scanner/internal/event/event.go | 3 +- scanner/internal/services/event.go | 10 +------ 7 files changed, 29 insertions(+), 64 deletions(-) diff --git a/client/internal/services/scanner.go b/client/internal/services/scanner.go index 7acb249..2ad0869 100644 --- a/client/internal/services/scanner.go +++ b/client/internal/services/scanner.go @@ -4,6 +4,8 @@ import ( "fmt" "gitlink.org.cn/cloudream/storage/common/globals" + scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" + scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) type ScannerService struct { @@ -14,14 +16,14 @@ func (svc *Service) ScannerSvc() *ScannerService { return &ScannerService{Service: svc} } -func (svc *ScannerService) PostEvent(event any, isEmergency bool, dontMerge bool) error { +func (svc *ScannerService) PostEvent(event scevt.Event, isEmergency bool, dontMerge bool) error { scCli, err := globals.ScannerMQPool.Acquire() if err != nil { return fmt.Errorf("new scacnner client: %w", err) } defer scCli.Close() - err = scCli.PostEvent(event, isEmergency, dontMerge) + err = scCli.PostEvent(scmq.NewPostEvent(event, isEmergency, dontMerge)) if err != nil { return fmt.Errorf("request to scanner failed, err: %w", err) } diff --git a/common/pkgs/mq/scanner/event.go b/common/pkgs/mq/scanner/event.go index c94dd2c..6062d01 100644 --- a/common/pkgs/mq/scanner/event.go +++ b/common/pkgs/mq/scanner/event.go @@ -1,9 +1,6 @@ package scanner import ( - "fmt" - "time" - "gitlink.org.cn/cloudream/common/pkgs/mq" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) @@ -16,35 +13,22 @@ type EventService interface { var _ = RegisterNoReply(EventService.PostEvent) type PostEvent struct { - Event map[string]any `json:"event"` - IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理 - DontMerge bool `json:"dontMerge"` // 不可合并此消息 + Event scevt.Event `json:"event"` + IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理 + DontMerge bool `json:"dontMerge"` // 不可合并此消息 } -func NewPostEvent(event any, isEmergency bool, dontMerge bool) (PostEvent, error) { - mp, err := scevt.MessageToMap(event) - if err != nil { - return PostEvent{}, fmt.Errorf("message to map failed, err: %w", err) - } - +func NewPostEvent(event scevt.Event, isEmergency bool, dontMerge bool) PostEvent { return PostEvent{ - Event: mp, + Event: event, IsEmergency: isEmergency, DontMerge: dontMerge, - }, nil -} -func (cli *Client) PostEvent(event any, isEmergency bool, dontMerge bool, opts ...mq.SendOption) error { - opt := mq.SendOption{ - Timeout: time.Second * 30, - } - if len(opts) > 0 { - opt = opts[0] - } - - body, err := NewPostEvent(event, isEmergency, dontMerge) - if err != nil { - return fmt.Errorf("new post event body failed, err: %w", err) } +} +func (client *Client) PostEvent(msg PostEvent) error { + return mq.Send[PostEvent](client.rabbitCli, msg) +} - return mq.Send(cli.rabbitCli, body, opt) +func init() { + mq.RegisterUnionType(scevt.EventTypeUnino) } diff --git a/common/pkgs/mq/scanner/event/event.go b/common/pkgs/mq/scanner/event/event.go index 9281ca2..d989d5f 100644 --- a/common/pkgs/mq/scanner/event/event.go +++ b/common/pkgs/mq/scanner/event/event.go @@ -5,21 +5,11 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" ) -var typeResolver = serder.NewTypeNameResolver(true) +type Event interface{} -var serderOption = serder.TypedSerderOption{ - TypeResolver: &typeResolver, - TypeFieldName: "@type", -} - -func MapToMessage(m map[string]any) (any, error) { - return serder.TypedMapToObject(m, serderOption) -} - -func MessageToMap(msg any) (map[string]any, error) { - return serder.ObjectToTypedMap(msg, serderOption) -} +var eventUnionEles = serder.NewTypeNameResolver(true) +var EventTypeUnino = serder.NewTypeUnion[Event]("@type", eventUnionEles) func Register[T any]() { - typeResolver.Register(myreflect.TypeOf[T]()) + eventUnionEles.Register(myreflect.TypeOf[T]()) } diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go index 02adee3..4098dd5 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/services/package.go @@ -11,6 +11,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) @@ -94,7 +95,7 @@ func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.Upda affectFileHashes = append(affectFileHashes, add.FileHash) } - err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true) + err = svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true)) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) } @@ -170,7 +171,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack // 不追求及时、准确 if len(stgs) == 0 { // 如果没有被引用,直接投递CheckPackage的任务 - err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false) + err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false)) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) } @@ -179,7 +180,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack } else { // 有引用则让Agent去检查StoragePackage for _, stg := range stgs { - err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false) + err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false)) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) } diff --git a/magefiles/main.go b/magefiles/main.go index f59c3cd..bd70eb4 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -17,24 +17,19 @@ const ( BuildDir = "./build" ) -var Global = struct { - OS string - Arch string -}{} - // [配置项]设置编译平台为windows func Win() { - Global.OS = "win" + magefiles.Global.OS = "win" } // [配置项]设置编译平台为linux func Linux() { - Global.OS = "linux" + magefiles.Global.OS = "linux" } // [配置项]设置编译架构为amd64 func AMD64() { - Global.Arch = "amd64" + magefiles.Global.Arch = "amd64" } func All() error { diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go index e117dc1..27a1aba 100644 --- a/scanner/internal/event/event.go +++ b/scanner/internal/event/event.go @@ -8,6 +8,7 @@ import ( event "gitlink.org.cn/cloudream/common/pkgs/event" "gitlink.org.cn/cloudream/common/pkgs/typedispatcher" mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" + scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) type ExecuteArgs struct { @@ -32,7 +33,7 @@ func NewExecutor(db *mydb.DB, distLock *distlocksvc.Service) Executor { var msgDispatcher = typedispatcher.NewTypeDispatcher[Event]() -func FromMessage(msg any) (Event, error) { +func FromMessage(msg scevt.Event) (Event, error) { event, ok := msgDispatcher.Dispatch(msg) if !ok { return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name()) diff --git a/scanner/internal/services/event.go b/scanner/internal/services/event.go index c7ceb2a..9333c13 100644 --- a/scanner/internal/services/event.go +++ b/scanner/internal/services/event.go @@ -3,19 +3,11 @@ package services import ( "gitlink.org.cn/cloudream/common/pkgs/logger" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" - scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" "gitlink.org.cn/cloudream/storage/scanner/internal/event" ) func (svc *Service) PostEvent(msg *scmq.PostEvent) { - - evtMsg, err := scevt.MapToMessage(msg.Event) - if err != nil { - logger.Warnf("convert map to event message failed, err: %s", err.Error()) - return - } - - evt, err := event.FromMessage(evtMsg) + evt, err := event.FromMessage(msg.Event) if err != nil { logger.Warnf("create event from event message failed, err: %s", err.Error()) return From 745db0931a9b92b4b31f68b312cec8eceb1bba9d Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 1 Sep 2023 10:05:52 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BC=98=E5=8C=96iterator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/package.go | 34 ++++++++------ client/internal/http/package.go | 26 +++++++++-- common/pkgs/cmd/create_ec_package.go | 23 ++++++---- common/pkgs/cmd/create_rep_package.go | 27 ++++++----- common/pkgs/cmd/download_package.go | 34 ++++++++------ common/pkgs/iterator/ec_object_iterator.go | 2 +- .../pkgs/iterator/http_uploading_iterator.go | 45 ------------------- common/pkgs/iterator/iterator.go | 9 ++-- common/pkgs/iterator/rep_object_iterator.go | 2 +- 9 files changed, 102 insertions(+), 100 deletions(-) delete mode 100644 common/pkgs/iterator/http_uploading_iterator.go diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index cf2538f..2837d07 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -55,24 +55,32 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 if err != nil { return err } - defer objInfo.File.Close() - fullPath := filepath.Join(outputDir, objInfo.Object.Path) + err = func() error { + defer objInfo.File.Close() - dirPath := filepath.Dir(fullPath) - if err := os.MkdirAll(dirPath, 0755); err != nil { - return fmt.Errorf("creating object dir: %w", err) - } + fullPath := filepath.Join(outputDir, objInfo.Object.Path) - outputFile, err := os.Create(fullPath) - if err != nil { - return fmt.Errorf("creating object file: %w", err) - } - defer outputFile.Close() + dirPath := filepath.Dir(fullPath) + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("creating object dir: %w", err) + } + + outputFile, err := os.Create(fullPath) + if err != nil { + return fmt.Errorf("creating object file: %w", err) + } + defer outputFile.Close() + + _, err = io.Copy(outputFile, objInfo.File) + if err != nil { + return fmt.Errorf("copy object data to local file failed, err: %w", err) + } - _, err = io.Copy(outputFile, objInfo.File) + return nil + }() if err != nil { - return fmt.Errorf("copy object data to local file failed, err: %w", err) + return err } } diff --git a/client/internal/http/package.go b/client/internal/http/package.go index ef425a9..8eb5b2e 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -8,8 +8,10 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" + + stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) type PackageService struct { @@ -72,7 +74,7 @@ func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { return } - objIter := iterator.NewHTTPObjectIterator(req.Files) + objIter := mapMultiPartFileToUploadingObject(req.Files) taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo) @@ -116,7 +118,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { return } - objIter := iterator.NewHTTPObjectIterator(req.Files) + objIter := mapMultiPartFileToUploadingObject(req.Files) taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo) @@ -232,3 +234,21 @@ func (s *PackageService) GetLoadedNodes(ctx *gin.Context) { NodeIDs: nodeIDs, })) } + +func mapMultiPartFileToUploadingObject(files []*multipart.FileHeader) stgiter.UploadingObjectIterator { + return iterator.Map[*multipart.FileHeader]( + iterator.Array(files...), + func(file *multipart.FileHeader) (*stgiter.IterUploadingObject, error) { + stream, err := file.Open() + if err != nil { + return nil, err + } + + return &stgiter.IterUploadingObject{ + Path: file.Filename, + Size: file.Size, + File: stream, + }, nil + }, + ) +} diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_ec_package.go index 3760276..5193796 100644 --- a/common/pkgs/cmd/create_ec_package.go +++ b/common/pkgs/cmd/create_ec_package.go @@ -152,17 +152,24 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje if err != nil { return nil, fmt.Errorf("reading object: %w", err) } + err = func() error { + defer objInfo.File.Close() + + fileHashes, uploadedNodeIDs, err := uploadECObject(objInfo, uploadNodes, ecInfo, ec) + uploadRets = append(uploadRets, ECObjectUploadResult{ + Info: objInfo, + Error: err, + }) + if err != nil { + return fmt.Errorf("uploading object: %w", err) + } - fileHashes, uploadedNodeIDs, err := uploadECObject(objInfo, uploadNodes, ecInfo, ec) - uploadRets = append(uploadRets, ECObjectUploadResult{ - Info: objInfo, - Error: err, - }) + adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs)) + return nil + }() if err != nil { - return nil, fmt.Errorf("uploading object: %w", err) + return nil, err } - - adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs)) } _, err = coorCli.UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil)) diff --git a/common/pkgs/cmd/create_rep_package.go b/common/pkgs/cmd/create_rep_package.go index 7308a67..7fd4d8b 100644 --- a/common/pkgs/cmd/create_rep_package.go +++ b/common/pkgs/cmd/create_rep_package.go @@ -152,17 +152,24 @@ func uploadAndUpdateRepPackage(packageID int64, objectIter iterator.UploadingObj return nil, fmt.Errorf("reading object: %w", err) } - fileHash, err := uploadFile(objInfo.File, uploadNode) - uploadRets = append(uploadRets, RepObjectUploadResult{ - Info: objInfo, - Error: err, - FileHash: fileHash, - }) + err = func() error { + defer objInfo.File.Close() + fileHash, err := uploadFile(objInfo.File, uploadNode) + uploadRets = append(uploadRets, RepObjectUploadResult{ + Info: objInfo, + Error: err, + FileHash: fileHash, + }) + if err != nil { + return fmt.Errorf("uploading object: %w", err) + } + + adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, []int64{uploadNode.Node.NodeID})) + return nil + }() if err != nil { - return nil, fmt.Errorf("uploading object: %w", err) + return nil, err } - - adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, []int64{uploadNode.Node.NodeID})) } _, err = coorCli.UpdateRepPackage(coormq.NewUpdateRepPackage(packageID, adds, nil)) @@ -195,7 +202,7 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { if uploadNode.IsSameLocation { nodeIP = uploadNode.Node.LocalIP - logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.Node.NodeID) + logger.Infof("client and node %d are at the same location, use local ip", uploadNode.Node.NodeID) } fileHash, err := uploadToNode(file, nodeIP) diff --git a/common/pkgs/cmd/download_package.go b/common/pkgs/cmd/download_package.go index efaeb50..93cf999 100644 --- a/common/pkgs/cmd/download_package.go +++ b/common/pkgs/cmd/download_package.go @@ -126,24 +126,32 @@ func (t *DownloadPackage) writeObject(objIter iterator.DownloadingObjectIterator if err != nil { return err } - defer objInfo.File.Close() - fullPath := filepath.Join(t.outputPath, objInfo.Object.Path) + err = func() error { + defer objInfo.File.Close() - dirPath := filepath.Dir(fullPath) - if err := os.MkdirAll(dirPath, 0755); err != nil { - return fmt.Errorf("creating object dir: %w", err) - } + fullPath := filepath.Join(t.outputPath, objInfo.Object.Path) - outputFile, err := os.Create(fullPath) - if err != nil { - return fmt.Errorf("creating object file: %w", err) - } - defer outputFile.Close() + dirPath := filepath.Dir(fullPath) + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("creating object dir: %w", err) + } - _, err = io.Copy(outputFile, objInfo.File) + outputFile, err := os.Create(fullPath) + if err != nil { + return fmt.Errorf("creating object file: %w", err) + } + defer outputFile.Close() + + _, err = io.Copy(outputFile, objInfo.File) + if err != nil { + return fmt.Errorf("copy object data to local file failed, err: %w", err) + } + + return nil + }() if err != nil { - return fmt.Errorf("copy object data to local file failed, err: %w", err) + return err } } diff --git a/common/pkgs/iterator/ec_object_iterator.go b/common/pkgs/iterator/ec_object_iterator.go index a354158..8ae5b1e 100644 --- a/common/pkgs/iterator/ec_object_iterator.go +++ b/common/pkgs/iterator/ec_object_iterator.go @@ -104,7 +104,7 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadi nodeIPs[i] = nds[i].Node.ExternalIP if nds[i].IsSameLocation { nodeIPs[i] = nds[i].Node.LocalIP - logger.Infof("client and node %d are at the same location, use local ip\n", nds[i].Node.NodeID) + logger.Infof("client and node %d are at the same location, use local ip", nds[i].Node.NodeID) } } diff --git a/common/pkgs/iterator/http_uploading_iterator.go b/common/pkgs/iterator/http_uploading_iterator.go deleted file mode 100644 index 81c2060..0000000 --- a/common/pkgs/iterator/http_uploading_iterator.go +++ /dev/null @@ -1,45 +0,0 @@ -package iterator - -import ( - "mime/multipart" -) - -type HTTPUploadingIterator struct { - files []*multipart.FileHeader - currentIndex int -} - -func NewHTTPObjectIterator(files []*multipart.FileHeader) *HTTPUploadingIterator { - return &HTTPUploadingIterator{ - files: files, - } -} - -func (i *HTTPUploadingIterator) MoveNext() (*IterUploadingObject, error) { - if i.currentIndex >= len(i.files) { - return nil, ErrNoMoreItem - } - - item, err := i.doMove() - i.currentIndex++ - return item, err -} - -func (i *HTTPUploadingIterator) doMove() (*IterUploadingObject, error) { - fileInfo := i.files[i.currentIndex] - - file, err := fileInfo.Open() - if err != nil { - return nil, err - } - - return &IterUploadingObject{ - Path: fileInfo.Filename, - Size: fileInfo.Size, - File: file, - }, nil -} - -func (i *HTTPUploadingIterator) Close() { - -} diff --git a/common/pkgs/iterator/iterator.go b/common/pkgs/iterator/iterator.go index 3ced512..0c20c3f 100644 --- a/common/pkgs/iterator/iterator.go +++ b/common/pkgs/iterator/iterator.go @@ -1,12 +1,9 @@ package iterator import ( - "errors" + "gitlink.org.cn/cloudream/common/pkgs/iterator" ) -var ErrNoMoreItem = errors.New("no more item") +var ErrNoMoreItem = iterator.ErrNoMoreItem -type Iterator[T any] interface { - MoveNext() (T, error) - Close() -} +type Iterator[T any] iterator.Iterator[T] diff --git a/common/pkgs/iterator/rep_object_iterator.go b/common/pkgs/iterator/rep_object_iterator.go index e161781..540eff0 100644 --- a/common/pkgs/iterator/rep_object_iterator.go +++ b/common/pkgs/iterator/rep_object_iterator.go @@ -105,7 +105,7 @@ func (i *RepObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloading if downloadNode.IsSameLocation { nodeIP = downloadNode.Node.LocalIP - logger.Infof("client and node %d are at the same location, use local ip\n", downloadNode.Node.NodeID) + logger.Infof("client and node %d are at the same location, use local ip", downloadNode.Node.NodeID) } reader, err := downloadFile(i.downloadCtx, downloadNode.Node.NodeID, nodeIP, repData.FileHash)