Browse Source

提交代码,适配数据集及模型上传功能。

Signed-off-by: zouap <zouap@pcl.ac.cn>
tags/v1.22.11.2^2
zouap 3 years ago
parent
commit
d381c0b503
3 changed files with 112 additions and 74 deletions
  1. +1
    -0
      models/file_chunk.go
  2. +9
    -9
      modules/storage/obs.go
  3. +102
    -65
      routers/repo/attachment.go

+ 1
- 0
models/file_chunk.go View File

@@ -17,6 +17,7 @@ type FileChunk struct {
ID int64 `xorm:"pk autoincr"`
UUID string `xorm:"uuid UNIQUE"`
Md5 string `xorm:"INDEX"`
ObjectName string `xorm:"DEFAULT ''"`
IsUploaded int `xorm:"DEFAULT 0"` // not uploaded: 0, uploaded: 1
UploadID string `xorm:"UNIQUE"` //minio upload id
TotalChunks int


+ 9
- 9
modules/storage/obs.go View File

@@ -90,17 +90,16 @@ func listAllParts(uuid, uploadID, key string) (output *obs.ListPartsOutput, err
} else {
continue
}

break
}

return output, nil
}

func GetObsPartInfos(uuid, uploadID, fileName string) (string, error) {
key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, fileName)), "/")
func GetObsPartInfos(objectName, uploadID string) (string, error) {
key := objectName
//strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, fileName)), "/")

allParts, err := listAllParts(uuid, uploadID, key)
allParts, err := listAllParts(objectName, uploadID, key)
if err != nil {
log.Error("listAllParts failed: %v", err)
return "", err
@@ -129,13 +128,14 @@ func NewObsMultiPartUpload(objectName string) (string, error) {
return output.UploadId, nil
}

func CompleteObsMultiPartUpload(uuid, uploadID, fileName string, totalChunks int) error {
func CompleteObsMultiPartUpload(objectName, uploadID string, totalChunks int) error {
input := &obs.CompleteMultipartUploadInput{}
input.Bucket = setting.Bucket
input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, fileName)), "/")
//input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, fileName)), "/")
input.Key = objectName
input.UploadId = uploadID

allParts, err := listAllParts(uuid, uploadID, input.Key)
allParts, err := listAllParts(objectName, uploadID, input.Key)
if err != nil {
log.Error("listAllParts failed: %v", err)
return err
@@ -154,7 +154,7 @@ func CompleteObsMultiPartUpload(uuid, uploadID, fileName string, totalChunks int
return err
}

log.Info("uuid:%s, RequestId:%s", uuid, output.RequestId)
log.Info("uuid:%s, RequestId:%s", objectName, output.RequestId)

return nil
}


+ 102
- 65
routers/repo/attachment.go View File

@@ -529,19 +529,19 @@ func UpdateAttachmentDecompressState(ctx *context.Context) {
})
}

