You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

check_package_redundancy.go 20 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. package event
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. "gitlink.org.cn/cloudream/common/utils/sort2"
  12. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  13. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
  17. agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
  18. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  19. scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
  20. "gitlink.org.cn/cloudream/storage/scanner/internal/config"
  21. )
  22. const (
  23. monthHours = 30 * 24
  24. yearHours = 365 * 24
  25. )
  26. type CheckPackageRedundancy struct {
  27. *scevt.CheckPackageRedundancy
  28. }
  29. func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageRedundancy {
  30. return &CheckPackageRedundancy{
  31. CheckPackageRedundancy: evt,
  32. }
  33. }
  34. type NodeLoadInfo struct {
  35. Node cdssdk.Node
  36. LoadsRecentMonth int
  37. LoadsRecentYear int
  38. }
  39. func (t *CheckPackageRedundancy) TryMerge(other Event) bool {
  40. event, ok := other.(*CheckPackageRedundancy)
  41. if !ok {
  42. return false
  43. }
  44. return event.PackageID == t.PackageID
  45. }
  46. func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
  47. log := logger.WithType[CheckPackageRedundancy]("Event")
  48. startTime := time.Now()
  49. log.Debugf("begin with %v", logger.FormatStruct(t.CheckPackageRedundancy))
  50. defer func() {
  51. log.Debugf("end, time: %v", time.Since(startTime))
  52. }()
  53. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  54. if err != nil {
  55. log.Warnf("new coordinator client: %s", err.Error())
  56. return
  57. }
  58. defer stgglb.CoordinatorMQPool.Release(coorCli)
  59. getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID))
  60. if err != nil {
  61. log.Warnf("getting package objects: %s", err.Error())
  62. return
  63. }
  64. getLogs, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID))
  65. if err != nil {
  66. log.Warnf("getting package load log details: %s", err.Error())
  67. return
  68. }
  69. // TODO UserID
  70. getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(1))
  71. if err != nil {
  72. log.Warnf("getting all nodes: %s", err.Error())
  73. return
  74. }
  75. if len(getNodes.Nodes) == 0 {
  76. log.Warnf("no available nodes")
  77. return
  78. }
  79. allNodes := make(map[cdssdk.NodeID]*NodeLoadInfo)
  80. for _, node := range getNodes.Nodes {
  81. allNodes[node.NodeID] = &NodeLoadInfo{
  82. Node: node,
  83. }
  84. }
  85. for _, log := range getLogs.Logs {
  86. info, ok := allNodes[log.Storage.NodeID]
  87. if !ok {
  88. continue
  89. }
  90. sinceNow := time.Since(log.CreateTime)
  91. if sinceNow.Hours() < monthHours {
  92. info.LoadsRecentMonth++
  93. } else if sinceNow.Hours() < yearHours {
  94. info.LoadsRecentYear++
  95. }
  96. }
  97. var changedObjects []coormq.UpdatingObjectRedundancy
  98. defRep := cdssdk.DefaultRepRedundancy
  99. defEC := cdssdk.DefaultECRedundancy
  100. // TODO 目前rep的备份数量固定为2,所以这里直接选出两个节点
  101. mostBlockNodeIDs := t.summaryRepObjectBlockNodes(getObjs.Objects, 2)
  102. newRepNodes := t.chooseNewNodesForRep(&defRep, allNodes)
  103. rechoosedRepNodes := t.rechooseNodesForRep(mostBlockNodeIDs, &defRep, allNodes)
  104. newECNodes := t.chooseNewNodesForEC(&defEC, allNodes)
  105. // 加锁
  106. builder := reqbuilder.NewBuilder()
  107. for _, node := range newRepNodes {
  108. builder.IPFS().Buzy(node.Node.NodeID)
  109. }
  110. for _, node := range newECNodes {
  111. builder.IPFS().Buzy(node.Node.NodeID)
  112. }
  113. mutex, err := builder.MutexLock(execCtx.Args.DistLock)
  114. if err != nil {
  115. log.Warnf("acquiring dist lock: %s", err.Error())
  116. return
  117. }
  118. defer mutex.Unlock()
  119. for _, obj := range getObjs.Objects {
  120. var updating *coormq.UpdatingObjectRedundancy
  121. var err error
  122. shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold
  123. switch red := obj.Object.Redundancy.(type) {
  124. case *cdssdk.NoneRedundancy:
  125. if shouldUseEC {
  126. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
  127. updating, err = t.noneToEC(obj, &defEC, newECNodes)
  128. } else {
  129. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
  130. updating, err = t.noneToRep(obj, &defRep, newRepNodes)
  131. }
  132. case *cdssdk.RepRedundancy:
  133. if shouldUseEC {
  134. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
  135. updating, err = t.repToEC(obj, &defEC, newECNodes)
  136. } else {
  137. updating, err = t.repToRep(obj, &defRep, rechoosedRepNodes)
  138. }
  139. case *cdssdk.ECRedundancy:
  140. if shouldUseEC {
  141. uploadNodes := t.rechooseNodesForEC(obj, red, allNodes)
  142. updating, err = t.ecToEC(obj, red, &defEC, uploadNodes)
  143. } else {
  144. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
  145. updating, err = t.ecToRep(obj, red, &defRep, newRepNodes)
  146. }
  147. }
  148. if updating != nil {
  149. changedObjects = append(changedObjects, *updating)
  150. }
  151. if err != nil {
  152. log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error())
  153. }
  154. }
  155. if len(changedObjects) == 0 {
  156. return
  157. }
  158. _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(changedObjects))
  159. if err != nil {
  160. log.Warnf("requesting to change object redundancy: %s", err.Error())
  161. return
  162. }
  163. }
  164. // 统计每个对象块所在的节点,选出块最多的不超过nodeCnt个节点
  165. func (t *CheckPackageRedundancy) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail, nodeCnt int) []cdssdk.NodeID {
  166. type nodeBlocks struct {
  167. NodeID cdssdk.NodeID
  168. Count int
  169. }
  170. nodeBlocksMap := make(map[cdssdk.NodeID]*nodeBlocks)
  171. for _, obj := range objs {
  172. shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold
  173. if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC {
  174. for _, block := range obj.Blocks {
  175. if _, ok := nodeBlocksMap[block.NodeID]; !ok {
  176. nodeBlocksMap[block.NodeID] = &nodeBlocks{
  177. NodeID: block.NodeID,
  178. Count: 0,
  179. }
  180. }
  181. nodeBlocksMap[block.NodeID].Count++
  182. }
  183. }
  184. }
  185. nodes := lo.Values(nodeBlocksMap)
  186. sort2.Sort(nodes, func(left *nodeBlocks, right *nodeBlocks) int {
  187. return right.Count - left.Count
  188. })
  189. ids := lo.Map(nodes, func(item *nodeBlocks, idx int) cdssdk.NodeID { return item.NodeID })
  190. if len(ids) > nodeCnt {
  191. ids = ids[:nodeCnt]
  192. }
  193. return ids
  194. }
  195. func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
  196. sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int {
  197. dm := right.LoadsRecentMonth - left.LoadsRecentMonth
  198. if dm != 0 {
  199. return dm
  200. }
  201. return right.LoadsRecentYear - left.LoadsRecentYear
  202. })
  203. return t.chooseSoManyNodes(red.RepCount, sortedNodes)
  204. }
  205. func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
  206. sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int {
  207. dm := right.LoadsRecentMonth - left.LoadsRecentMonth
  208. if dm != 0 {
  209. return dm
  210. }
  211. return right.LoadsRecentYear - left.LoadsRecentYear
  212. })
  213. return t.chooseSoManyNodes(red.N, sortedNodes)
  214. }
  215. func (t *CheckPackageRedundancy) rechooseNodesForRep(mostBlockNodeIDs []cdssdk.NodeID, red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
  216. type rechooseNode struct {
  217. *NodeLoadInfo
  218. HasBlock bool
  219. }
  220. var rechooseNodes []*rechooseNode
  221. for _, node := range allNodes {
  222. hasBlock := false
  223. for _, id := range mostBlockNodeIDs {
  224. if id == node.Node.NodeID {
  225. hasBlock = true
  226. break
  227. }
  228. }
  229. rechooseNodes = append(rechooseNodes, &rechooseNode{
  230. NodeLoadInfo: node,
  231. HasBlock: hasBlock,
  232. })
  233. }
  234. sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int {
  235. dm := right.LoadsRecentMonth - left.LoadsRecentMonth
  236. if dm != 0 {
  237. return dm
  238. }
  239. // 已经缓存了文件块的节点优先选择
  240. v := sort2.CmpBool(right.HasBlock, left.HasBlock)
  241. if v != 0 {
  242. return v
  243. }
  244. return right.LoadsRecentYear - left.LoadsRecentYear
  245. })
  246. return t.chooseSoManyNodes(red.RepCount, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo }))
  247. }
  248. func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
  249. type rechooseNode struct {
  250. *NodeLoadInfo
  251. CachedBlockIndex int
  252. }
  253. var rechooseNodes []*rechooseNode
  254. for _, node := range allNodes {
  255. cachedBlockIndex := -1
  256. for _, block := range obj.Blocks {
  257. if block.NodeID == node.Node.NodeID {
  258. cachedBlockIndex = block.Index
  259. break
  260. }
  261. }
  262. rechooseNodes = append(rechooseNodes, &rechooseNode{
  263. NodeLoadInfo: node,
  264. CachedBlockIndex: cachedBlockIndex,
  265. })
  266. }
  267. sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int {
  268. dm := right.LoadsRecentMonth - left.LoadsRecentMonth
  269. if dm != 0 {
  270. return dm
  271. }
  272. // 已经缓存了文件块的节点优先选择
  273. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  274. if v != 0 {
  275. return v
  276. }
  277. return right.LoadsRecentYear - left.LoadsRecentYear
  278. })
  279. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  280. return t.chooseSoManyNodes(red.N, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo }))
  281. }
  282. func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadInfo) []*NodeLoadInfo {
  283. repeateCount := (count + len(nodes) - 1) / len(nodes)
  284. extedNodes := make([]*NodeLoadInfo, repeateCount*len(nodes))
  285. // 使用复制的方式将节点数扩充到要求的数量
  286. // 复制之后的结构:ABCD -> AAABBBCCCDDD
  287. for p := 0; p < repeateCount; p++ {
  288. for i, node := range nodes {
  289. putIdx := i*repeateCount + p
  290. extedNodes[putIdx] = node
  291. }
  292. }
  293. extedNodes = extedNodes[:count]
  294. var chosen []*NodeLoadInfo
  295. for len(chosen) < count {
  296. // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮
  297. chosenLocations := make(map[cdssdk.LocationID]bool)
  298. for i, node := range extedNodes {
  299. if node == nil {
  300. continue
  301. }
  302. if chosenLocations[node.Node.LocationID] {
  303. continue
  304. }
  305. chosen = append(chosen, node)
  306. chosenLocations[node.Node.LocationID] = true
  307. extedNodes[i] = nil
  308. }
  309. }
  310. return chosen
  311. }
  312. func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  313. if len(obj.Blocks) == 0 {
  314. return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
  315. }
  316. // 如果选择的备份节点都是同一个,那么就只要上传一次
  317. uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID })
  318. var blocks []stgmod.ObjectBlock
  319. for _, node := range uploadNodes {
  320. err := t.pinObject(node.Node.NodeID, obj.Object.FileHash)
  321. if err != nil {
  322. return nil, err
  323. }
  324. blocks = append(blocks, stgmod.ObjectBlock{
  325. ObjectID: obj.Object.ObjectID,
  326. Index: 0,
  327. NodeID: node.Node.NodeID,
  328. FileHash: obj.Object.FileHash,
  329. })
  330. }
  331. return &coormq.UpdatingObjectRedundancy{
  332. ObjectID: obj.Object.ObjectID,
  333. Redundancy: red,
  334. Blocks: blocks,
  335. }, nil
  336. }
  337. func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  338. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  339. if err != nil {
  340. return nil, fmt.Errorf("new coordinator client: %w", err)
  341. }
  342. defer stgglb.CoordinatorMQPool.Release(coorCli)
  343. if len(obj.Blocks) == 0 {
  344. return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to ec")
  345. }
  346. getNodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{obj.Blocks[0].NodeID}))
  347. if err != nil {
  348. return nil, fmt.Errorf("requesting to get nodes: %w", err)
  349. }
  350. ft := ioswitch2.NewFromTo()
  351. ft.AddFrom(ioswitch2.NewFromNode(obj.Object.FileHash, &getNodes.Nodes[0], -1))
  352. for i := 0; i < red.N; i++ {
  353. ft.AddTo(ioswitch2.NewToNode(uploadNodes[i].Node, i, fmt.Sprintf("%d", i)))
  354. }
  355. parser := parser.NewParser(*red)
  356. plans := exec.NewPlanBuilder()
  357. err = parser.Parse(ft, plans)
  358. if err != nil {
  359. return nil, fmt.Errorf("parsing plan: %w", err)
  360. }
  361. ioRet, err := plans.Execute().Wait(context.TODO())
  362. if err != nil {
  363. return nil, fmt.Errorf("executing io plan: %w", err)
  364. }
  365. var blocks []stgmod.ObjectBlock
  366. for i := 0; i < red.N; i++ {
  367. blocks = append(blocks, stgmod.ObjectBlock{
  368. ObjectID: obj.Object.ObjectID,
  369. Index: i,
  370. NodeID: uploadNodes[i].Node.NodeID,
  371. FileHash: ioRet[fmt.Sprintf("%d", i)].(string),
  372. })
  373. }
  374. return &coormq.UpdatingObjectRedundancy{
  375. ObjectID: obj.Object.ObjectID,
  376. Redundancy: red,
  377. Blocks: blocks,
  378. }, nil
  379. }
  380. func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  381. if len(obj.Blocks) == 0 {
  382. return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
  383. }
  384. // 如果选择的备份节点都是同一个,那么就只要上传一次
  385. uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID })
  386. for _, node := range uploadNodes {
  387. err := t.pinObject(node.Node.NodeID, obj.Object.FileHash)
  388. if err != nil {
  389. logger.WithField("ObjectID", obj.Object.ObjectID).
  390. Warn(err.Error())
  391. return nil, err
  392. }
  393. }
  394. var blocks []stgmod.ObjectBlock
  395. for _, node := range uploadNodes {
  396. // 由于更新冗余方式会删除所有Block记录然后重新填充,
  397. // 所以即使是节点跳过了上传,也需要为它添加一条Block记录
  398. blocks = append(blocks, stgmod.ObjectBlock{
  399. ObjectID: obj.Object.ObjectID,
  400. Index: 0,
  401. NodeID: node.Node.NodeID,
  402. FileHash: obj.Object.FileHash,
  403. })
  404. }
  405. return &coormq.UpdatingObjectRedundancy{
  406. ObjectID: obj.Object.ObjectID,
  407. Redundancy: red,
  408. Blocks: blocks,
  409. }, nil
  410. }
  411. func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  412. return t.noneToEC(obj, red, uploadNodes)
  413. }
  414. func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  415. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  416. if err != nil {
  417. return nil, fmt.Errorf("new coordinator client: %w", err)
  418. }
  419. defer stgglb.CoordinatorMQPool.Release(coorCli)
  420. var chosenBlocks []stgmod.GrouppedObjectBlock
  421. var chosenBlockIndexes []int
  422. for _, block := range obj.GroupBlocks() {
  423. if len(block.NodeIDs) > 0 {
  424. chosenBlocks = append(chosenBlocks, block)
  425. chosenBlockIndexes = append(chosenBlockIndexes, block.Index)
  426. }
  427. if len(chosenBlocks) == srcRed.K {
  428. break
  429. }
  430. }
  431. if len(chosenBlocks) < srcRed.K {
  432. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  433. }
  434. // 如果选择的备份节点都是同一个,那么就只要上传一次
  435. uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID })
  436. // 每个被选节点都在自己节点上重建原始数据
  437. parser := parser.NewParser(*srcRed)
  438. planBlder := exec.NewPlanBuilder()
  439. for i := range uploadNodes {
  440. ft := ioswitch2.NewFromTo()
  441. for _, block := range chosenBlocks {
  442. ft.AddFrom(ioswitch2.NewFromNode(block.FileHash, &uploadNodes[i].Node, block.Index))
  443. }
  444. len := obj.Object.Size
  445. ft.AddTo(ioswitch2.NewToNodeWithRange(uploadNodes[i].Node, -1, fmt.Sprintf("%d", i), exec.Range{
  446. Offset: 0,
  447. Length: &len,
  448. }))
  449. err := parser.Parse(ft, planBlder)
  450. if err != nil {
  451. return nil, fmt.Errorf("parsing plan: %w", err)
  452. }
  453. }
  454. ioRet, err := planBlder.Execute().Wait(context.TODO())
  455. if err != nil {
  456. return nil, fmt.Errorf("executing io plan: %w", err)
  457. }
  458. var blocks []stgmod.ObjectBlock
  459. for i := range uploadNodes {
  460. blocks = append(blocks, stgmod.ObjectBlock{
  461. ObjectID: obj.Object.ObjectID,
  462. Index: 0,
  463. NodeID: uploadNodes[i].Node.NodeID,
  464. FileHash: ioRet[fmt.Sprintf("%d", i)].(string),
  465. })
  466. }
  467. return &coormq.UpdatingObjectRedundancy{
  468. ObjectID: obj.Object.ObjectID,
  469. Redundancy: tarRed,
  470. Blocks: blocks,
  471. }, nil
  472. }
  473. func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  474. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  475. if err != nil {
  476. return nil, fmt.Errorf("new coordinator client: %w", err)
  477. }
  478. defer stgglb.CoordinatorMQPool.Release(coorCli)
  479. grpBlocks := obj.GroupBlocks()
  480. var chosenBlocks []stgmod.GrouppedObjectBlock
  481. for _, block := range grpBlocks {
  482. if len(block.NodeIDs) > 0 {
  483. chosenBlocks = append(chosenBlocks, block)
  484. }
  485. if len(chosenBlocks) == srcRed.K {
  486. break
  487. }
  488. }
  489. if len(chosenBlocks) < srcRed.K {
  490. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  491. }
  492. // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  493. parser := parser.NewParser(*srcRed)
  494. planBlder := exec.NewPlanBuilder()
  495. var newBlocks []stgmod.ObjectBlock
  496. shouldUpdateBlocks := false
  497. for i, node := range uploadNodes {
  498. newBlock := stgmod.ObjectBlock{
  499. ObjectID: obj.Object.ObjectID,
  500. Index: i,
  501. NodeID: node.Node.NodeID,
  502. }
  503. grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i })
  504. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  505. if ok && lo.Contains(grp.NodeIDs, node.Node.NodeID) {
  506. newBlock.FileHash = grp.FileHash
  507. newBlocks = append(newBlocks, newBlock)
  508. continue
  509. }
  510. shouldUpdateBlocks = true
  511. // 否则就要重建出这个节点需要的块
  512. ft := ioswitch2.NewFromTo()
  513. for _, block := range chosenBlocks {
  514. ft.AddFrom(ioswitch2.NewFromNode(block.FileHash, &node.Node, block.Index))
  515. }
  516. // 输出只需要自己要保存的那一块
  517. ft.AddTo(ioswitch2.NewToNode(node.Node, i, fmt.Sprintf("%d", i)))
  518. err := parser.Parse(ft, planBlder)
  519. if err != nil {
  520. return nil, fmt.Errorf("parsing plan: %w", err)
  521. }
  522. newBlocks = append(newBlocks, newBlock)
  523. }
  524. // 如果没有任何Plan,Wait会直接返回成功
  525. ret, err := planBlder.Execute().Wait(context.TODO())
  526. if err != nil {
  527. return nil, fmt.Errorf("executing io plan: %w", err)
  528. }
  529. if !shouldUpdateBlocks {
  530. return nil, nil
  531. }
  532. for k, v := range ret {
  533. idx, err := strconv.ParseInt(k, 10, 64)
  534. if err != nil {
  535. return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  536. }
  537. newBlocks[idx].FileHash = v.(string)
  538. }
  539. return &coormq.UpdatingObjectRedundancy{
  540. ObjectID: obj.Object.ObjectID,
  541. Redundancy: tarRed,
  542. Blocks: newBlocks,
  543. }, nil
  544. }
  545. func (t *CheckPackageRedundancy) pinObject(nodeID cdssdk.NodeID, fileHash string) error {
  546. agtCli, err := stgglb.AgentMQPool.Acquire(nodeID)
  547. if err != nil {
  548. return fmt.Errorf("new agent client: %w", err)
  549. }
  550. defer stgglb.AgentMQPool.Release(agtCli)
  551. _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false))
  552. if err != nil {
  553. return fmt.Errorf("start pinning object: %w", err)
  554. }
  555. return nil
  556. }
  557. func init() {
  558. RegisterMessageConvertor(NewCheckPackageRedundancy)
  559. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。