You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

ops.go 5.7 kB

2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package ops
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "gitlink.org.cn/cloudream/common/pkgs/future"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/pkgs/types"
  10. myio "gitlink.org.cn/cloudream/common/utils/io"
  11. "gitlink.org.cn/cloudream/common/utils/serder"
  12. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  13. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
  17. )
// Builds a type union over ioswitch.Op whose members are the op types declared
// in this file, and passes it to serder.UseTypeUnionExternallyTagged at package
// initialization time.
// NOTE(review): presumably this is what allows serder to encode/decode
// ioswitch.Op values with an external type tag — confirm against serder.
var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op](
	(*IPFSRead)(nil),
	(*IPFSWrite)(nil),
	(*GRPCSend)(nil),
	(*GRPCFetch)(nil),
	(*ECCompute)(nil),
	(*Combine)(nil),
)))
  26. type IPFSRead struct {
  27. Output ioswitch.StreamID `json:"output"`
  28. FileHash string `json:"fileHash"`
  29. }
  30. func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  31. logger.
  32. WithField("FileHash", o.FileHash).
  33. WithField("Output", o.Output).
  34. Debugf("ipfs read op")
  35. defer logger.Debugf("ipfs read op finished")
  36. ipfsCli, err := stgglb.IPFSPool.Acquire()
  37. if err != nil {
  38. return fmt.Errorf("new ipfs client: %w", err)
  39. }
  40. defer stgglb.IPFSPool.Release(ipfsCli)
  41. file, err := ipfsCli.OpenRead(o.FileHash)
  42. if err != nil {
  43. return fmt.Errorf("reading ipfs: %w", err)
  44. }
  45. fut := future.NewSetVoid()
  46. file = myio.AfterReadClosed(file, func(closer io.ReadCloser) {
  47. fut.SetVoid()
  48. })
  49. sw.StreamReady(planID, ioswitch.NewStream(o.Output, file))
  50. // TODO context
  51. fut.Wait(context.TODO())
  52. return nil
  53. }
  54. type IPFSWrite struct {
  55. Input ioswitch.StreamID `json:"input"`
  56. ResultKey string `json:"resultKey"`
  57. }
  58. func (o *IPFSWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  59. logger.
  60. WithField("ResultKey", o.ResultKey).
  61. WithField("Input", o.Input).
  62. Debugf("ipfs write op")
  63. ipfsCli, err := stgglb.IPFSPool.Acquire()
  64. if err != nil {
  65. return fmt.Errorf("new ipfs client: %w", err)
  66. }
  67. defer stgglb.IPFSPool.Release(ipfsCli)
  68. strs, err := sw.WaitStreams(planID, o.Input)
  69. if err != nil {
  70. return err
  71. }
  72. defer strs[0].Stream.Close()
  73. fileHash, err := ipfsCli.CreateFile(strs[0].Stream)
  74. if err != nil {
  75. return fmt.Errorf("creating ipfs file: %w", err)
  76. }
  77. if o.ResultKey != "" {
  78. sw.AddResultValue(planID, ioswitch.ResultKV{
  79. Key: o.ResultKey,
  80. Value: fileHash,
  81. })
  82. }
  83. return nil
  84. }
  85. type GRPCSend struct {
  86. StreamID ioswitch.StreamID `json:"streamID"`
  87. Node model.Node `json:"node"`
  88. }
  89. func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  90. logger.
  91. WithField("ioswitch.StreamID", o.StreamID).
  92. Debugf("grpc send")
  93. strs, err := sw.WaitStreams(planID, o.StreamID)
  94. if err != nil {
  95. return err
  96. }
  97. defer strs[0].Stream.Close()
  98. // TODO 根据客户端地址选择IP和端口
  99. agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort)
  100. if err != nil {
  101. return fmt.Errorf("new agent rpc client: %w", err)
  102. }
  103. defer stgglb.AgentRPCPool.Release(agtCli)
  104. err = agtCli.SendStream(planID, o.StreamID, strs[0].Stream)
  105. if err != nil {
  106. return fmt.Errorf("sending stream: %w", err)
  107. }
  108. return nil
  109. }
  110. type GRPCFetch struct {
  111. StreamID ioswitch.StreamID `json:"streamID"`
  112. Node model.Node `json:"node"`
  113. }
  114. func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  115. // TODO 根据客户端地址选择IP和端口
  116. agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort)
  117. if err != nil {
  118. return fmt.Errorf("new agent rpc client: %w", err)
  119. }
  120. defer stgglb.AgentRPCPool.Release(agtCli)
  121. str, err := agtCli.FetchStream(planID, o.StreamID)
  122. if err != nil {
  123. return fmt.Errorf("fetching stream: %w", err)
  124. }
  125. fut := future.NewSetVoid()
  126. str = myio.AfterReadClosed(str, func(closer io.ReadCloser) {
  127. fut.SetVoid()
  128. })
  129. sw.StreamReady(planID, ioswitch.NewStream(o.StreamID, str))
  130. // TODO
  131. fut.Wait(context.TODO())
  132. return err
  133. }
  134. type ECCompute struct {
  135. EC stgmod.EC `json:"ec"`
  136. InputIDs []ioswitch.StreamID `json:"inputIDs"`
  137. OutputIDs []ioswitch.StreamID `json:"outputIDs"`
  138. InputBlockIndexes []int `json:"inputBlockIndexes"`
  139. OutputBlockIndexes []int `json:"outputBlockIndexes"`
  140. }
  141. func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  142. rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
  143. if err != nil {
  144. return fmt.Errorf("new ec: %w", err)
  145. }
  146. strs, err := sw.WaitStreams(planID, o.InputIDs...)
  147. if err != nil {
  148. return err
  149. }
  150. defer func() {
  151. for _, s := range strs {
  152. s.Stream.Close()
  153. }
  154. }()
  155. var inputs []io.ReadCloser
  156. for _, s := range strs {
  157. inputs = append(inputs, s.Stream)
  158. }
  159. outputs, err := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes)
  160. if err != nil {
  161. return fmt.Errorf("reconstructing: %w", err)
  162. }
  163. wg := sync.WaitGroup{}
  164. for i, id := range o.OutputIDs {
  165. wg.Add(1)
  166. sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosed(outputs[i], func(closer io.ReadCloser) {
  167. wg.Done()
  168. })))
  169. }
  170. wg.Wait()
  171. return nil
  172. }
  173. type Combine struct {
  174. InputIDs []ioswitch.StreamID `json:"inputIDs"`
  175. OutputID ioswitch.StreamID `json:"outputID"`
  176. }
  177. func (o *Combine) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
  178. strs, err := sw.WaitStreams(planID, o.InputIDs...)
  179. if err != nil {
  180. return err
  181. }
  182. pr, pw := io.Pipe()
  183. sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, pr))
  184. for _, str := range strs {
  185. _, err := io.Copy(pw, str.Stream)
  186. if err != nil {
  187. return err
  188. }
  189. }
  190. return nil
  191. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。