From 2d7d23ad2e51e6bc95b4640e9cb8fcf226733604 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 27 Mar 2025 11:11:10 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E6=94=AF=E6=8C=81=E9=A2=84=E7=AD=BE=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/http/presigned.go | 20 ++++++++++++++++++++ client/internal/http/server.go | 1 + coordinator/internal/cmd/migrate.go | 1 + 3 files changed, 22 insertions(+) diff --git a/client/internal/http/presigned.go b/client/internal/http/presigned.go index 926ef36..3e4d73d 100644 --- a/client/internal/http/presigned.go +++ b/client/internal/http/presigned.go @@ -27,6 +27,26 @@ func (s *Server) Presigned() *PresignedService { } } +func (s *PresignedService) ObjectListByPath(ctx *gin.Context) { + log := logger.WithField("HTTP", "Presigned.ObjectListByPath") + + var req cdsapi.PresignedObjectListByPath + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + resp, err := s.svc.ObjectSvc().GetByPath(req.ObjectListByPath) + if err != nil { + log.Warnf("listing objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("listing objects: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(resp)) +} + func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) { log := logger.WithField("HTTP", "Presigned.ObjectDownloadByPath") diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 9726f25..66faf64 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -124,6 +124,7 @@ func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) { rt.POST(cdsapi.ObjectUploadPartPath, s.Object().UploadPart) rt.POST(cdsapi.ObjectCompleteMultipartUploadPath, s.Object().CompleteMultipartUpload) + rt.GET(cdsapi.PresignedObjectListByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectListByPath) rt.GET(cdsapi.PresignedObjectDownloadByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownloadByPath) rt.GET(cdsapi.PresignedObjectDownloadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownload) rt.POST(cdsapi.PresignedObjectUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUpload) diff --git a/coordinator/internal/cmd/migrate.go b/coordinator/internal/cmd/migrate.go index 309b832..a73b856 100644 --- a/coordinator/internal/cmd/migrate.go +++ b/coordinator/internal/cmd/migrate.go @@ -40,6 +40,7 @@ func migrate(configPath string) { fmt.Println(err) os.Exit(1) } + db = db.Set("gorm:table_options", "CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci") migrateOne(db, cdssdk.Bucket{}) migrateOne(db, model.Cache{}) From 527d64c15793eb54e29a791d457827e585a3e14d Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 28 Mar 2025 09:23:14 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=A4=8D=E5=88=B6Package=E7=9A=84=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E5=A2=9E=E5=8A=A0ObjectID?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/models/datamap.go | 8 +++++--- coordinator/internal/mq/package.go | 7 +++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/common/models/datamap.go b/common/models/datamap.go index 5b3b47a..2629472 100644 --- a/common/models/datamap.go +++ b/common/models/datamap.go @@ -458,9 +458,11 @@ func (b *BodyNewPackage) OnUnionSerializing() { // Package克隆的事件 type BodyPackageCloned struct { serder.Metadata `union:"PackageCloned"` - Type string `json:"type"` - SourcePackageID cdssdk.PackageID `json:"sourcePackageID"` - NewPackage cdssdk.Package `json:"newPackage"` + Type string `json:"type"` + SourcePackageID cdssdk.PackageID `json:"sourcePackageID"` + NewPackage cdssdk.Package `json:"newPackage"` + SourceObjectIDs []cdssdk.ObjectID `json:"sourceObjectIDs"` // 原本的ObjectID + NewObjectIDs []cdssdk.ObjectID `json:"newObjectIDs"` // 复制后的新ObjectID,与SourceObjectIDs一一对应 } func (b *BodyPackageCloned) GetBodyType() string { diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index 999234b..f285d26 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -160,6 +160,8 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack func (svc *Service) ClonePackage(msg *coormq.ClonePackage) (*coormq.ClonePackageResp, *mq.CodeMessage) { var pkg cdssdk.Package + var oldObjIDs []cdssdk.ObjectID + var newObjIDs []cdssdk.ObjectID err := svc.db2.DoTx(func(tx db2.SQLContext) error { var err error @@ -198,6 +200,9 @@ func (svc *Service) ClonePackage(msg *coormq.ClonePackage) (*coormq.ClonePackage oldToNew := make(map[cdssdk.ObjectID]cdssdk.ObjectID) for i, obj := range clonedObjs { oldToNew[objs[i].ObjectID] = obj.ObjectID + + oldObjIDs = append(oldObjIDs, objs[i].ObjectID) + newObjIDs = append(newObjIDs, obj.ObjectID) } clonedBlks := make([]stgmod.ObjectBlock, len(objBlks)) @@ -224,6 +229,8 @@ func (svc *Service) ClonePackage(msg *coormq.ClonePackage) (*coormq.ClonePackage svc.evtPub.Publish(&stgmod.BodyPackageCloned{ SourcePackageID: msg.PackageID, NewPackage: pkg, + SourceObjectIDs: oldObjIDs, + NewObjectIDs: newObjIDs, }) return mq.ReplyOK(coormq.RespClonePackage(pkg)) From dbc42fe4e4c0fc61c51051aca1a50d2bb4a96e7e Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 31 Mar 2025 17:26:44 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=B8=80=E4=BA=9B?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E6=B5=81=E6=97=B6=E9=94=99=E8=AF=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86EOF=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ioswitch2/ops2/range.go | 2 +- common/pkgs/ioswitch2/ops2/segment.go | 16 ++++++++++++---- common/pkgs/ioswitchlrc/ops2/range.go | 2 +- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index 4bc70fa..78e99dd 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -35,6 +35,7 @@ func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { for o.Offset > 0 { rdCnt := math2.Min(o.Offset, int64(len(buf))) rd, err := input.Stream.Read(buf[:rdCnt]) + o.Offset -= int64(rd) if err == io.EOF { // 输入流不够长度也不报错,只是产生一个空的流 break @@ -42,7 +43,6 @@ func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { if err != nil { return err } - o.Offset -= int64(rd) } fut := future.NewSetVoid() diff --git a/common/pkgs/ioswitch2/ops2/segment.go b/common/pkgs/ioswitch2/ops2/segment.go index f1ab0b8..525e330 100644 --- a/common/pkgs/ioswitch2/ops2/segment.go +++ b/common/pkgs/ioswitch2/ops2/segment.go @@ -86,12 +86,20 @@ func (o *SegmentJoin) Read(buf []byte) (int, error) { } n, err := o.nextStream.Read(buf) - if err == io.EOF { - o.nextStream.Close() - o.nextStream = nil + if err == nil { + return n, err + } + + if err != io.EOF { + return n, err + } + + o.nextStream.Close() + o.nextStream = nil + if n == 0 { continue } - return n, err + return n, nil } } diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index 4bc70fa..78e99dd 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -35,6 +35,7 @@ func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { for o.Offset > 0 { rdCnt := math2.Min(o.Offset, int64(len(buf))) rd, err := input.Stream.Read(buf[:rdCnt]) + o.Offset -= int64(rd) if err == io.EOF { // 输入流不够长度也不报错,只是产生一个空的流 break @@ -42,7 +43,6 @@ func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { if err != nil { return err } - o.Offset -= int64(rd) } fut := future.NewSetVoid()