You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

package.go 7.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package http
  2. import (
  3. "mime/multipart"
  4. "net/http"
  5. "time"
  6. "github.com/gin-gonic/gin"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/models"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/utils/serder"
  11. "gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
  12. )
  13. type PackageService struct {
  14. *Server
  15. }
  16. func (s *Server) PackageSvc() *PackageService {
  17. return &PackageService{
  18. Server: s,
  19. }
  20. }
  21. type PackageUploadReq struct {
  22. Info PackageUploadInfo `form:"info" binding:"required"`
  23. Files []*multipart.FileHeader `form:"files"`
  24. }
  25. type PackageUploadInfo struct {
  26. UserID *int64 `json:"userID" binding:"required"`
  27. BucketID *int64 `json:"bucketID" binding:"required"`
  28. Name string `json:"name" binding:"required"`
  29. Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"`
  30. }
  31. type PackageUploadResp struct {
  32. PackageID int64 `json:"packageID,string"`
  33. }
  34. func (s *PackageService) Upload(ctx *gin.Context) {
  35. log := logger.WithField("HTTP", "Package.Upload")
  36. var req PackageUploadReq
  37. if err := ctx.ShouldBind(&req); err != nil {
  38. log.Warnf("binding body: %s", err.Error())
  39. ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
  40. return
  41. }
  42. switch req.Info.Redundancy.Type {
  43. case models.RedundancyRep:
  44. s.uploadRep(ctx, &req)
  45. return
  46. case models.RedundancyEC:
  47. s.uploadEC(ctx, &req)
  48. return
  49. }
  50. ctx.JSON(http.StatusForbidden, Failed(errorcode.OperationFailed, "not supported yet"))
  51. }
  52. func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) {
  53. log := logger.WithField("HTTP", "Package.Upload")
  54. var repInfo models.RepRedundancyInfo
  55. if err := serder.AnyToAny(req.Info.Redundancy.Info, &repInfo); err != nil {
  56. log.Warnf("parsing rep redundancy config: %s", err.Error())
  57. ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
  58. return
  59. }
  60. objIter := iterator.NewHTTPObjectIterator(req.Files)
  61. taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo)
  62. if err != nil {
  63. log.Warnf("start uploading rep package task: %s", err.Error())
  64. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed"))
  65. return
  66. }
  67. for {
  68. complete, createResult, err := s.svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
  69. if complete {
  70. if err != nil {
  71. log.Warnf("uploading rep package: %s", err.Error())
  72. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading rep package failed"))
  73. return
  74. }
  75. ctx.JSON(http.StatusOK, OK(PackageUploadResp{
  76. PackageID: createResult.PackageID,
  77. }))
  78. return
  79. }
  80. if err != nil {
  81. log.Warnf("waiting task: %s", err.Error())
  82. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed"))
  83. return
  84. }
  85. }
  86. }
  87. func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) {
  88. log := logger.WithField("HTTP", "Package.Upload")
  89. var ecInfo models.ECRedundancyInfo
  90. if err := serder.AnyToAny(req.Info.Redundancy.Info, &ecInfo); err != nil {
  91. log.Warnf("parsing ec redundancy config: %s", err.Error())
  92. ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
  93. return
  94. }
  95. objIter := iterator.NewHTTPObjectIterator(req.Files)
  96. taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo)
  97. if err != nil {
  98. log.Warnf("start uploading ec package task: %s", err.Error())
  99. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed"))
  100. return
  101. }
  102. for {
  103. complete, createResult, err := s.svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5)
  104. if complete {
  105. if err != nil {
  106. log.Warnf("uploading ec package: %s", err.Error())
  107. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading ec package failed"))
  108. return
  109. }
  110. ctx.JSON(http.StatusOK, OK(PackageUploadResp{
  111. PackageID: createResult.PackageID,
  112. }))
  113. return
  114. }
  115. if err != nil {
  116. log.Warnf("waiting task: %s", err.Error())
  117. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed"))
  118. return
  119. }
  120. }
  121. }
  122. type PackageDeleteReq struct {
  123. UserID *int64 `json:"userID" binding:"required"`
  124. PackageID *int64 `json:"packageID" binding:"required"`
  125. }
  126. func (s *PackageService) Delete(ctx *gin.Context) {
  127. log := logger.WithField("HTTP", "Package.Delete")
  128. var req PackageDeleteReq
  129. if err := ctx.ShouldBindJSON(&req); err != nil {
  130. log.Warnf("binding body: %s", err.Error())
  131. ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
  132. return
  133. }
  134. err := s.svc.PackageSvc().DeletePackage(*req.UserID, *req.PackageID)
  135. if err != nil {
  136. log.Warnf("deleting package: %s", err.Error())
  137. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete package failed"))
  138. return
  139. }
  140. ctx.JSON(http.StatusOK, OK(nil))
  141. }
  142. type PackageGetCacheNodeIDs struct {
  143. UserID *int64 `json:"userID" binding:"required"`
  144. PackageID *int64 `json:"packageID" binding:"required"`
  145. }
  146. type GetCacheNodesByPackageResp struct {
  147. NodeIDs []int64 `json:"nodeIDs"`
  148. RedunancyType string `json:"redunancyType,string"`
  149. }
  150. func (s *PackageService) GetCacheNodeIDs(ctx *gin.Context) {
  151. log := logger.WithField("HTTP", "Package.GetCacheNodeIDs")
  152. var req PackageGetCacheNodeIDs
  153. if err := ctx.ShouldBindJSON(&req); err != nil {
  154. log.Warnf("binding body: %s", err.Error())
  155. ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
  156. return
  157. }
  158. nodeIDs, redunancyType, err := s.svc.PackageSvc().GetCacheNodesByPackage(*req.UserID, *req.PackageID)
  159. if err != nil {
  160. log.Warnf("get cache nodes by packageID failed: %s", err.Error())
  161. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get cache nodes by packageID failed"))
  162. return
  163. }
  164. ctx.JSON(http.StatusOK, OK(GetCacheNodesByPackageResp{
  165. NodeIDs: nodeIDs,
  166. RedunancyType: redunancyType,
  167. }))
  168. }
  169. type PackageGetStorageNodeIDs struct {
  170. UserID *int64 `json:"userID" binding:"required"`
  171. PackageID *int64 `json:"packageID" binding:"required"`
  172. }
  173. type GetStorageNodesByPackageResp struct {
  174. NodeIDs []int64 `json:"nodeIDs"`
  175. }
  176. func (s *PackageService) GetStorageNodeIDs(ctx *gin.Context) {
  177. log := logger.WithField("HTTP", "Package.GetStorageNodeIDs")
  178. var req PackageGetStorageNodeIDs
  179. if err := ctx.ShouldBindJSON(&req); err != nil {
  180. log.Warnf("binding body: %s", err.Error())
  181. ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
  182. return
  183. }
  184. nodeIDs, err := s.svc.PackageSvc().GetStorageNodesByPackage(*req.UserID, *req.PackageID)
  185. if err != nil {
  186. log.Warnf("get storage nodes by packageID failed: %s", err.Error())
  187. ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage nodes by packageID failed"))
  188. return
  189. }
  190. ctx.JSON(http.StatusOK, OK(GetStorageNodesByPackageResp{
  191. NodeIDs: nodeIDs,
  192. }))
  193. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。