You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

upload_objects.go 11 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package cmd
  2. import (
  3. "fmt"
  4. "io"
  5. "math"
  6. "math/rand"
  7. "time"
  8. "github.com/samber/lo"
  9. "gitlink.org.cn/cloudream/common/pkgs/distlock"
  10. "gitlink.org.cn/cloudream/common/pkgs/logger"
  11. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  12. "gitlink.org.cn/cloudream/common/utils/sort2"
  13. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
  17. agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
  18. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  19. )
  20. // UploadObjects 上传对象的结构体,包含上传所需的用户ID、包ID、对象迭代器和节点亲和性信息。
  21. type UploadObjects struct {
  22. userID cdssdk.UserID
  23. packageID cdssdk.PackageID
  24. objectIter iterator.UploadingObjectIterator
  25. nodeAffinity *cdssdk.NodeID
  26. }
  27. // UploadObjectsResult 上传对象结果的结构体,包含上传结果的数组。
  28. type UploadObjectsResult struct {
  29. Objects []ObjectUploadResult
  30. }
  31. // ObjectUploadResult 单个对象上传结果的结构体,包含上传信息、错误和对象ID。
  32. type ObjectUploadResult struct {
  33. Info *iterator.IterUploadingObject
  34. Error error
  35. Object cdssdk.Object
  36. }
  37. // UploadNodeInfo 上传节点信息的结构体,包含节点信息、延迟、是否与客户端在同一位置。
  38. type UploadNodeInfo struct {
  39. Node cdssdk.Node
  40. Delay time.Duration
  41. IsSameLocation bool
  42. }
  43. // UploadObjectsContext 上传对象上下文的结构体,包含分布式锁服务和连通性收集器。
  44. type UploadObjectsContext struct {
  45. Distlock *distlock.Service
  46. Connectivity *connectivity.Collector
  47. }
  48. // NewUploadObjects 创建一个新的UploadObjects实例。
  49. func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects {
  50. return &UploadObjects{
  51. userID: userID,
  52. packageID: packageID,
  53. objectIter: objIter,
  54. nodeAffinity: nodeAffinity,
  55. }
  56. }
  57. // Execute 执行上传对象的操作。
  58. func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult, error) {
  59. defer t.objectIter.Close()
  60. // 获取协调器客户端
  61. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  62. if err != nil {
  63. return nil, fmt.Errorf("new coordinator client: %w", err)
  64. }
  65. // 获取用户节点信息
  66. getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
  67. if err != nil {
  68. return nil, fmt.Errorf("getting user nodes: %w", err)
  69. }
  70. // 获取节点连通性信息
  71. cons := ctx.Connectivity.GetAll()
  72. userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo {
  73. delay := time.Duration(math.MaxInt64)
  74. con, ok := cons[node.NodeID]
  75. if ok && con.Delay != nil {
  76. delay = *con.Delay
  77. }
  78. return UploadNodeInfo{
  79. Node: node,
  80. Delay: delay,
  81. IsSameLocation: node.LocationID == stgglb.Local.LocationID,
  82. }
  83. })
  84. if len(userNodes) == 0 {
  85. return nil, fmt.Errorf("user no available nodes")
  86. }
  87. // 对上传节点的IPFS加锁
  88. ipfsReqBlder := reqbuilder.NewBuilder()
  89. if stgglb.Local.NodeID != nil {
  90. ipfsReqBlder.IPFS().Buzy(*stgglb.Local.NodeID)
  91. }
  92. for _, node := range userNodes {
  93. if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID {
  94. continue
  95. }
  96. ipfsReqBlder.IPFS().Buzy(node.Node.NodeID)
  97. }
  98. // 获得IPFS锁
  99. ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock)
  100. if err != nil {
  101. return nil, fmt.Errorf("acquire locks failed, err: %w", err)
  102. }
  103. defer ipfsMutex.Unlock()
  104. // 上传并更新包信息
  105. rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, t.nodeAffinity)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return &UploadObjectsResult{
  110. Objects: rets,
  111. }, nil
  112. }
  113. // chooseUploadNode 选择一个上传文件的节点。
  114. // 首先选择设置了亲和性的节点,然后从与当前客户端相同地域的节点中随机选择一个,最后选择延迟最低的节点。
  115. func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) UploadNodeInfo {
  116. if nodeAffinity != nil {
  117. aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity })
  118. if ok {
  119. return aff
  120. }
  121. }
  122. sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation })
  123. if len(sameLocationNodes) > 0 {
  124. return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
  125. }
  126. // 选择延迟最低的节点
  127. nodes = sort2.Sort(nodes, func(e1, e2 UploadNodeInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })
  128. return nodes[0]
  129. }
  130. // uploadAndUpdatePackage 上传文件并更新包信息。
  131. // packageID:标识待更新的包的ID。
  132. // objectIter:提供上传对象迭代器,用于遍历上传的文件。
  133. // userNodes:用户可选的上传节点信息列表。
  134. // nodeAffinity:用户首选的上传节点。
  135. // 返回值:上传结果列表和错误信息。
  136. func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) {
  137. // 获取协调器客户端
  138. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  139. if err != nil {
  140. return nil, fmt.Errorf("new coordinator client: %w", err)
  141. }
  142. defer stgglb.CoordinatorMQPool.Release(coorCli)
  143. // 选择上传节点
  144. uploadNode := chooseUploadNode(userNodes, nodeAffinity)
  145. var uploadRets []ObjectUploadResult
  146. // 构建添加对象的列表
  147. var adds []coormq.AddObjectEntry
  148. for {
  149. // 获取下一个对象信息。如果不存在更多对象,则退出循环。
  150. objInfo, err := objectIter.MoveNext()
  151. if err == iterator.ErrNoMoreItem {
  152. break
  153. }
  154. if err != nil {
  155. // 对象获取发生错误,返回错误信息。
  156. return nil, fmt.Errorf("reading object: %w", err)
  157. }
  158. // 执行上传逻辑,每个对象依次执行。
  159. err = func() error {
  160. // 确保对象文件在函数退出时关闭。
  161. defer objInfo.File.Close()
  162. // 记录上传开始时间。
  163. uploadTime := time.Now()
  164. // 上传文件,并获取文件哈希值。
  165. fileHash, err := uploadFile(objInfo.File, uploadNode)
  166. if err != nil {
  167. // 文件上传失败,记录错误信息并返回。
  168. return fmt.Errorf("uploading file: %w", err)
  169. }
  170. // 收集上传结果。
  171. uploadRets = append(uploadRets, ObjectUploadResult{
  172. Info: objInfo,
  173. Error: err,
  174. })
  175. // 准备添加到队列的条目,以供后续处理。
  176. adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadTime, uploadNode.Node.NodeID))
  177. return nil
  178. }()
  179. if err != nil {
  180. // 上传操作中出现错误,返回错误信息。
  181. return nil, err
  182. }
  183. }
  184. updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil))
  185. if err != nil {
  186. return nil, fmt.Errorf("updating package: %w", err)
  187. }
  188. updatedObjs := make(map[string]*cdssdk.Object)
  189. for _, obj := range updateResp.Added {
  190. o := obj
  191. updatedObjs[obj.Path] = &o
  192. }
  193. for i := range uploadRets {
  194. obj := updatedObjs[uploadRets[i].Info.Path]
  195. if obj == nil {
  196. uploadRets[i].Error = fmt.Errorf("object %s not found in package", uploadRets[i].Info.Path)
  197. continue
  198. }
  199. uploadRets[i].Object = *obj
  200. }
  201. return uploadRets, nil
  202. }
  203. // uploadFile 上传文件。
  204. // file:待上传的文件流。
  205. // uploadNode:指定的上传节点信息。
  206. // 返回值:文件哈希和错误信息。
  207. func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) {
  208. // 尝试使用本地IPFS上传
  209. if stgglb.IPFSPool != nil {
  210. logger.Infof("try to use local IPFS to upload file")
  211. fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, stgglb.Local.NodeID == nil)
  212. if err == nil {
  213. return fileHash, nil
  214. } else {
  215. logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error())
  216. }
  217. }
  218. // 否则,发送到agent进行上传
  219. nodeIP := uploadNode.Node.ExternalIP
  220. grpcPort := uploadNode.Node.ExternalGRPCPort
  221. if uploadNode.IsSameLocation {
  222. nodeIP = uploadNode.Node.LocalIP
  223. grpcPort = uploadNode.Node.LocalGRPCPort
  224. logger.Infof("client and node %d are at the same location, use local ip", uploadNode.Node.NodeID)
  225. }
  226. fileHash, err := uploadToNode(file, nodeIP, grpcPort)
  227. if err != nil {
  228. return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
  229. }
  230. return fileHash, nil
  231. }
  232. // uploadToNode 发送文件到指定的节点。
  233. // file:文件流。
  234. // nodeIP:节点的IP地址。
  235. // grpcPort:节点的gRPC端口。
  236. // 返回值:文件哈希和错误信息。
  237. func uploadToNode(file io.Reader, nodeIP string, grpcPort int) (string, error) {
  238. rpcCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
  239. if err != nil {
  240. return "", fmt.Errorf("new agent rpc client: %w", err)
  241. }
  242. defer rpcCli.Close()
  243. return rpcCli.SendIPFSFile(file)
  244. }
  245. // uploadToLocalIPFS 将文件上传到本地的IPFS节点,并根据需要将文件固定(pin)在节点上。
  246. // file: 要上传的文件,作为io.Reader提供。
  247. // nodeID: 指定上传到的IPFS节点的ID。
  248. // shouldPin: 指示是否在IPFS节点上固定(pin)上传的文件。如果为true,则文件会被固定,否则不会。
  249. // 返回上传文件的IPFS哈希值和可能出现的错误。
  250. func uploadToLocalIPFS(file io.Reader, nodeID cdssdk.NodeID, shouldPin bool) (string, error) {
  251. // 从IPFS池获取一个IPFS客户端实例
  252. ipfsCli, err := stgglb.IPFSPool.Acquire()
  253. if err != nil {
  254. return "", fmt.Errorf("new ipfs client: %w", err)
  255. }
  256. defer ipfsCli.Close() // 确保IPFS客户端在函数返回前被释放
  257. // 在IPFS上创建文件并获取其哈希值
  258. fileHash, err := ipfsCli.CreateFile(file)
  259. if err != nil {
  260. return "", fmt.Errorf("creating ipfs file: %w", err)
  261. }
  262. // 如果不需要固定文件,则直接返回文件哈希值
  263. if !shouldPin {
  264. return fileHash, nil
  265. }
  266. // 将文件固定在IPFS节点上
  267. err = pinIPFSFile(nodeID, fileHash)
  268. if err != nil {
  269. return "", err
  270. }
  271. return fileHash, nil
  272. }
  273. // pinIPFSFile 将文件Pin到IPFS节点。
  274. // nodeID:节点ID。
  275. // fileHash:文件哈希。
  276. // 返回值:错误信息。
  277. func pinIPFSFile(nodeID cdssdk.NodeID, fileHash string) error {
  278. agtCli, err := stgglb.AgentMQPool.Acquire(nodeID)
  279. if err != nil {
  280. return fmt.Errorf("new agent client: %w", err)
  281. }
  282. defer stgglb.AgentMQPool.Release(agtCli)
  283. _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false))
  284. if err != nil {
  285. return fmt.Errorf("start pinning object: %w", err)
  286. }
  287. return nil
  288. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。