You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

upload_rep_objects.go 8.3 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package task
  2. import (
  3. "fmt"
  4. "io"
  5. "math/rand"
  6. "time"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/agent/internal/config"
  9. "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
  10. "gitlink.org.cn/cloudream/common/pkg/logger"
  11. mygrpc "gitlink.org.cn/cloudream/common/utils/grpc"
  12. "gitlink.org.cn/cloudream/common/utils/ipfs"
  13. agentcaller "gitlink.org.cn/cloudream/proto"
  14. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  15. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/credentials/insecure"
  18. )
  19. // UploadObjects和UploadRepResults为一一对应关系
  20. type UploadRepObjects struct {
  21. userID int64
  22. bucketID int64
  23. repCount int
  24. Objects []UploadObject
  25. Results []UploadSingleRepObjectResult
  26. IsUploading bool
  27. }
  28. type UploadRepObjectsResult struct {
  29. Objects []UploadObject
  30. Results []UploadSingleRepObjectResult
  31. IsUploading bool
  32. }
  // UploadObject describes one object to upload.
  type UploadObject struct {
  	// ObjectName is the name the object is created under in the bucket.
  	ObjectName string
  	// File supplies the object's content. The task closes it when execution
  	// finishes.
  	File io.ReadCloser
  	// FileSize is the size reported to the coordinator for this object.
  	FileSize int64
  }
  // UploadSingleRepObjectResult is the outcome recorded for a single object.
  type UploadSingleRepObjectResult struct {
  	// Error is the failure hit while checking or uploading the object; nil
  	// when no failure was recorded.
  	Error error
  	// FileHash is the hash returned by the upload; empty when nothing was
  	// uploaded.
  	FileHash string
  	// ObjectID is the ID returned by the coordinator for the created object;
  	// zero when the object was not created.
  	ObjectID int64
  }
  43. func NewUploadRepObjects(userID int64, bucketID int64, uploadObjects []UploadObject, repCount int) *UploadRepObjects {
  44. return &UploadRepObjects{
  45. userID: userID,
  46. bucketID: bucketID,
  47. Objects: uploadObjects,
  48. repCount: repCount,
  49. }
  50. }
  51. func (t *UploadRepObjects) Execute(ctx TaskContext, complete CompleteFn) {
  52. log := logger.WithType[UploadRepObjects]("Task")
  53. log.Debugf("begin with %v", logger.FormatStruct(t))
  54. defer log.Debugf("end")
  55. err := t.do(ctx)
  56. complete(err, CompleteOption{
  57. RemovingDelay: time.Minute,
  58. })
  59. for _, obj := range t.Objects {
  60. obj.File.Close()
  61. }
  62. }
  63. func (t *UploadRepObjects) do(ctx TaskContext) error {
  64. reqBlder := reqbuilder.NewBuilder()
  65. for _, uploadObject := range t.Objects {
  66. reqBlder.Metadata().
  67. // 用于防止创建了多个同名对象
  68. Object().CreateOne(t.bucketID, uploadObject.ObjectName)
  69. }
  70. mutex, err := reqBlder.
  71. Metadata().
  72. // 用于判断用户是否有桶的权限
  73. UserBucket().ReadOne(t.userID, t.bucketID).
  74. // 用于查询可用的上传节点
  75. Node().ReadAny().
  76. // 用于设置Rep配置
  77. ObjectRep().CreateAny().
  78. // 用于创建Cache记录
  79. Cache().CreateAny().
  80. MutexLock(ctx.DistLock)
  81. if err != nil {
  82. return fmt.Errorf("acquire locks failed, err: %w", err)
  83. }
  84. defer mutex.Unlock()
  85. var repWriteResps []*coormsg.PreUploadResp
  86. //判断是否所有文件都符合上传条件
  87. hasFailure := true
  88. for i := 0; i < len(t.Objects); i++ {
  89. repWriteResp, err := t.preUploadSingleObject(ctx, t.Objects[i])
  90. if err != nil {
  91. hasFailure = false
  92. t.Results = append(t.Results,
  93. UploadSingleRepObjectResult{
  94. Error: err,
  95. FileHash: "",
  96. ObjectID: 0,
  97. })
  98. continue
  99. }
  100. t.Results = append(t.Results, UploadSingleRepObjectResult{})
  101. repWriteResps = append(repWriteResps, repWriteResp)
  102. }
  103. // 不满足上传条件,返回各文件检查结果
  104. if !hasFailure {
  105. return nil
  106. }
  107. //上传文件夹
  108. t.IsUploading = true
  109. for i := 0; i < len(repWriteResps); i++ {
  110. objectID, fileHash, err := t.uploadSingleObject(ctx, t.Objects[i], repWriteResps[i])
  111. // 记录文件上传结果
  112. t.Results[i] = UploadSingleRepObjectResult{
  113. Error: err,
  114. FileHash: fileHash,
  115. ObjectID: objectID,
  116. }
  117. }
  118. return nil
  119. }
  120. // 检查单个文件是否能够上传
  121. func (t *UploadRepObjects) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) (*coormsg.PreUploadResp, error) {
  122. //发送写请求,请求Coor分配写入节点Ip
  123. repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP))
  124. if err != nil {
  125. return nil, fmt.Errorf("pre upload rep object: %w", err)
  126. }
  127. if len(repWriteResp.Nodes) == 0 {
  128. return nil, fmt.Errorf("no node to upload file")
  129. }
  130. return repWriteResp, nil
  131. }
  132. // 上传文件
  133. func (t *UploadRepObjects) uploadSingleObject(ctx TaskContext, uploadObject UploadObject, preResp *coormsg.PreUploadResp) (int64, string, error) {
  134. var fileHash string
  135. uploadedNodeIDs := []int64{}
  136. willUploadToNode := true
  137. // 因为本地的IPFS属于调度系统的一部分,所以需要加锁
  138. mutex, err := reqbuilder.NewBuilder().
  139. IPFS().CreateAnyRep(config.Cfg().ID).
  140. MutexLock(ctx.DistLock)
  141. if err != nil {
  142. return 0, "", fmt.Errorf("acquiring locks: %w", err)
  143. }
  144. fileHash, err = uploadToLocalIPFS(ctx.IPFS, uploadObject.File)
  145. if err != nil {
  146. // 上传失败,则立刻解锁
  147. mutex.Unlock()
  148. logger.Warnf("uploading to local IPFS: %s, will select a node to upload", err.Error())
  149. } else {
  150. willUploadToNode = false
  151. uploadedNodeIDs = append(uploadedNodeIDs, config.Cfg().ID)
  152. // 上传成功,则等到所有操作结束后才能解锁
  153. defer mutex.Unlock()
  154. }
  155. // 本地IPFS失败,则发送到agent上传
  156. if willUploadToNode {
  157. // 本地IPFS已经失败,所以不要再选择当前节点了
  158. uploadNode := t.chooseUploadNode(lo.Reject(preResp.Nodes, func(item ramsg.RespNode, index int) bool { return item.ID == config.Cfg().ID }))
  159. // 如果客户端与节点在同一个地域,则使用内网地址连接节点
  160. nodeIP := uploadNode.ExternalIP
  161. if uploadNode.IsSameLocation {
  162. nodeIP = uploadNode.LocalIP
  163. logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
  164. }
  165. mutex, err := reqbuilder.NewBuilder().
  166. // 防止上传的副本被清除
  167. IPFS().CreateAnyRep(uploadNode.ID).
  168. MutexLock(ctx.DistLock)
  169. if err != nil {
  170. return 0, "", fmt.Errorf("acquire locks failed, err: %w", err)
  171. }
  172. defer mutex.Unlock()
  173. fileHash, err = uploadToNode(uploadObject.File, nodeIP)
  174. if err != nil {
  175. return 0, "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
  176. }
  177. uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
  178. }
  179. // 记录写入的文件的Hash
  180. createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash))
  181. if err != nil {
  182. return 0, "", fmt.Errorf("creating rep object: %w", err)
  183. }
  184. return createResp.ObjectID, fileHash, nil
  185. }
  186. // chooseUploadNode 选择一个上传文件的节点
  187. // 1. 从与当前客户端相同地域的节点中随机选一个
  188. // 2. 没有用的话从所有节点中随机选一个
  189. func (t *UploadRepObjects) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
  190. sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
  191. if len(sameLocationNodes) > 0 {
  192. return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
  193. }
  194. return nodes[rand.Intn(len(nodes))]
  195. }
  196. func uploadToNode(file io.ReadCloser, nodeIP string) (string, error) {
  197. // 建立grpc连接,发送请求
  198. grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
  199. grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  200. if err != nil {
  201. return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
  202. }
  203. defer grpcCon.Close()
  204. client := agentcaller.NewFileTransportClient(grpcCon)
  205. upload, err := mygrpc.SendFileAsStream(client)
  206. if err != nil {
  207. return "", fmt.Errorf("request to send file failed, err: %w", err)
  208. }
  209. // 发送文件数据
  210. _, err = io.Copy(upload, file)
  211. if err != nil {
  212. // 发生错误则关闭连接
  213. upload.Abort(io.ErrClosedPipe)
  214. return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err)
  215. }
  216. // 发送EOF消息,并获得FileHash
  217. fileHash, err := upload.Finish()
  218. if err != nil {
  219. upload.Abort(io.ErrClosedPipe)
  220. return "", fmt.Errorf("send EOF failed, err: %w", err)
  221. }
  222. return fileHash, nil
  223. }
  224. func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser) (string, error) {
  225. // 从本地IPFS上传文件
  226. writer, err := ipfs.CreateFile()
  227. if err != nil {
  228. return "", fmt.Errorf("create IPFS file failed, err: %w", err)
  229. }
  230. _, err = io.Copy(writer, file)
  231. if err != nil {
  232. return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err)
  233. }
  234. fileHash, err := writer.Finish()
  235. if err != nil {
  236. return "", fmt.Errorf("finish writing IPFS failed, err: %w", err)
  237. }
  238. return fileHash, nil
  239. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。