| @@ -791,8 +791,8 @@ func (obsClient ObsClient) GetBucketRequestPaymentWithSignedUrl(signedUrl string | |||
| } | |||
| func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uploadId string, partNumber int, partSize int64) (string, error) { | |||
| requestURL := "" | |||
| func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uploadId string, partNumber int, partSize int64) (*http.Request, error) { | |||
| var req *http.Request | |||
| input := &UploadPartInput{} | |||
| input.Bucket = bucketName | |||
| @@ -808,7 +808,7 @@ func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uplo | |||
| params, headers, _, err := input.trans(obsClient.conf.signature == SignatureObs) | |||
| if err != nil { | |||
| return requestURL, err | |||
| return req, err | |||
| } | |||
| if params == nil { | |||
| @@ -833,10 +833,63 @@ func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uplo | |||
| headers["Content-Length"] = []string{com.ToStr(partNumber,10)} | |||
| requestURL, err = obsClient.doAuth(HTTP_PUT, bucketName, objectKey, params, headers, "") | |||
| requestURL, err := obsClient.doAuth(HTTP_PUT, bucketName, objectKey, params, headers, "") | |||
| if err != nil { | |||
| return requestURL, nil | |||
| return req, nil | |||
| } | |||
| return requestURL, nil | |||
| var _data io.Reader | |||
| req, err = http.NewRequest(HTTP_PUT, requestURL, _data) | |||
| if obsClient.conf.ctx != nil { | |||
| req = req.WithContext(obsClient.conf.ctx) | |||
| } | |||
| if err != nil { | |||
| return req, err | |||
| } | |||
| if isDebugLogEnabled() { | |||
| auth := headers[HEADER_AUTH_CAMEL] | |||
| delete(headers, HEADER_AUTH_CAMEL) | |||
| var isSecurityToken bool | |||
| var securityToken []string | |||
| if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_AMZ]; isSecurityToken { | |||
| headers[HEADER_STS_TOKEN_AMZ] = []string{"******"} | |||
| } else if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_OBS]; isSecurityToken { | |||
| headers[HEADER_STS_TOKEN_OBS] = []string{"******"} | |||
| } | |||
| doLog(LEVEL_DEBUG, "Request headers: %v", headers) | |||
| headers[HEADER_AUTH_CAMEL] = auth | |||
| if isSecurityToken { | |||
| if obsClient.conf.signature == SignatureObs { | |||
| headers[HEADER_STS_TOKEN_OBS] = securityToken | |||
| } else { | |||
| headers[HEADER_STS_TOKEN_AMZ] = securityToken | |||
| } | |||
| } | |||
| } | |||
| for key, value := range headers { | |||
| if key == HEADER_HOST_CAMEL { | |||
| req.Host = value[0] | |||
| delete(headers, key) | |||
| } else if key == HEADER_CONTENT_LENGTH_CAMEL { | |||
| req.ContentLength = StringToInt64(value[0], -1) | |||
| delete(headers, key) | |||
| } else { | |||
| req.Header[key] = value | |||
| } | |||
| } | |||
| var lastRequest *http.Request | |||
| lastRequest = req | |||
| req.Header[HEADER_USER_AGENT_CAMEL] = []string{USER_AGENT} | |||
| if lastRequest != nil { | |||
| req.Host = lastRequest.Host | |||
| req.ContentLength = lastRequest.ContentLength | |||
| } | |||
| return req, nil | |||
| } | |||
| @@ -140,14 +140,15 @@ func ObsGenMultiPartSignedUrl(uuid string, uploadId string, partNumber int, part | |||
| */ | |||
| Key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | |||
| url, err := ObsCli.CreateUploadPartSignedUrl(setting.Bucket, Key, uploadId, partNumber, partSize) | |||
| req, err := ObsCli.CreateUploadPartSignedUrl(setting.Bucket, Key, uploadId, partNumber, partSize) | |||
| if err != nil { | |||
| log.Error("CreateSignedUrl failed:", err.Error()) | |||
| return "", err | |||
| } | |||
| log.Info(url) | |||
| log.Info(req.URL.String()) | |||
| log.Info("", req.Header) | |||
| return url, nil | |||
| return req.URL.String(), nil | |||
| } | |||
| @@ -11,6 +11,7 @@ import ( | |||
| "fmt" | |||
| "mime/multipart" | |||
| "net/http" | |||
| "path" | |||
| "strconv" | |||
| "strings" | |||
| @@ -275,13 +276,29 @@ func GetPresignedPutObjectURL(ctx *context.Context) { | |||
| // AddAttachment response for add attachment record | |||
| func AddAttachment(ctx *context.Context) { | |||
| uuid := ctx.Query("uuid") | |||
| has, err := storage.Attachments.HasObject(models.AttachmentRelativePath(uuid)) | |||
| typeCloudBrain := ctx.QueryInt("type") | |||
| err := checkTypeCloudBrain(typeCloudBrain) | |||
| if err != nil { | |||
| ctx.ServerError("HasObject", err) | |||
| ctx.ServerError("checkTypeCloudBrain failed", err) | |||
| return | |||
| } | |||
| uuid := ctx.Query("uuid") | |||
| has := false | |||
| if typeCloudBrain == models.TypeCloudBrainOne { | |||
| has, err = storage.Attachments.HasObject(models.AttachmentRelativePath(uuid)) | |||
| if err != nil { | |||
| ctx.ServerError("HasObject", err) | |||
| return | |||
| } | |||
| } else { | |||
| has, err = storage.ObsHasObject(models.AttachmentRelativePath(uuid)) | |||
| if err != nil { | |||
| ctx.ServerError("ObsHasObject", err) | |||
| return | |||
| } | |||
| } | |||
| if !has { | |||
| ctx.Error(404, "attachment has not been uploaded") | |||
| return | |||
| @@ -294,6 +311,7 @@ func AddAttachment(ctx *context.Context) { | |||
| Name: ctx.Query("file_name"), | |||
| Size: ctx.QueryInt64("size"), | |||
| DatasetID: ctx.QueryInt64("dataset_id"), | |||
| Type: typeCloudBrain, | |||
| }) | |||
| if err != nil { | |||
| @@ -303,16 +321,19 @@ func AddAttachment(ctx *context.Context) { | |||
| if attachment.DatasetID != 0 { | |||
| if strings.HasSuffix(attachment.Name, ".zip") { | |||
| err = worker.SendDecompressTask(contexExt.Background(), uuid) | |||
| if err != nil { | |||
| log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error()) | |||
| } else { | |||
| attachment.DecompressState = models.DecompressStateIng | |||
| err = models.UpdateAttachment(attachment) | |||
| if typeCloudBrain == models.TypeCloudBrainOne { | |||
| err = worker.SendDecompressTask(contexExt.Background(), uuid) | |||
| if err != nil { | |||
| log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error()) | |||
| log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error()) | |||
| } else { | |||
| attachment.DecompressState = models.DecompressStateIng | |||
| err = models.UpdateAttachment(attachment) | |||
| if err != nil { | |||
| log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error()) | |||
| } | |||
| } | |||
| } | |||
| //todo:decompress type_two | |||
| } | |||
| } | |||
| @@ -581,6 +602,16 @@ func GetMultipartUploadUrl(ctx *context.Context) { | |||
| }) | |||
| } | |||
| func GetObsKey(ctx *context.Context) { | |||
| uuid := gouuid.NewV4().String() | |||
| key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | |||
| ctx.JSON(200, map[string]string{ | |||
| "uuid": uuid, | |||
| "key": key, | |||
| }) | |||
| } | |||
| func UploadPart(ctx *context.Context) { | |||
| tmp, err := ctx.Req.Body().String() | |||
| log.Info(tmp) | |||
| @@ -530,6 +530,7 @@ func RegisterRoutes(m *macaron.Macaron) { | |||
| m.Post("/complete_multipart", repo.CompleteMultipart) | |||
| m.Post("/update_chunk", repo.UpdateMultipart) | |||
| m.Post("/upload_part", repo.UploadPart) | |||
| m.Get("/get_obs_key", repo.GetObsKey) | |||
| }, reqSignIn) | |||
| m.Group("/attachments", func() { | |||