You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

ops.go 6.4 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package ops
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "gitlink.org.cn/cloudream/common/pkgs/future"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/pkgs/types"
  10. myio "gitlink.org.cn/cloudream/common/utils/io"
  11. mymath "gitlink.org.cn/cloudream/common/utils/math"
  12. "gitlink.org.cn/cloudream/common/utils/serder"
  13. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  14. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
  18. )
  19. var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op](
  20. (*IPFSRead)(nil),
  21. (*IPFSWrite)(nil),
  22. (*GRPCSend)(nil),
  23. (*GRPCFetch)(nil),
  24. (*ECCompute)(nil),
  25. (*Combine)(nil),
  26. )))
  27. type IPFSRead struct {
  28. Output ioswitch.StreamID `json:"output"`
  29. FileHash string `json:"fileHash"`
  30. }
  31. func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  32. logger.
  33. WithField("FileHash", o.FileHash).
  34. WithField("Output", o.Output).
  35. Debugf("ipfs read op")
  36. defer logger.Debugf("ipfs read op finished")
  37. ipfsCli, err := stgglb.IPFSPool.Acquire()
  38. if err != nil {
  39. return fmt.Errorf("new ipfs client: %w", err)
  40. }
  41. defer stgglb.IPFSPool.Release(ipfsCli)
  42. file, err := ipfsCli.OpenRead(o.FileHash)
  43. if err != nil {
  44. return fmt.Errorf("reading ipfs: %w", err)
  45. }
  46. fut := future.NewSetVoid()
  47. file = myio.AfterReadClosed(file, func(closer io.ReadCloser) {
  48. fut.SetVoid()
  49. })
  50. sw.StreamReady(planID, ioswitch.NewStream(o.Output, file))
  51. // TODO context
  52. fut.Wait(context.TODO())
  53. return nil
  54. }
  55. type IPFSWrite struct {
  56. Input ioswitch.StreamID `json:"input"`
  57. ResultKey string `json:"resultKey"`
  58. }
  59. func (o *IPFSWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  60. logger.
  61. WithField("ResultKey", o.ResultKey).
  62. WithField("Input", o.Input).
  63. Debugf("ipfs write op")
  64. ipfsCli, err := stgglb.IPFSPool.Acquire()
  65. if err != nil {
  66. return fmt.Errorf("new ipfs client: %w", err)
  67. }
  68. defer stgglb.IPFSPool.Release(ipfsCli)
  69. strs, err := sw.WaitStreams(planID, o.Input)
  70. if err != nil {
  71. return err
  72. }
  73. defer strs[0].Stream.Close()
  74. fileHash, err := ipfsCli.CreateFile(strs[0].Stream)
  75. if err != nil {
  76. return fmt.Errorf("creating ipfs file: %w", err)
  77. }
  78. if o.ResultKey != "" {
  79. sw.AddResultValue(planID, ioswitch.ResultKV{
  80. Key: o.ResultKey,
  81. Value: fileHash,
  82. })
  83. }
  84. return nil
  85. }
  86. type GRPCSend struct {
  87. StreamID ioswitch.StreamID `json:"streamID"`
  88. Node model.Node `json:"node"`
  89. }
  90. func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  91. logger.
  92. WithField("ioswitch.StreamID", o.StreamID).
  93. Debugf("grpc send")
  94. strs, err := sw.WaitStreams(planID, o.StreamID)
  95. if err != nil {
  96. return err
  97. }
  98. defer strs[0].Stream.Close()
  99. // TODO 根据客户端地址选择IP和端口
  100. agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort)
  101. if err != nil {
  102. return fmt.Errorf("new agent rpc client: %w", err)
  103. }
  104. defer stgglb.AgentRPCPool.Release(agtCli)
  105. err = agtCli.SendStream(planID, o.StreamID, strs[0].Stream)
  106. if err != nil {
  107. return fmt.Errorf("sending stream: %w", err)
  108. }
  109. return nil
  110. }
  111. type GRPCFetch struct {
  112. StreamID ioswitch.StreamID `json:"streamID"`
  113. Node model.Node `json:"node"`
  114. }
  115. func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  116. // TODO 根据客户端地址选择IP和端口
  117. agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort)
  118. if err != nil {
  119. return fmt.Errorf("new agent rpc client: %w", err)
  120. }
  121. defer stgglb.AgentRPCPool.Release(agtCli)
  122. str, err := agtCli.FetchStream(planID, o.StreamID)
  123. if err != nil {
  124. return fmt.Errorf("fetching stream: %w", err)
  125. }
  126. fut := future.NewSetVoid()
  127. str = myio.AfterReadClosed(str, func(closer io.ReadCloser) {
  128. fut.SetVoid()
  129. })
  130. sw.StreamReady(planID, ioswitch.NewStream(o.StreamID, str))
  131. // TODO
  132. fut.Wait(context.TODO())
  133. return err
  134. }
  135. type ECCompute struct {
  136. EC stgmod.EC `json:"ec"`
  137. InputIDs []ioswitch.StreamID `json:"inputIDs"`
  138. OutputIDs []ioswitch.StreamID `json:"outputIDs"`
  139. InputBlockIndexes []int `json:"inputBlockIndexes"`
  140. OutputBlockIndexes []int `json:"outputBlockIndexes"`
  141. }
  142. func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  143. rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
  144. if err != nil {
  145. return fmt.Errorf("new ec: %w", err)
  146. }
  147. strs, err := sw.WaitStreams(planID, o.InputIDs...)
  148. if err != nil {
  149. return err
  150. }
  151. defer func() {
  152. for _, s := range strs {
  153. s.Stream.Close()
  154. }
  155. }()
  156. var inputs []io.Reader
  157. for _, s := range strs {
  158. inputs = append(inputs, s.Stream)
  159. }
  160. outputs, err := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes)
  161. if err != nil {
  162. return fmt.Errorf("reconstructing: %w", err)
  163. }
  164. wg := sync.WaitGroup{}
  165. for i, id := range o.OutputIDs {
  166. wg.Add(1)
  167. sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosed(outputs[i], func(closer io.ReadCloser) {
  168. wg.Done()
  169. })))
  170. }
  171. wg.Wait()
  172. return nil
  173. }
  174. type Combine struct {
  175. InputIDs []ioswitch.StreamID `json:"inputIDs"`
  176. OutputID ioswitch.StreamID `json:"outputID"`
  177. Length int64 `json:"length"`
  178. }
  179. func (o *Combine) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  180. strs, err := sw.WaitStreams(planID, o.InputIDs...)
  181. if err != nil {
  182. return err
  183. }
  184. defer func() {
  185. for _, str := range strs {
  186. str.Stream.Close()
  187. }
  188. }()
  189. length := o.Length
  190. pr, pw := io.Pipe()
  191. sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, pr))
  192. buf := make([]byte, 4096)
  193. for _, str := range strs {
  194. for {
  195. bufLen := mymath.Min(length, int64(len(buf)))
  196. if bufLen == 0 {
  197. return nil
  198. }
  199. rd, err := str.Stream.Read(buf[:bufLen])
  200. if err != nil {
  201. if err != io.EOF {
  202. return err
  203. }
  204. length -= int64(rd)
  205. err = myio.WriteAll(pw, buf[:rd])
  206. if err != nil {
  207. return err
  208. }
  209. break
  210. }
  211. length -= int64(rd)
  212. err = myio.WriteAll(pw, buf[:rd])
  213. if err != nil {
  214. return err
  215. }
  216. }
  217. }
  218. if length > 0 {
  219. return fmt.Errorf("want %d bytes, but only get %d bytes", o.Length, o.Length-length)
  220. }
  221. return nil
  222. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。