You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

object.go 7.8 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package services
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "gitlink.org.cn/cloudream/client/config"
  7. "gitlink.org.cn/cloudream/db/model"
  8. agentcaller "gitlink.org.cn/cloudream/proto"
  9. "gitlink.org.cn/cloudream/utils/consts"
  10. "gitlink.org.cn/cloudream/utils/consts/errorcode"
  11. mygrpc "gitlink.org.cn/cloudream/utils/grpc"
  12. myio "gitlink.org.cn/cloudream/utils/io"
  13. "google.golang.org/grpc"
  14. "google.golang.org/grpc/credentials/insecure"
  15. )
  16. type ObjectService struct {
  17. *Service
  18. }
  19. func ObjectSvc(svc *Service) *ObjectService {
  20. return &ObjectService{Service: svc}
  21. }
  22. func (svc *ObjectService) GetObject(userID int, objectID int) (model.Object, error) {
  23. // TODO
  24. panic("not implement yet")
  25. }
  26. func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadCloser, error) {
  27. readResp, err := svc.coordinator.Read(objectID, userID)
  28. if err != nil {
  29. return nil, fmt.Errorf("request to coordinator failed, err: %w", err)
  30. }
  31. if readResp.ErrorCode != errorcode.OK {
  32. return nil, fmt.Errorf("coordinator operation failed, code: %s, message: %s", readResp.ErrorCode, readResp.Message)
  33. }
  34. switch readResp.Redundancy {
  35. case consts.REDUNDANCY_REP:
  36. if len(readResp.NodeIPs) == 0 {
  37. return nil, fmt.Errorf("no node has this file")
  38. }
  39. // 随便选第一个节点下载文件
  40. reader, err := svc.downloadAsRepObject(readResp.NodeIPs[0], readResp.Hashes[0])
  41. if err != nil {
  42. return nil, fmt.Errorf("rep read failed, err: %w", err)
  43. }
  44. return reader, nil
  45. //case consts.REDUNDANCY_EC:
  46. // TODO EC部分的代码要考虑重构
  47. // ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName)
  48. }
  49. return nil, fmt.Errorf("unsupported redundancy type: %s", readResp.Redundancy)
  50. }
  51. func (svc *ObjectService) downloadAsRepObject(nodeIP string, fileHash string) (io.ReadCloser, error) {
  52. // 连接grpc
  53. grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
  54. conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  55. if err != nil {
  56. return nil, fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
  57. }
  58. defer conn.Close()
  59. /*
  60. TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid
  61. 如果本地有ipfs daemon且能获取相应对象的cid,则获取对象cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock
  62. 否则,像目前一样,使用grpc向指定节点获取
  63. */
  64. // 下载文件
  65. client := agentcaller.NewFileTransportClient(conn)
  66. reader, err := mygrpc.GetFileAsStream(client, fileHash)
  67. if err != nil {
  68. return nil, fmt.Errorf("request to get file failed, err: %w", err)
  69. }
  70. return reader, nil
  71. }
  72. func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repNum int) error {
  73. //发送写请求,请求Coor分配写入节点Ip
  74. repWriteResp, err := svc.coordinator.RepWrite(bucketID, objectName, fileSize, repNum, userID)
  75. if err != nil {
  76. return fmt.Errorf("request to coordinator failed, err: %w", err)
  77. }
  78. if repWriteResp.ErrorCode != errorcode.OK {
  79. return fmt.Errorf("coordinator RepWrite failed, code: %s, message: %s", repWriteResp.ErrorCode, repWriteResp.Message)
  80. }
  81. /*
  82. TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足
  83. 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象
  84. 否则,像目前一样,使用grpc向指定节点获取
  85. */
  86. senders := make([]fileSender, repNum)
  87. // 建立grpc连接,发送请求
  88. svc.startSendFile(repNum, senders, repWriteResp.NodeIDs, repWriteResp.NodeIPs)
  89. // 向每个节点发送数据
  90. err = svc.sendFileData(file, repNum, senders)
  91. if err != nil {
  92. return err
  93. }
  94. // 发送EOF消息,并获得FileHash
  95. svc.sendFinish(repNum, senders)
  96. // 收集发送成功的节点以及返回的hash
  97. var sucNodeIDs []int
  98. var sucFileHashes []string
  99. for i := 0; i < repNum; i++ {
  100. sender := &senders[i]
  101. if sender.err == nil {
  102. sucNodeIDs = append(sucNodeIDs, sender.nodeID)
  103. sucFileHashes = append(sucFileHashes, sender.fileHash)
  104. }
  105. }
  106. // 记录写入的文件的Hash
  107. // TODO 如果一个都没有写成功,那么是否要发送这个请求?
  108. writeRepHashResp, err := svc.coordinator.WriteRepHash(bucketID, objectName, fileSize, repNum, userID, sucNodeIDs, sucFileHashes)
  109. if err != nil {
  110. return fmt.Errorf("request to coordinator failed, err: %w", err)
  111. }
  112. if writeRepHashResp.ErrorCode != errorcode.OK {
  113. return fmt.Errorf("coordinator WriteRepHash failed, code: %s, message: %s", writeRepHashResp.ErrorCode, writeRepHashResp.Message)
  114. }
  115. return nil
  116. }
  117. type fileSender struct {
  118. grpcCon *grpc.ClientConn
  119. stream mygrpc.FileWriteCloser[string]
  120. nodeID int
  121. fileHash string
  122. err error
  123. }
  124. func (svc *ObjectService) startSendFile(numRep int, senders []fileSender, nodeIDs []int, nodeIPs []string) {
  125. for i := 0; i < numRep; i++ {
  126. sender := &senders[i]
  127. sender.nodeID = nodeIDs[i]
  128. grpcAddr := fmt.Sprintf("%s:%d", nodeIPs[i], config.Cfg().GRPCPort)
  129. conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  130. if err != nil {
  131. sender.err = fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
  132. continue
  133. }
  134. client := agentcaller.NewFileTransportClient(conn)
  135. stream, err := mygrpc.SendFileAsStream(client)
  136. if err != nil {
  137. conn.Close()
  138. sender.err = fmt.Errorf("request to send file failed, err: %w", err)
  139. continue
  140. }
  141. sender.grpcCon = conn
  142. sender.stream = stream
  143. }
  144. }
  145. func (svc *ObjectService) sendFileData(file io.ReadCloser, numRep int, senders []fileSender) error {
  146. // 共用的发送数据缓冲区
  147. buf := make([]byte, 2048)
  148. for {
  149. // 读取文件数据
  150. readCnt, err := file.Read(buf)
  151. // 文件读取完毕
  152. if err == io.EOF {
  153. break
  154. }
  155. if err != nil {
  156. // 读取失败则断开所有连接
  157. for i := 0; i < numRep; i++ {
  158. sender := &senders[i]
  159. if sender.err != nil {
  160. continue
  161. }
  162. sender.stream.Abort(io.ErrClosedPipe)
  163. sender.grpcCon.Close()
  164. sender.err = fmt.Errorf("read file data failed, err: %w", err)
  165. }
  166. return fmt.Errorf("read file data failed, err: %w", err)
  167. }
  168. // 并行的向每个节点发送数据
  169. hasSender := false
  170. var sendWg sync.WaitGroup
  171. for i := 0; i < numRep; i++ {
  172. sender := &senders[i]
  173. // 发生了错误的跳过
  174. if sender.err != nil {
  175. continue
  176. }
  177. hasSender = true
  178. sendWg.Add(1)
  179. go func() {
  180. err := myio.WriteAll(sender.stream, buf[:readCnt])
  181. // 发生错误则关闭连接
  182. if err != nil {
  183. sender.stream.Abort(io.ErrClosedPipe)
  184. sender.grpcCon.Close()
  185. sender.err = fmt.Errorf("send file data failed, err: %w", err)
  186. }
  187. sendWg.Done()
  188. }()
  189. }
  190. // 等待向每个节点发送数据结束
  191. sendWg.Wait()
  192. // 如果所有节点都发送失败,则不要再继续读取文件数据了
  193. if !hasSender {
  194. break
  195. }
  196. }
  197. return nil
  198. }
  199. func (svc *ObjectService) sendFinish(numRep int, senders []fileSender) {
  200. for i := 0; i < numRep; i++ {
  201. sender := &senders[i]
  202. // 发生了错误的跳过
  203. if sender.err != nil {
  204. continue
  205. }
  206. fileHash, err := sender.stream.Finish()
  207. if err != nil {
  208. sender.err = err
  209. sender.stream.Abort(io.ErrClosedPipe)
  210. sender.grpcCon.Close()
  211. continue
  212. }
  213. sender.fileHash = fileHash
  214. sender.err = err
  215. sender.grpcCon.Close()
  216. }
  217. }
  218. func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error {
  219. // TODO
  220. panic("not implement yet")
  221. }
  222. func (svc *ObjectService) Delete(userID int, objectID int) error {
  223. // TODO
  224. panic("not implement yet")
  225. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。