| @@ -791,8 +791,8 @@ func (obsClient ObsClient) GetBucketRequestPaymentWithSignedUrl(signedUrl string | |||||
| } | } | ||||
| func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uploadId string, partNumber int, partSize int64) (string, error) { | |||||
| requestURL := "" | |||||
| func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uploadId string, partNumber int, partSize int64) (*http.Request, error) { | |||||
| var req *http.Request | |||||
| input := &UploadPartInput{} | input := &UploadPartInput{} | ||||
| input.Bucket = bucketName | input.Bucket = bucketName | ||||
| @@ -808,7 +808,7 @@ func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uplo | |||||
| params, headers, _, err := input.trans(obsClient.conf.signature == SignatureObs) | params, headers, _, err := input.trans(obsClient.conf.signature == SignatureObs) | ||||
| if err != nil { | if err != nil { | ||||
| return requestURL, err | |||||
| return req, err | |||||
| } | } | ||||
| if params == nil { | if params == nil { | ||||
| @@ -833,10 +833,63 @@ func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uplo | |||||
| headers["Content-Length"] = []string{com.ToStr(partNumber,10)} | headers["Content-Length"] = []string{com.ToStr(partNumber,10)} | ||||
| requestURL, err = obsClient.doAuth(HTTP_PUT, bucketName, objectKey, params, headers, "") | |||||
| requestURL, err := obsClient.doAuth(HTTP_PUT, bucketName, objectKey, params, headers, "") | |||||
| if err != nil { | if err != nil { | ||||
| return requestURL, nil | |||||
| return req, nil | |||||
| } | } | ||||
| return requestURL, nil | |||||
| var _data io.Reader | |||||
| req, err = http.NewRequest(HTTP_PUT, requestURL, _data) | |||||
| if obsClient.conf.ctx != nil { | |||||
| req = req.WithContext(obsClient.conf.ctx) | |||||
| } | |||||
| if err != nil { | |||||
| return req, err | |||||
| } | |||||
| if isDebugLogEnabled() { | |||||
| auth := headers[HEADER_AUTH_CAMEL] | |||||
| delete(headers, HEADER_AUTH_CAMEL) | |||||
| var isSecurityToken bool | |||||
| var securityToken []string | |||||
| if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_AMZ]; isSecurityToken { | |||||
| headers[HEADER_STS_TOKEN_AMZ] = []string{"******"} | |||||
| } else if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_OBS]; isSecurityToken { | |||||
| headers[HEADER_STS_TOKEN_OBS] = []string{"******"} | |||||
| } | |||||
| doLog(LEVEL_DEBUG, "Request headers: %v", headers) | |||||
| headers[HEADER_AUTH_CAMEL] = auth | |||||
| if isSecurityToken { | |||||
| if obsClient.conf.signature == SignatureObs { | |||||
| headers[HEADER_STS_TOKEN_OBS] = securityToken | |||||
| } else { | |||||
| headers[HEADER_STS_TOKEN_AMZ] = securityToken | |||||
| } | |||||
| } | |||||
| } | |||||
| for key, value := range headers { | |||||
| if key == HEADER_HOST_CAMEL { | |||||
| req.Host = value[0] | |||||
| delete(headers, key) | |||||
| } else if key == HEADER_CONTENT_LENGTH_CAMEL { | |||||
| req.ContentLength = StringToInt64(value[0], -1) | |||||
| delete(headers, key) | |||||
| } else { | |||||
| req.Header[key] = value | |||||
| } | |||||
| } | |||||
| var lastRequest *http.Request | |||||
| lastRequest = req | |||||
| req.Header[HEADER_USER_AGENT_CAMEL] = []string{USER_AGENT} | |||||
| if lastRequest != nil { | |||||
| req.Host = lastRequest.Host | |||||
| req.ContentLength = lastRequest.ContentLength | |||||
| } | |||||
| return req, nil | |||||
| } | } | ||||
| @@ -140,14 +140,15 @@ func ObsGenMultiPartSignedUrl(uuid string, uploadId string, partNumber int, part | |||||
| */ | */ | ||||
| Key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | Key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | ||||
| url, err := ObsCli.CreateUploadPartSignedUrl(setting.Bucket, Key, uploadId, partNumber, partSize) | |||||
| req, err := ObsCli.CreateUploadPartSignedUrl(setting.Bucket, Key, uploadId, partNumber, partSize) | |||||
| if err != nil { | if err != nil { | ||||
| log.Error("CreateSignedUrl failed:", err.Error()) | log.Error("CreateSignedUrl failed:", err.Error()) | ||||
| return "", err | return "", err | ||||
| } | } | ||||
| log.Info(url) | |||||
| log.Info(req.URL.String()) | |||||
| log.Info("", req.Header) | |||||
| return url, nil | |||||
| return req.URL.String(), nil | |||||
| } | } | ||||
| @@ -11,6 +11,7 @@ import ( | |||||
| "fmt" | "fmt" | ||||
| "mime/multipart" | "mime/multipart" | ||||
| "net/http" | "net/http" | ||||
| "path" | |||||
| "strconv" | "strconv" | ||||
| "strings" | "strings" | ||||
| @@ -275,13 +276,29 @@ func GetPresignedPutObjectURL(ctx *context.Context) { | |||||
| // AddAttachment response for add attachment record | // AddAttachment response for add attachment record | ||||
| func AddAttachment(ctx *context.Context) { | func AddAttachment(ctx *context.Context) { | ||||
| uuid := ctx.Query("uuid") | |||||
| has, err := storage.Attachments.HasObject(models.AttachmentRelativePath(uuid)) | |||||
| typeCloudBrain := ctx.QueryInt("type") | |||||
| err := checkTypeCloudBrain(typeCloudBrain) | |||||
| if err != nil { | if err != nil { | ||||
| ctx.ServerError("HasObject", err) | |||||
| ctx.ServerError("checkTypeCloudBrain failed", err) | |||||
| return | return | ||||
| } | } | ||||
| uuid := ctx.Query("uuid") | |||||
| has := false | |||||
| if typeCloudBrain == models.TypeCloudBrainOne { | |||||
| has, err = storage.Attachments.HasObject(models.AttachmentRelativePath(uuid)) | |||||
| if err != nil { | |||||
| ctx.ServerError("HasObject", err) | |||||
| return | |||||
| } | |||||
| } else { | |||||
| has, err = storage.ObsHasObject(models.AttachmentRelativePath(uuid)) | |||||
| if err != nil { | |||||
| ctx.ServerError("ObsHasObject", err) | |||||
| return | |||||
| } | |||||
| } | |||||
| if !has { | if !has { | ||||
| ctx.Error(404, "attachment has not been uploaded") | ctx.Error(404, "attachment has not been uploaded") | ||||
| return | return | ||||
| @@ -294,6 +311,7 @@ func AddAttachment(ctx *context.Context) { | |||||
| Name: ctx.Query("file_name"), | Name: ctx.Query("file_name"), | ||||
| Size: ctx.QueryInt64("size"), | Size: ctx.QueryInt64("size"), | ||||
| DatasetID: ctx.QueryInt64("dataset_id"), | DatasetID: ctx.QueryInt64("dataset_id"), | ||||
| Type: typeCloudBrain, | |||||
| }) | }) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -303,16 +321,19 @@ func AddAttachment(ctx *context.Context) { | |||||
| if attachment.DatasetID != 0 { | if attachment.DatasetID != 0 { | ||||
| if strings.HasSuffix(attachment.Name, ".zip") { | if strings.HasSuffix(attachment.Name, ".zip") { | ||||
| err = worker.SendDecompressTask(contexExt.Background(), uuid) | |||||
| if err != nil { | |||||
| log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error()) | |||||
| } else { | |||||
| attachment.DecompressState = models.DecompressStateIng | |||||
| err = models.UpdateAttachment(attachment) | |||||
| if typeCloudBrain == models.TypeCloudBrainOne { | |||||
| err = worker.SendDecompressTask(contexExt.Background(), uuid) | |||||
| if err != nil { | if err != nil { | ||||
| log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error()) | |||||
| log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error()) | |||||
| } else { | |||||
| attachment.DecompressState = models.DecompressStateIng | |||||
| err = models.UpdateAttachment(attachment) | |||||
| if err != nil { | |||||
| log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error()) | |||||
| } | |||||
| } | } | ||||
| } | } | ||||
| //todo:decompress type_two | |||||
| } | } | ||||
| } | } | ||||
| @@ -581,6 +602,16 @@ func GetMultipartUploadUrl(ctx *context.Context) { | |||||
| }) | }) | ||||
| } | } | ||||
| func GetObsKey(ctx *context.Context) { | |||||
| uuid := gouuid.NewV4().String() | |||||
| key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | |||||
| ctx.JSON(200, map[string]string{ | |||||
| "uuid": uuid, | |||||
| "key": key, | |||||
| }) | |||||
| } | |||||
| func UploadPart(ctx *context.Context) { | func UploadPart(ctx *context.Context) { | ||||
| tmp, err := ctx.Req.Body().String() | tmp, err := ctx.Req.Body().String() | ||||
| log.Info(tmp) | log.Info(tmp) | ||||
| @@ -530,6 +530,7 @@ func RegisterRoutes(m *macaron.Macaron) { | |||||
| m.Post("/complete_multipart", repo.CompleteMultipart) | m.Post("/complete_multipart", repo.CompleteMultipart) | ||||
| m.Post("/update_chunk", repo.UpdateMultipart) | m.Post("/update_chunk", repo.UpdateMultipart) | ||||
| m.Post("/upload_part", repo.UploadPart) | m.Post("/upload_part", repo.UploadPart) | ||||
| m.Get("/get_obs_key", repo.GetObsKey) | |||||
| }, reqSignIn) | }, reqSignIn) | ||||
| m.Group("/attachments", func() { | m.Group("/attachments", func() { | ||||