Browse Source

修复调试问题

gitlink
Sydonian 2 years ago
parent
commit
9a0341a31f
3 changed files with 113 additions and 72 deletions
  1. +1
    -1
      agent/internal/task/ipfs_pin.go
  2. +2
    -1
      scanner/internal/event/check_package_redundancy.go
  3. +110
    -70
      scanner/internal/event/clean_pinned.go

+ 1
- 1
agent/internal/task/ipfs_pin.go View File

@@ -40,7 +40,7 @@ func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx TaskContext, complet
err = ipfsCli.Pin(fileHash)
if err != nil {
err := fmt.Errorf("pin file failed, err: %w", err)
log.WithField("FileHash", t.FileHashes).Warn(err.Error())
log.WithField("FileHash", fileHash).Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,


+ 2
- 1
scanner/internal/event/check_package_redundancy.go View File

@@ -73,7 +73,8 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
return
}

getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nil))
// TODO UserID
getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(0))
if err != nil {
log.Warnf("getting all nodes: %s", err.Error())
return


+ 110
- 70
scanner/internal/event/clean_pinned.go View File

@@ -16,11 +16,11 @@ import (
"gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/storage/scanner/internal/config"
)

type CleanPinned struct {
@@ -67,6 +67,8 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
}
readerNodeIDs := lo.Map(getLoadLog.Logs, func(item coormq.PackageLoadLogDetail, idx int) cdssdk.NodeID { return item.Storage.NodeID })

// 注意!需要保证allNodeID包含所有之后可能用到的节点ID
// TODO 可以考虑设计Cache机制
var allNodeID []cdssdk.NodeID
for _, obj := range getObjs.Objects {
for _, block := range obj.Blocks {
@@ -74,6 +76,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
}
allNodeID = append(allNodeID, obj.PinnedAt...)
}
allNodeID = append(allNodeID, readerNodeIDs...)

getNodeResp, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Union(allNodeID)))
if err != nil {
@@ -127,63 +130,9 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
ecObjectsUpdateEntries = append(ecObjectsUpdateEntries, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld))
}

wg := sync.WaitGroup{}

// 执行pin操作
var anyPinErr error
for nodeID, pin := range pinPlans {
wg.Add(1)
go func(nodeID cdssdk.NodeID, pin *[]string) {
defer wg.Done()

agtCli, err := stgglb.AgentMQPool.Acquire(nodeID)
if err != nil {
log.Warnf("new agent client: %s", err.Error())
return
}
defer stgglb.AgentMQPool.Release(agtCli)

_, err = agtCli.PinObject(agtmq.ReqPinObject(*pin, false))
if err != nil {
log.Warnf("pinning object: %s", err.Error())
anyPinErr = err
}
}(nodeID, pin)
}

// 执行IO计划
var ioSwRets map[string]any
var ioSwErr error
wg.Add(1)
go func() {
defer wg.Done()
plan, err := planBld.Build()
if err != nil {
ioSwErr = fmt.Errorf("building io switch plan: %w", err)
return
}

exec, err := plans.Execute(*plan)
if err != nil {
ioSwErr = fmt.Errorf("executing io switch plan: %w", err)
return
}
ret, err := exec.Wait()
if err != nil {
ioSwErr = fmt.Errorf("waiting io switch plan: %w", err)
return
}
ioSwRets = ret.ResultValues
}()

wg.Wait()

if anyPinErr != nil {
log.Warn(anyPinErr.Error())
return
}
if ioSwErr != nil {
log.Warn(ioSwErr.Error())
ioSwRets, err := t.executePlans(execCtx, pinPlans, &planBld)
if err != nil {
log.Warn(err.Error())
return
}

@@ -202,7 +151,6 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
}
}

