From fd6b8c3c1fdcaa724f20e10e4df20f11fce01955 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 8 Apr 2025 11:08:53 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/accessstat/access_stat.go | 76 +++++++++++++++++++++++ client/internal/accessstat/config.go | 7 +++ 2 files changed, 83 insertions(+) create mode 100644 client/internal/accessstat/access_stat.go create mode 100644 client/internal/accessstat/config.go diff --git a/client/internal/accessstat/access_stat.go b/client/internal/accessstat/access_stat.go new file mode 100644 index 0000000..b7d1982 --- /dev/null +++ b/client/internal/accessstat/access_stat.go @@ -0,0 +1,76 @@ +package accessstat + +import ( + "fmt" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sync2" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" + coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" +) + +type AccessStatEvent interface{} + +type AccessStat struct { + cfg Config + stats []coormq.AddAccessStatEntry + lock sync.Mutex +} + +func NewAccessStat(cfg Config) *AccessStat { + return &AccessStat{ + cfg: cfg, + } +} + +func (p *AccessStat) AddAccessCounter(objID cdssdk.ObjectID, pkgID cdssdk.PackageID, stgID cdssdk.StorageID, value float64) { + p.lock.Lock() + defer p.lock.Unlock() + + p.stats = append(p.stats, coormq.AddAccessStatEntry{ + ObjectID: objID, + PackageID: pkgID, + StorageID: stgID, + Counter: value, + }) +} + +func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] { + ch := sync2.NewUnboundChannel[AccessStatEvent]() + + go func() { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + ch.Send(fmt.Errorf("new coordinator client: %w", err)) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + ticker := time.NewTicker(p.cfg.ReportInterval) + for { + <-ticker.C + + p.lock.Lock() + st := p.stats + p.stats = nil + p.lock.Unlock() + + if len(st) == 0 { + continue + } + + err := coorCli.AddAccessStat(coormq.ReqAddAccessStat(st)) + if err != nil { + logger.Errorf("add all package access stat counter: %v", err) + + p.lock.Lock() + p.stats = append(p.stats, st...) + p.lock.Unlock() + continue + } + } + }() + return ch +} diff --git a/client/internal/accessstat/config.go b/client/internal/accessstat/config.go new file mode 100644 index 0000000..b67e8e6 --- /dev/null +++ b/client/internal/accessstat/config.go @@ -0,0 +1,7 @@ +package accessstat + +import "time" + +type Config struct { + ReportInterval time.Duration +}