| @@ -55,24 +55,32 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 | |||
| if err != nil { | |||
| return err | |||
| } | |||
| defer objInfo.File.Close() | |||
| fullPath := filepath.Join(outputDir, objInfo.Object.Path) | |||
| err = func() error { | |||
| defer objInfo.File.Close() | |||
| dirPath := filepath.Dir(fullPath) | |||
| if err := os.MkdirAll(dirPath, 0755); err != nil { | |||
| return fmt.Errorf("creating object dir: %w", err) | |||
| } | |||
| fullPath := filepath.Join(outputDir, objInfo.Object.Path) | |||
| outputFile, err := os.Create(fullPath) | |||
| if err != nil { | |||
| return fmt.Errorf("creating object file: %w", err) | |||
| } | |||
| defer outputFile.Close() | |||
| dirPath := filepath.Dir(fullPath) | |||
| if err := os.MkdirAll(dirPath, 0755); err != nil { | |||
| return fmt.Errorf("creating object dir: %w", err) | |||
| } | |||
| outputFile, err := os.Create(fullPath) | |||
| if err != nil { | |||
| return fmt.Errorf("creating object file: %w", err) | |||
| } | |||
| defer outputFile.Close() | |||
| _, err = io.Copy(outputFile, objInfo.File) | |||
| if err != nil { | |||
| return fmt.Errorf("copy object data to local file failed, err: %w", err) | |||
| } | |||
| _, err = io.Copy(outputFile, objInfo.File) | |||
| return nil | |||
| }() | |||
| if err != nil { | |||
| return fmt.Errorf("copy object data to local file failed, err: %w", err) | |||
| return err | |||
| } | |||
| } | |||
| @@ -8,8 +8,10 @@ import ( | |||
| "github.com/gin-gonic/gin" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/models" | |||
| "gitlink.org.cn/cloudream/common/pkgs/iterator" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | |||
| stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | |||
| ) | |||
| type PackageService struct { | |||
| @@ -72,7 +74,7 @@ func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { | |||
| return | |||
| } | |||
| objIter := iterator.NewHTTPObjectIterator(req.Files) | |||
| objIter := mapMultiPartFileToUploadingObject(req.Files) | |||
| taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo) | |||
| @@ -116,7 +118,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { | |||
| return | |||
| } | |||
| objIter := iterator.NewHTTPObjectIterator(req.Files) | |||
| objIter := mapMultiPartFileToUploadingObject(req.Files) | |||
| taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo) | |||
| @@ -232,3 +234,21 @@ func (s *PackageService) GetLoadedNodes(ctx *gin.Context) { | |||
| NodeIDs: nodeIDs, | |||
| })) | |||
| } | |||
| func mapMultiPartFileToUploadingObject(files []*multipart.FileHeader) stgiter.UploadingObjectIterator { | |||
| return iterator.Map[*multipart.FileHeader]( | |||
| iterator.Array(files...), | |||
| func(file *multipart.FileHeader) (*stgiter.IterUploadingObject, error) { | |||
| stream, err := file.Open() | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| return &stgiter.IterUploadingObject{ | |||
| Path: file.Filename, | |||
| Size: file.Size, | |||
| File: stream, | |||
| }, nil | |||
| }, | |||
| ) | |||
| } | |||
| @@ -4,6 +4,8 @@ import ( | |||
| "fmt" | |||
| "gitlink.org.cn/cloudream/storage/common/globals" | |||
| scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" | |||
| scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" | |||
| ) | |||
| type ScannerService struct { | |||
| @@ -14,14 +16,14 @@ func (svc *Service) ScannerSvc() *ScannerService { | |||
| return &ScannerService{Service: svc} | |||
| } | |||
| func (svc *ScannerService) PostEvent(event any, isEmergency bool, dontMerge bool) error { | |||
| func (svc *ScannerService) PostEvent(event scevt.Event, isEmergency bool, dontMerge bool) error { | |||
| scCli, err := globals.ScannerMQPool.Acquire() | |||
| if err != nil { | |||
| return fmt.Errorf("new scacnner client: %w", err) | |||
| } | |||
| defer scCli.Close() | |||
| err = scCli.PostEvent(event, isEmergency, dontMerge) | |||
| err = scCli.PostEvent(scmq.NewPostEvent(event, isEmergency, dontMerge)) | |||
| if err != nil { | |||
| return fmt.Errorf("request to scanner failed, err: %w", err) | |||
| } | |||
| @@ -152,17 +152,24 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje | |||
| if err != nil { | |||
| return nil, fmt.Errorf("reading object: %w", err) | |||
| } | |||
| err = func() error { | |||
| defer objInfo.File.Close() | |||
| fileHashes, uploadedNodeIDs, err := uploadECObject(objInfo, uploadNodes, ecInfo, ec) | |||
| uploadRets = append(uploadRets, ECObjectUploadResult{ | |||
| Info: objInfo, | |||
| Error: err, | |||
| }) | |||
| if err != nil { | |||
| return fmt.Errorf("uploading object: %w", err) | |||
| } | |||
| fileHashes, uploadedNodeIDs, err := uploadECObject(objInfo, uploadNodes, ecInfo, ec) | |||
| uploadRets = append(uploadRets, ECObjectUploadResult{ | |||
| Info: objInfo, | |||
| Error: err, | |||
| }) | |||
| adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs)) | |||
| return nil | |||
| }() | |||
| if err != nil { | |||
| return nil, fmt.Errorf("uploading object: %w", err) | |||
| return nil, err | |||
| } | |||
| adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs)) | |||
| } | |||
| _, err = coorCli.UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil)) | |||
| @@ -152,17 +152,24 @@ func uploadAndUpdateRepPackage(packageID int64, objectIter iterator.UploadingObj | |||
| return nil, fmt.Errorf("reading object: %w", err) | |||
| } | |||
| fileHash, err := uploadFile(objInfo.File, uploadNode) | |||
| uploadRets = append(uploadRets, RepObjectUploadResult{ | |||
| Info: objInfo, | |||
| Error: err, | |||
| FileHash: fileHash, | |||
| }) | |||
| err = func() error { | |||
| defer objInfo.File.Close() | |||
| fileHash, err := uploadFile(objInfo.File, uploadNode) | |||
| uploadRets = append(uploadRets, RepObjectUploadResult{ | |||
| Info: objInfo, | |||
| Error: err, | |||
| FileHash: fileHash, | |||
| }) | |||
| if err != nil { | |||
| return fmt.Errorf("uploading object: %w", err) | |||
| } | |||
| adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, []int64{uploadNode.Node.NodeID})) | |||
| return nil | |||
| }() | |||
| if err != nil { | |||
| return nil, fmt.Errorf("uploading object: %w", err) | |||
| return nil, err | |||
| } | |||
| adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, []int64{uploadNode.Node.NodeID})) | |||
| } | |||
| _, err = coorCli.UpdateRepPackage(coormq.NewUpdateRepPackage(packageID, adds, nil)) | |||
| @@ -195,7 +202,7 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { | |||
| if uploadNode.IsSameLocation { | |||
| nodeIP = uploadNode.Node.LocalIP | |||
| logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.Node.NodeID) | |||
| logger.Infof("client and node %d are at the same location, use local ip", uploadNode.Node.NodeID) | |||
| } | |||
| fileHash, err := uploadToNode(file, nodeIP) | |||
| @@ -126,24 +126,32 @@ func (t *DownloadPackage) writeObject(objIter iterator.DownloadingObjectIterator | |||
| if err != nil { | |||
| return err | |||
| } | |||
| defer objInfo.File.Close() | |||
| fullPath := filepath.Join(t.outputPath, objInfo.Object.Path) | |||
| err = func() error { | |||
| defer objInfo.File.Close() | |||
| dirPath := filepath.Dir(fullPath) | |||
| if err := os.MkdirAll(dirPath, 0755); err != nil { | |||
| return fmt.Errorf("creating object dir: %w", err) | |||
| } | |||
| fullPath := filepath.Join(t.outputPath, objInfo.Object.Path) | |||
| outputFile, err := os.Create(fullPath) | |||
| if err != nil { | |||
| return fmt.Errorf("creating object file: %w", err) | |||
| } | |||
| defer outputFile.Close() | |||
| dirPath := filepath.Dir(fullPath) | |||
| if err := os.MkdirAll(dirPath, 0755); err != nil { | |||
| return fmt.Errorf("creating object dir: %w", err) | |||
| } | |||
| _, err = io.Copy(outputFile, objInfo.File) | |||
| outputFile, err := os.Create(fullPath) | |||
| if err != nil { | |||
| return fmt.Errorf("creating object file: %w", err) | |||
| } | |||
| defer outputFile.Close() | |||
| _, err = io.Copy(outputFile, objInfo.File) | |||
| if err != nil { | |||
| return fmt.Errorf("copy object data to local file failed, err: %w", err) | |||
| } | |||
| return nil | |||
| }() | |||
| if err != nil { | |||
| return fmt.Errorf("copy object data to local file failed, err: %w", err) | |||
| return err | |||
| } | |||
| } | |||
| @@ -104,7 +104,7 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadi | |||
| nodeIPs[i] = nds[i].Node.ExternalIP | |||
| if nds[i].IsSameLocation { | |||
| nodeIPs[i] = nds[i].Node.LocalIP | |||
| logger.Infof("client and node %d are at the same location, use local ip\n", nds[i].Node.NodeID) | |||
| logger.Infof("client and node %d are at the same location, use local ip", nds[i].Node.NodeID) | |||
| } | |||
| } | |||
| @@ -1,45 +0,0 @@ | |||
| package iterator | |||
| import ( | |||
| "mime/multipart" | |||
| ) | |||
| type HTTPUploadingIterator struct { | |||
| files []*multipart.FileHeader | |||
| currentIndex int | |||
| } | |||
| func NewHTTPObjectIterator(files []*multipart.FileHeader) *HTTPUploadingIterator { | |||
| return &HTTPUploadingIterator{ | |||
| files: files, | |||
| } | |||
| } | |||
| func (i *HTTPUploadingIterator) MoveNext() (*IterUploadingObject, error) { | |||
| if i.currentIndex >= len(i.files) { | |||
| return nil, ErrNoMoreItem | |||
| } | |||
| item, err := i.doMove() | |||
| i.currentIndex++ | |||
| return item, err | |||
| } | |||
| func (i *HTTPUploadingIterator) doMove() (*IterUploadingObject, error) { | |||
| fileInfo := i.files[i.currentIndex] | |||
| file, err := fileInfo.Open() | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| return &IterUploadingObject{ | |||
| Path: fileInfo.Filename, | |||
| Size: fileInfo.Size, | |||
| File: file, | |||
| }, nil | |||
| } | |||
| func (i *HTTPUploadingIterator) Close() { | |||
| } | |||
| @@ -1,12 +1,9 @@ | |||
| package iterator | |||
| import ( | |||
| "errors" | |||
| "gitlink.org.cn/cloudream/common/pkgs/iterator" | |||
| ) | |||
| var ErrNoMoreItem = errors.New("no more item") | |||
| var ErrNoMoreItem = iterator.ErrNoMoreItem | |||
| type Iterator[T any] interface { | |||
| MoveNext() (T, error) | |||
| Close() | |||
| } | |||
| type Iterator[T any] iterator.Iterator[T] | |||
| @@ -105,7 +105,7 @@ func (i *RepObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloading | |||
| if downloadNode.IsSameLocation { | |||
| nodeIP = downloadNode.Node.LocalIP | |||
| logger.Infof("client and node %d are at the same location, use local ip\n", downloadNode.Node.NodeID) | |||
| logger.Infof("client and node %d are at the same location, use local ip", downloadNode.Node.NodeID) | |||
| } | |||
| reader, err := downloadFile(i.downloadCtx, downloadNode.Node.NodeID, nodeIP, repData.FileHash) | |||
| @@ -1,9 +1,6 @@ | |||
| package scanner | |||
| import ( | |||
| "fmt" | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" | |||
| ) | |||
| @@ -16,35 +13,22 @@ type EventService interface { | |||
| var _ = RegisterNoReply(EventService.PostEvent) | |||
| type PostEvent struct { | |||
| Event map[string]any `json:"event"` | |||
| IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理 | |||
| DontMerge bool `json:"dontMerge"` // 不可合并此消息 | |||
| Event scevt.Event `json:"event"` | |||
| IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理 | |||
| DontMerge bool `json:"dontMerge"` // 不可合并此消息 | |||
| } | |||
| func NewPostEvent(event any, isEmergency bool, dontMerge bool) (PostEvent, error) { | |||
| mp, err := scevt.MessageToMap(event) | |||
| if err != nil { | |||
| return PostEvent{}, fmt.Errorf("message to map failed, err: %w", err) | |||
| } | |||
| func NewPostEvent(event scevt.Event, isEmergency bool, dontMerge bool) PostEvent { | |||
| return PostEvent{ | |||
| Event: mp, | |||
| Event: event, | |||
| IsEmergency: isEmergency, | |||
| DontMerge: dontMerge, | |||
| }, nil | |||
| } | |||
| func (cli *Client) PostEvent(event any, isEmergency bool, dontMerge bool, opts ...mq.SendOption) error { | |||
| opt := mq.SendOption{ | |||
| Timeout: time.Second * 30, | |||
| } | |||
| if len(opts) > 0 { | |||
| opt = opts[0] | |||
| } | |||
| body, err := NewPostEvent(event, isEmergency, dontMerge) | |||
| if err != nil { | |||
| return fmt.Errorf("new post event body failed, err: %w", err) | |||
| } | |||
| } | |||
| func (client *Client) PostEvent(msg PostEvent) error { | |||
| return mq.Send[PostEvent](client.rabbitCli, msg) | |||
| } | |||
| return mq.Send(cli.rabbitCli, body, opt) | |||
| func init() { | |||
| mq.RegisterUnionType(scevt.EventTypeUnino) | |||
| } | |||
| @@ -5,21 +5,11 @@ import ( | |||
| "gitlink.org.cn/cloudream/common/utils/serder" | |||
| ) | |||
| var typeResolver = serder.NewTypeNameResolver(true) | |||
| type Event interface{} | |||
| var serderOption = serder.TypedSerderOption{ | |||
| TypeResolver: &typeResolver, | |||
| TypeFieldName: "@type", | |||
| } | |||
| func MapToMessage(m map[string]any) (any, error) { | |||
| return serder.TypedMapToObject(m, serderOption) | |||
| } | |||
| func MessageToMap(msg any) (map[string]any, error) { | |||
| return serder.ObjectToTypedMap(msg, serderOption) | |||
| } | |||
| var eventUnionEles = serder.NewTypeNameResolver(true) | |||
| var EventTypeUnino = serder.NewTypeUnion[Event]("@type", eventUnionEles) | |||
| func Register[T any]() { | |||
| typeResolver.Register(myreflect.TypeOf[T]()) | |||
| eventUnionEles.Register(myreflect.TypeOf[T]()) | |||
| } | |||
| @@ -11,6 +11,7 @@ import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||
| scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" | |||
| scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" | |||
| ) | |||
| @@ -94,7 +95,7 @@ func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.Upda | |||
| affectFileHashes = append(affectFileHashes, add.FileHash) | |||
| } | |||
| err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true) | |||
| err = svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true)) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) | |||
| } | |||
| @@ -170,7 +171,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack | |||
| // 不追求及时、准确 | |||
| if len(stgs) == 0 { | |||
| // 如果没有被引用,直接投递CheckPackage的任务 | |||
| err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false) | |||
| err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false)) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) | |||
| } | |||
| @@ -179,7 +180,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack | |||
| } else { | |||
| // 有引用则让Agent去检查StoragePackage | |||
| for _, stg := range stgs { | |||
| err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false) | |||
| err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false)) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) | |||
| } | |||
| @@ -17,24 +17,19 @@ const ( | |||
| BuildDir = "./build" | |||
| ) | |||
| var Global = struct { | |||
| OS string | |||
| Arch string | |||
| }{} | |||
| // [配置项]设置编译平台为windows | |||
| func Win() { | |||
| Global.OS = "win" | |||
| magefiles.Global.OS = "win" | |||
| } | |||
| // [配置项]设置编译平台为linux | |||
| func Linux() { | |||
| Global.OS = "linux" | |||
| magefiles.Global.OS = "linux" | |||
| } | |||
| // [配置项]设置编译架构为amd64 | |||
| func AMD64() { | |||
| Global.Arch = "amd64" | |||
| magefiles.Global.Arch = "amd64" | |||
| } | |||
| func All() error { | |||
| @@ -8,6 +8,7 @@ import ( | |||
| event "gitlink.org.cn/cloudream/common/pkgs/event" | |||
| "gitlink.org.cn/cloudream/common/pkgs/typedispatcher" | |||
| mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" | |||
| scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" | |||
| ) | |||
| type ExecuteArgs struct { | |||
| @@ -32,7 +33,7 @@ func NewExecutor(db *mydb.DB, distLock *distlocksvc.Service) Executor { | |||
| var msgDispatcher = typedispatcher.NewTypeDispatcher[Event]() | |||
| func FromMessage(msg any) (Event, error) { | |||
| func FromMessage(msg scevt.Event) (Event, error) { | |||
| event, ok := msgDispatcher.Dispatch(msg) | |||
| if !ok { | |||
| return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name()) | |||
| @@ -3,19 +3,11 @@ package services | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" | |||
| scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" | |||
| "gitlink.org.cn/cloudream/storage/scanner/internal/event" | |||
| ) | |||
| func (svc *Service) PostEvent(msg *scmq.PostEvent) { | |||
| evtMsg, err := scevt.MapToMessage(msg.Event) | |||
| if err != nil { | |||
| logger.Warnf("convert map to event message failed, err: %s", err.Error()) | |||
| return | |||
| } | |||
| evt, err := event.FromMessage(evtMsg) | |||
| evt, err := event.FromMessage(msg.Event) | |||
| if err != nil { | |||
| logger.Warnf("create event from event message failed, err: %s", err.Error()) | |||
| return | |||