func getCloudOneMinioPrefix(scene string) string {
func getCloudOneMinioPrefix(scene string, fileChunk *models.FileChunk) string {
if scene == Attachment_model {
return Model_prefix
return fileChunk.ObjectName
} else {
return setting.Attachment.Minio.BasePath
return setting.Attachment.Minio.BasePath + models.AttachmentRelativePath(fileChunk.UUID)
}
}

func getCloudTwoOBSPrefix(scene string) string {
func getCloudTwoOBSPrefix(scene string, fileChunk *models.FileChunk, fileName string) string {
if scene == Attachment_model {
return Model_prefix
return fileChunk.ObjectName
} else {
return setting.BasePath
return setting.BasePath + models.AttachmentRelativePath(fileChunk.UUID) + "/" + fileName
}
}

@@ -576,7 +576,7 @@ func GetSuccessChunks(ctx *context.Context) {

isExist := false
if typeCloudBrain == models.TypeCloudBrainOne {
isExist, err = storage.Attachments.HasObject(getCloudOneMinioPrefix(scene) + models.AttachmentRelativePath(fileChunk.UUID))
isExist, err = storage.Attachments.HasObject(getCloudOneMinioPrefix(scene, fileChunk))
if err != nil {
ctx.ServerError("HasObject failed", err)
return
@@ -587,7 +587,7 @@ func GetSuccessChunks(ctx *context.Context) {
if oldAttachment != nil {
oldFileName = oldAttachment.Name
}
isExist, err = storage.ObsHasObject(getCloudTwoOBSPrefix(scene) + models.AttachmentRelativePath(fileChunk.UUID) + "/" + oldFileName)
isExist, err = storage.ObsHasObject(getCloudTwoOBSPrefix(scene, fileChunk, oldFileName))
if err != nil {
ctx.ServerError("ObsHasObject failed", err)
return
@@ -612,12 +612,12 @@ func GetSuccessChunks(ctx *context.Context) {
}

if typeCloudBrain == models.TypeCloudBrainOne {
chunks, err = storage.GetPartInfos(fileChunk.UUID, fileChunk.UploadID)
chunks, err = storage.GetPartInfos(getChunkMinioExistObjectName(scene, fileChunk, fileName), fileChunk.UploadID)
if err != nil {
log.Error("GetPartInfos failed:%v", err.Error())
}
} else {
chunks, err = storage.GetObsPartInfos(fileChunk.UUID, fileChunk.UploadID, fileName)
chunks, err = storage.GetObsPartInfos(getChunkOBSExistObjectName(scene, fileChunk, fileName), fileChunk.UploadID)
if err != nil {
log.Error("GetObsPartInfos failed:%v", err.Error())
}
@@ -684,7 +684,7 @@ func GetSuccessChunks(ctx *context.Context) {
}
}

func getCloudOneMinioObjectName(scene string, uuid string, filename string) string {
func getMinioInitObjectName(scene string, uuid string, filename string) string {
if scene == Attachment_model {
return strings.TrimPrefix(path.Join(Model_prefix, path.Join(uuid[0:1], uuid[1:2], uuid, filename)), "/")
} else {
@@ -692,7 +692,15 @@ func getCloudOneMinioObjectName(scene string, uuid string, filename string) stri
}
}

func getCloudTwoOBSObjectName(scene string, uuid string, filename string) string {
// getChunkMinioExistObjectName returns the MinIO object key under which an
// already-started multipart upload lives (cloud-brain type one).
// For model-scene uploads the key previously recorded on the FileChunk row is
// reused verbatim; for dataset attachments the key is rebuilt from the chunk
// UUID under the attachment base path. The filename parameter is unused here
// but kept so the signature mirrors getChunkOBSExistObjectName.
func getChunkMinioExistObjectName(scene string, fileChunk *models.FileChunk, filename string) string {
	if scene == Attachment_model {
		return fileChunk.ObjectName
	}
	uuid := fileChunk.UUID
	key := path.Join(setting.Attachment.Minio.BasePath, uuid[0:1], uuid[1:2], uuid)
	return strings.TrimPrefix(key, "/")
}

func getOBSInitObjectName(scene string, uuid string, filename string) string {
if scene == Attachment_model {
return strings.TrimPrefix(path.Join(Model_prefix, path.Join(uuid[0:1], uuid[1:2], uuid, filename)), "/")
} else {
@@ -700,6 +708,14 @@ func getCloudTwoOBSObjectName(scene string, uuid string, filename string) string
}
}

// getChunkOBSExistObjectName returns the OBS object key under which an
// already-started multipart upload lives (cloud-brain type two).
// Model-scene uploads reuse the key stored on the FileChunk row; dataset
// attachments derive it from the chunk UUID plus the file name under the
// configured base path.
func getChunkOBSExistObjectName(scene string, fileChunk *models.FileChunk, filename string) string {
	if scene == Attachment_model {
		return fileChunk.ObjectName
	}
	uuid := fileChunk.UUID
	key := path.Join(setting.BasePath, uuid[0:1], uuid[1:2], uuid, filename)
	return strings.TrimPrefix(key, "/")
}

func NewMultipart(ctx *context.Context) {
if !setting.Attachment.Enabled {
ctx.Error(404, "attachment is not enabled")
@@ -736,15 +752,21 @@ func NewMultipart(ctx *context.Context) {
}

uuid := gouuid.NewV4().String()
if scene == Attachment_model {
uuid = ctx.Query("uuid")
}
var uploadID string
var objectName string
if typeCloudBrain == models.TypeCloudBrainOne {
uploadID, err = storage.NewMultiPartUpload(getCloudOneMinioObjectName(scene, uuid, fileName))
objectName = getMinioInitObjectName(scene, uuid, fileName)
uploadID, err = storage.NewMultiPartUpload(objectName)
if err != nil {
ctx.ServerError("NewMultipart", err)
return
}
} else {
uploadID, err = storage.NewObsMultiPartUpload(getCloudTwoOBSObjectName(scene, uuid, fileName))
objectName = getOBSInitObjectName(scene, uuid, fileName)
uploadID, err = storage.NewObsMultiPartUpload(objectName)
if err != nil {
ctx.ServerError("NewObsMultiPartUpload", err)
return
@@ -755,6 +777,7 @@ func NewMultipart(ctx *context.Context) {
UUID: uuid,
UserID: ctx.User.ID,
UploadID: uploadID,
ObjectName: objectName,
Md5: ctx.Query("md5"),
Size: fileSize,
TotalChunks: totalChunkCounts,
@@ -836,7 +859,15 @@ func GetMultipartUploadUrl(ctx *context.Context) {
ctx.ServerError("checkTypeCloudBrain failed", err)
return
}

fileChunk, err := models.GetFileChunkByUUID(uuid)
if err != nil {
if models.IsErrFileChunkNotExist(err) {
ctx.Error(404)
} else {
ctx.ServerError("GetFileChunkByUUID", err)
}
return
}
url := ""
if typeCloudBrain == models.TypeCloudBrainOne {
if size > minio_ext.MinPartSize {
@@ -844,7 +875,7 @@ func GetMultipartUploadUrl(ctx *context.Context) {
return
}

url, err = storage.GenMultiPartSignedUrl(getCloudOneMinioObjectName(scene, uuid, fileName), uploadID, partNumber, size)
url, err = storage.GenMultiPartSignedUrl(getChunkMinioExistObjectName(scene, fileChunk, fileName), uploadID, partNumber, size)
if err != nil {
ctx.Error(500, fmt.Sprintf("GenMultiPartSignedUrl failed: %v", err))
return
@@ -854,7 +885,7 @@ func GetMultipartUploadUrl(ctx *context.Context) {
url = setting.PROXYURL + "/obs_proxy_multipart?uuid=" + uuid + "&uploadId=" + uploadID + "&partNumber=" + fmt.Sprint(partNumber) + "&file_name=" + fileName
log.Info("return url=" + url)
} else {
url, err = storage.ObsGenMultiPartSignedUrl(getCloudTwoOBSObjectName(scene, uuid, fileName), uploadID, partNumber)
url, err = storage.ObsGenMultiPartSignedUrl(getChunkOBSExistObjectName(scene, fileChunk, fileName), uploadID, partNumber)
if err != nil {
ctx.Error(500, fmt.Sprintf("ObsGenMultiPartSignedUrl failed: %v", err))
return
@@ -873,8 +904,10 @@ func CompleteMultipart(ctx *context.Context) {
uploadID := ctx.Query("uploadID")
typeCloudBrain := ctx.QueryInt("type")
fileName := ctx.Query("file_name")
scene := ctx.Query("scene")

log.Warn("uuid:" + uuid)
log.Warn("scene:" + scene)
log.Warn("typeCloudBrain:" + strconv.Itoa(typeCloudBrain))

err := checkTypeCloudBrain(typeCloudBrain)
@@ -894,13 +927,13 @@ func CompleteMultipart(ctx *context.Context) {
}

if typeCloudBrain == models.TypeCloudBrainOne {
_, err = storage.CompleteMultiPartUpload(uuid, uploadID, fileChunk.TotalChunks)
_, err = storage.CompleteMultiPartUpload(getChunkMinioExistObjectName(scene, fileChunk, fileName), uploadID, fileChunk.TotalChunks)
if err != nil {
ctx.Error(500, fmt.Sprintf("CompleteMultiPartUpload failed: %v", err))
return
}
} else {
err = storage.CompleteObsMultiPartUpload(uuid, uploadID, fileName, fileChunk.TotalChunks)
err = storage.CompleteObsMultiPartUpload(getChunkOBSExistObjectName(scene, fileChunk, fileName), uploadID, fileChunk.TotalChunks)
if err != nil {
ctx.Error(500, fmt.Sprintf("CompleteObsMultiPartUpload failed: %v", err))
return
@@ -914,58 +947,62 @@ func CompleteMultipart(ctx *context.Context) {
ctx.Error(500, fmt.Sprintf("UpdateFileChunk: %v", err))
return
}
dataset, _ := models.GetDatasetByID(ctx.QueryInt64("dataset_id"))
log.Warn("insert attachment to datasetId:" + strconv.FormatInt(dataset.ID, 10))
attachment, err := models.InsertAttachment(&models.Attachment{
UUID: uuid,
UploaderID: ctx.User.ID,
IsPrivate: dataset.IsPrivate(),
Name: fileName,
Size: ctx.QueryInt64("size"),
DatasetID: ctx.QueryInt64("dataset_id"),
Description: ctx.Query("description"),
Type: typeCloudBrain,
})

if err != nil {
ctx.Error(500, fmt.Sprintf("InsertAttachment: %v", err))
return
}
attachment.UpdateDatasetUpdateUnix()
repository, _ := models.GetRepositoryByID(dataset.RepoID)
notification.NotifyOtherTask(ctx.User, repository, fmt.Sprint(repository.IsPrivate, attachment.IsPrivate), attachment.Name, models.ActionUploadAttachment)
if attachment.DatasetID != 0 {
if isCanDecompress(attachment.Name) {
if typeCloudBrain == models.TypeCloudBrainOne {
err = worker.SendDecompressTask(contexExt.Background(), uuid, attachment.Name)
if err != nil {
log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error())
} else {
updateAttachmentDecompressStateIng(attachment)
if scene == Attachment_model {

} else {
dataset, _ := models.GetDatasetByID(ctx.QueryInt64("dataset_id"))
log.Warn("insert attachment to datasetId:" + strconv.FormatInt(dataset.ID, 10))
attachment, err := models.InsertAttachment(&models.Attachment{
UUID: uuid,
UploaderID: ctx.User.ID,
IsPrivate: dataset.IsPrivate(),
Name: fileName,
Size: ctx.QueryInt64("size"),
DatasetID: ctx.QueryInt64("dataset_id"),
Description: ctx.Query("description"),
Type: typeCloudBrain,
})

if err != nil {
ctx.Error(500, fmt.Sprintf("InsertAttachment: %v", err))
return
}
attachment.UpdateDatasetUpdateUnix()
repository, _ := models.GetRepositoryByID(dataset.RepoID)
notification.NotifyOtherTask(ctx.User, repository, fmt.Sprint(repository.IsPrivate, attachment.IsPrivate), attachment.Name, models.ActionUploadAttachment)
if attachment.DatasetID != 0 {
if isCanDecompress(attachment.Name) {
if typeCloudBrain == models.TypeCloudBrainOne {
err = worker.SendDecompressTask(contexExt.Background(), uuid, attachment.Name)
if err != nil {
log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error())
} else {
updateAttachmentDecompressStateIng(attachment)
}
}
}
if typeCloudBrain == models.TypeCloudBrainTwo {
attachjson, _ := json.Marshal(attachment)
err = labelmsg.SendDecompressAttachToLabelOBS(string(attachjson))
if err != nil {
log.Error("SendDecompressTask to labelsystem (%s) failed:%s", attachment.UUID, err.Error())
} else {
updateAttachmentDecompressStateIng(attachment)
if typeCloudBrain == models.TypeCloudBrainTwo {
attachjson, _ := json.Marshal(attachment)
err = labelmsg.SendDecompressAttachToLabelOBS(string(attachjson))
if err != nil {
log.Error("SendDecompressTask to labelsystem (%s) failed:%s", attachment.UUID, err.Error())
} else {
updateAttachmentDecompressStateIng(attachment)
}
}
} else {
var labelMap map[string]string
labelMap = make(map[string]string)
labelMap["UUID"] = uuid
labelMap["Type"] = fmt.Sprint(attachment.Type)
labelMap["UploaderID"] = fmt.Sprint(attachment.UploaderID)
labelMap["RepoID"] = fmt.Sprint(dataset.RepoID)
labelMap["AttachName"] = attachment.Name
attachjson, _ := json.Marshal(labelMap)
labelmsg.SendAddAttachToLabelSys(string(attachjson))
}
} else {
var labelMap map[string]string
labelMap = make(map[string]string)
labelMap["UUID"] = uuid
labelMap["Type"] = fmt.Sprint(attachment.Type)
labelMap["UploaderID"] = fmt.Sprint(attachment.UploaderID)
labelMap["RepoID"] = fmt.Sprint(dataset.RepoID)
labelMap["AttachName"] = attachment.Name
attachjson, _ := json.Marshal(labelMap)
labelmsg.SendAddAttachToLabelSys(string(attachjson))
}
}

ctx.JSON(200, map[string]string{
"result_code": "0",
})


Loading…
Cancel
Save