diff --git a/sdks/imfs/models.go b/sdks/imfs/models.go index c7c54a9..4292bcd 100644 --- a/sdks/imfs/models.go +++ b/sdks/imfs/models.go @@ -16,10 +16,10 @@ type ClientService struct { //代表任务给提供各服务的端口 type ServerService struct { Name string `json:"name"` - Port string `json:"port"` + Port int `json:"port"` } type FullJobID struct { - JobSetID schsdk.JobSetID - LocalJobID string + JobSetID schsdk.JobSetID `json:"jobSetID"` + LocalJobID string `json:"localJobID"` } diff --git a/sdks/storage/object.go b/sdks/storage/object.go index fd82606..86e7fdb 100644 --- a/sdks/storage/object.go +++ b/sdks/storage/object.go @@ -102,6 +102,7 @@ type ObjectDownload struct { ObjectID ObjectID `form:"objectID" json:"objectID" binding:"required"` Offset int64 `form:"offset" json:"offset,omitempty"` Length *int64 `form:"length" json:"length,omitempty"` + PartSize int64 `form:"partSize" json:"partSize,omitempty"` } type DownloadingObject struct { Path string diff --git a/utils/http/http.go b/utils/http/http.go index d496daa..5eeb1af 100644 --- a/utils/http/http.go +++ b/utils/http/http.go @@ -6,14 +6,17 @@ import ( "io" "mime" "mime/multipart" + "net" "net/http" "net/textproto" ul "net/url" "reflect" "strings" + "time" "github.com/mitchellh/mapstructure" "gitlink.org.cn/cloudream/common/pkgs/iterator" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -25,6 +28,16 @@ const ( ContentTypeOctetStream = "application/octet-stream" ) +var defaultClient = http.Client{ + Timeout: time.Second * 5, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: time.Second * 5, + KeepAlive: 0, // 修改为 0 可以生效 + }).DialContext, + }, +} + type RequestParam struct { Header any Query any @@ -49,7 +62,7 @@ func GetJSON(url string, param RequestParam) (*http.Response, error) { return nil, err } - return http.DefaultClient.Do(req) + return defaultClient.Do(req) } func GetForm(url string, param RequestParam) (*http.Response, error) { @@ -70,7 +83,7 @@ func GetForm(url string, param RequestParam) (*http.Response, error) { return nil, err } - return http.DefaultClient.Do(req) + return defaultClient.Do(req) } func PostJSON(url string, param RequestParam) (*http.Response, error) { @@ -91,7 +104,7 @@ func PostJSON(url string, param RequestParam) (*http.Response, error) { return nil, err } - return http.DefaultClient.Do(req) + return defaultClient.Do(req) } func PostForm(url string, param RequestParam) (*http.Response, error) { @@ -112,7 +125,7 @@ func PostForm(url string, param RequestParam) (*http.Response, error) { return nil, err } - return http.DefaultClient.Do(req) + return defaultClient.Do(req) } func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) { @@ -143,56 +156,96 @@ type MultiPartFile struct { } type multiPartFileIterator struct { - mr *multipart.Reader - firstFile *multipart.Part + iterErr error + mr *multipart.Reader + nextPart *multipart.Part + nextFileName string } +// 不可异步调用 func (m *multiPartFileIterator) MoveNext() (*MultiPartFile, error) { - if m.firstFile != nil { - f := m.firstFile - m.firstFile = nil + if m.iterErr != nil { + return nil, m.iterErr + } + + return &MultiPartFile{ + FieldName: m.nextPart.FormName(), + FileName: m.nextFileName, + File: &multiPartFileReader{ + iter: m, + curPart: m.nextPart, + fileName: m.nextFileName, + }, + Header: m.nextPart.Header, + }, nil +} - fileName, err := ul.PathUnescape(f.FileName()) - if err != nil { - return nil, fmt.Errorf("unescape file name: %w", err) - } +func (m *multiPartFileIterator) Close() {} + +type multiPartFileReader struct { + iter *multiPartFileIterator + curPart *multipart.Part + fileName string +} - return &MultiPartFile{ - FieldName: f.FormName(), - FileName: fileName, - File: f, - Header: f.Header, - }, nil +func (r *multiPartFileReader) Read(p []byte) (int, error) { + if r.curPart != nil { + return r.readFromCurPart(p) } - for { - part, err := m.mr.NextPart() - if err == io.EOF { - return nil, iterator.ErrNoMoreItem - } - if err != nil { - return nil, err - } + part, err := r.iter.mr.NextPart() + if err == io.EOF { + r.iter.iterErr = iterator.ErrNoMoreItem + return 0, io.EOF + } + if err != nil { + r.iter.iterErr = err + return 0, err + } + r.curPart = part - fileName, err := ul.PathUnescape(part.FileName()) - if err != nil { - return nil, fmt.Errorf("unescape file name: %w", err) - } + fileName, err := ul.PathUnescape(part.FileName()) + if err != nil { + r.iter.iterErr = fmt.Errorf("unescape file name: %w", err) + return 0, r.iter.iterErr + } - if part.FileName() != "" { - return &MultiPartFile{ - FieldName: part.FormName(), - FileName: fileName, - File: part, - Header: part.Header, - }, nil - } + if fileName == r.fileName { + return r.readFromCurPart(p) } + + // 如果新一个part的文件名不同,则开始一个新文件 + r.iter.nextPart = part + r.iter.nextFileName = fileName + return 0, io.EOF } -func (m *multiPartFileIterator) Close() {} +func (r *multiPartFileReader) readFromCurPart(p []byte) (int, error) { + rd, err := r.curPart.Read(p) + if err == io.EOF { + r.curPart = nil + return rd, nil + } + if err != nil { + return 0, err + } + + return rd, nil +} -// 解析multipart/form-data响应,只支持form参数在头部的情况 +func (m *multiPartFileReader) Close() error { + // 如果在文件还未读取完毕的时候关闭文件,则后续所有文件都无法再读取(暂不支持跳文件读取) + if m.curPart != nil { + m.curPart.Close() + m.iter.iterErr = fmt.Errorf("stream closed") + } + + return nil +} + +// 解析multipart/form-data响应。 +// - 只支持form参数在头部的情况。 +// - 支持文件分片(拆分成连续的同文件名的part) func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.Iterator[*MultiPartFile], error) { mtype, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) if err != nil { @@ -211,7 +264,8 @@ func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.It formValues := make(map[string]string) rd := multipart.NewReader(resp.Body, boundary) - var firstFile *multipart.Part + var firstFilePart *multipart.Part + var firstFileName string for { part, err := rd.NextPart() if err == io.EOF { @@ -229,7 +283,12 @@ func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.It } if fileName != "" { - firstFile = part + firstFilePart = part + firstFileName, err = ul.PathUnescape(part.FileName()) + if err != nil { + return nil, nil, fmt.Errorf("unescape file name: %w", err) + } + break } @@ -242,16 +301,18 @@ func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.It } return formValues, &multiPartFileIterator{ - mr: rd, - firstFile: firstFile, + mr: rd, + nextPart: firstFilePart, + nextFileName: firstFileName, }, nil } type MultiPartRequestParam struct { - Header any - Query any - Form any - Files MultiPartFileIterator + Header any + Query any + Form any + Files MultiPartFileIterator + PartSize int64 // 文件分片大小,如果为0则不分片 } type MultiPartFileIterator = iterator.Iterator[*IterMultiPartFile] @@ -309,20 +370,12 @@ func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, err return fmt.Errorf("opening file: %w", err) } - err = func() error { - defer file.File.Close() - - w, err := muWriter.CreateFormFile(file.FieldName, ul.PathEscape(file.FileName)) - if err != nil { - return fmt.Errorf("create form file failed, err: %w", err) - } - - _, err = io.Copy(w, file.File) - if err != nil { - return err - } - return nil - }() + if param.PartSize == 0 { + err = sendFileOnePart(muWriter, file.FieldName, file.FileName, file.File) + } else { + err = sendFileMultiPart(muWriter, file.FieldName, file.FileName, file.File, param.PartSize) + } + file.File.Close() if err != nil { return err } @@ -348,6 +401,37 @@ func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, err return resp, nil } +func sendFileMultiPart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser, partSize int64) error { + for { + w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName)) + if err != nil { + return fmt.Errorf("create form file failed, err: %w", err) + } + + _, err = io.Copy(w, io2.MustLength(file, partSize)) + if err == io.EOF { + continue + } + if err == io.ErrUnexpectedEOF { + break + } + if err != nil { + return err + } + } + return nil +} + +func sendFileOnePart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser) error { + w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName)) + if err != nil { + return fmt.Errorf("create form file failed, err: %w", err) + } + + _, err = io.Copy(w, file) + return err +} + func prepareQuery(req *http.Request, query any) error { if query == nil { return nil