// 统计每个对象块所在的节点,选出块最多的不超过nodeCnt个节点
func (t *CleanPinned) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail) []cdssdk.NodeID {
type nodeBlocks struct {
NodeID cdssdk.NodeID
@@ -211,17 +159,30 @@ func (t *CleanPinned) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail) []c

nodeBlocksMap := make(map[cdssdk.NodeID]*nodeBlocks)
for _, obj := range objs {
shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold
if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC {
for _, block := range obj.Blocks {
if _, ok := nodeBlocksMap[block.NodeID]; !ok {
nodeBlocksMap[block.NodeID] = &nodeBlocks{
NodeID: block.NodeID,
Count: 0,
}
cacheBlockNodes := make(map[cdssdk.NodeID]bool)
for _, block := range obj.Blocks {
if _, ok := nodeBlocksMap[block.NodeID]; !ok {
nodeBlocksMap[block.NodeID] = &nodeBlocks{
NodeID: block.NodeID,
Count: 0,
}
nodeBlocksMap[block.NodeID].Count++
}
nodeBlocksMap[block.NodeID].Count++
cacheBlockNodes[block.NodeID] = true
}

for _, nodeID := range obj.PinnedAt {
if cacheBlockNodes[nodeID] {
continue
}

if _, ok := nodeBlocksMap[nodeID]; !ok {
nodeBlocksMap[nodeID] = &nodeBlocks{
NodeID: nodeID,
Count: 0,
}
}
nodeBlocksMap[nodeID].Count++
}
}

@@ -553,7 +514,7 @@ func (t *CleanPinned) startAnnealing(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node
}
curTemp *= coolingRate
}
// fmt.Printf("final: %v\n", state.maxScoreRmBlocks)
return annealingSolution{
blockList: state.blockList,
rmBlocks: state.maxScoreRmBlocks,
@@ -680,7 +641,7 @@ func (t *CleanPinned) calcScore(state *annealingState) float64 {
newSc = dtSc / (sc * ac)
}

// fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v ", ctx.rmBlocks, newSc, dt, ac, sc)
// fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v \n", state.rmBlocks, newSc, dt, ac, sc)
return newSc
}

@@ -827,6 +788,85 @@ func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssd
return entry
}

// executePlans runs every pin plan and the IO switch plan concurrently while
// holding an IPFS distributed lock on each node that takes part.
//
// pinPlans maps a node ID to the string list sent to that node's agent in a
// PinObject request (presumably file hashes; confirm against the caller).
// planBld is built into the IO plan that is run through plans.Execute.
//
// On success the result values of the IO plan are returned. A nil map and an
// error are returned when building the plan fails, the lock cannot be
// acquired, any pin plan fails, or the IO plan fails. A pin error takes
// precedence over an IO plan error.
func (t *CleanPinned) executePlans(execCtx ExecuteContext, pinPlans map[cdssdk.NodeID]*[]string, planBld *plans.PlanBuilder) (map[string]any, error) {
	log := logger.WithType[CleanPinned]("Event")

	ioPlan, err := planBld.Build()
	if err != nil {
		return nil, fmt.Errorf("building io switch plan: %w", err)
	}

	// Acquire all locks in a single request; duplicate node IDs are harmless.
	lockBld := reqbuilder.NewBuilder()
	for nodeID := range pinPlans {
		lockBld.IPFS().Buzy(nodeID)
	}
	for _, plan := range ioPlan.AgentPlans {
		lockBld.IPFS().Buzy(plan.Node.NodeID)
	}
	lock, err := lockBld.MutexLock(execCtx.Args.DistLock)
	if err != nil {
		return nil, fmt.Errorf("acquiring distlock: %w", err)
	}
	defer lock.Unlock()

	var wg sync.WaitGroup

	// The pin goroutines run in parallel, so the shared error is guarded by a
	// mutex. Only the first failure is kept.
	var (
		pinErrMu  sync.Mutex
		anyPinErr error
	)
	recordPinErr := func(err error) {
		pinErrMu.Lock()
		defer pinErrMu.Unlock()
		if anyPinErr == nil {
			anyPinErr = err
		}
	}

	// Execute the pin operations, one goroutine per node.
	for nodeID, pin := range pinPlans {
		wg.Add(1)
		go func(nodeID cdssdk.NodeID, pin *[]string) {
			defer wg.Done()

			agtCli, err := stgglb.AgentMQPool.Acquire(nodeID)
			if err != nil {
				log.Warnf("new agent client: %s", err.Error())
				// Without a client the pin never happened; report it so the
				// caller does not treat this plan as completed.
				recordPinErr(fmt.Errorf("new agent client: %w", err))
				return
			}
			defer stgglb.AgentMQPool.Release(agtCli)

			_, err = agtCli.PinObject(agtmq.ReqPinObject(*pin, false))
			if err != nil {
				log.Warnf("pinning object: %s", err.Error())
				recordPinErr(err)
			}
		}(nodeID, pin)
	}

	// Execute the IO plan. Only this goroutine writes ioSwRets and ioSwErr,
	// and they are read after wg.Wait, so no extra locking is needed.
	var ioSwRets map[string]any
	var ioSwErr error
	wg.Add(1)
	go func() {
		defer wg.Done()

		exec, err := plans.Execute(*ioPlan)
		if err != nil {
			ioSwErr = fmt.Errorf("executing io switch plan: %w", err)
			return
		}
		ret, err := exec.Wait()
		if err != nil {
			ioSwErr = fmt.Errorf("waiting io switch plan: %w", err)
			return
		}
		ioSwRets = ret.ResultValues
	}()

	wg.Wait()

	if anyPinErr != nil {
		return nil, anyPinErr
	}

	if ioSwErr != nil {
		return nil, ioSwErr
	}

	return ioSwRets, nil
}

func (t *CleanPinned) populateECObjectEntry(entry *coormq.ChangeObjectRedundancyEntry, obj stgmod.ObjectDetail, ioRets map[string]any) {
for i := range entry.Blocks {
if entry.Blocks[i].FileHash != "" {


Loading…
Cancel
Save