From a1a94dde1577f36522b104ba7c8b9d224946bfc6 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 3 Apr 2025 15:21:13 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=81=E7=A7=BB=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/metacache/connectivity.go | 96 +++++++++++++++++ client/internal/metacache/host.go | 27 +++++ client/internal/metacache/hubmeta.go | 75 ++++++++++++++ client/internal/metacache/simple.go | 121 ++++++++++++++++++++++ client/internal/metacache/storagemeta.go | 73 +++++++++++++ 5 files changed, 392 insertions(+) create mode 100644 client/internal/metacache/connectivity.go create mode 100644 client/internal/metacache/host.go create mode 100644 client/internal/metacache/hubmeta.go create mode 100644 client/internal/metacache/simple.go create mode 100644 client/internal/metacache/storagemeta.go diff --git a/client/internal/metacache/connectivity.go b/client/internal/metacache/connectivity.go new file mode 100644 index 0000000..ffdcb26 --- /dev/null +++ b/client/internal/metacache/connectivity.go @@ -0,0 +1,96 @@ +package metacache + +import ( + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" + coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" +) + +func (m *MetaCacheHost) AddConnectivity() *Connectivity { + cache := &Connectivity{ + entries: make(map[cdssdk.HubID]*ConnectivityEntry), + } + + m.caches = append(m.caches, cache) + return cache +} + +type Connectivity struct { + lock sync.RWMutex + entries map[cdssdk.HubID]*ConnectivityEntry +} + +func (c *Connectivity) Get(from cdssdk.HubID, to cdssdk.HubID) *time.Duration { + for i := 0; i < 2; i++ { + c.lock.RLock() + entry, ok := c.entries[from] + if ok { + con, ok := entry.To[to] + if ok { + c.lock.RUnlock() + + if con.Latency == nil { + return nil + } + l := time.Millisecond * time.Duration(*con.Latency) + return &l + } + } + c.lock.RUnlock() + + c.load(from) + } + + return nil +} + +func (c *Connectivity) ClearOutdated() { + c.lock.Lock() + defer c.lock.Unlock() + + for hubID, entry := range c.entries { + if time.Since(entry.UpdateTime) > time.Minute*5 { + delete(c.entries, hubID) + } + } +} + +func (c *Connectivity) load(hubID cdssdk.HubID) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %v", err) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + get, err := coorCli.GetHubConnectivities(coormq.ReqGetHubConnectivities([]cdssdk.HubID{hubID})) + if err != nil { + logger.Warnf("get hub connectivities: %v", err) + return + } + + c.lock.Lock() + defer c.lock.Unlock() + + ce := &ConnectivityEntry{ + From: hubID, + To: make(map[cdssdk.HubID]cdssdk.HubConnectivity), + UpdateTime: time.Now(), + } + + for _, conn := range get.Connectivities { + ce.To[conn.ToHubID] = conn + } + + c.entries[hubID] = ce +} + +type ConnectivityEntry struct { + From cdssdk.HubID + To map[cdssdk.HubID]cdssdk.HubConnectivity + UpdateTime time.Time +} diff --git a/client/internal/metacache/host.go b/client/internal/metacache/host.go new file mode 100644 index 0000000..542a81a --- /dev/null +++ b/client/internal/metacache/host.go @@ -0,0 +1,27 @@ +package metacache + +import "time" + +type MetaCache interface { + ClearOutdated() +} + +type MetaCacheHost struct { + caches []MetaCache +} + +func NewHost() *MetaCacheHost { + return &MetaCacheHost{} +} + +func (m *MetaCacheHost) Serve() { + ticker := time.NewTicker(time.Minute) + for { + select { + case <-ticker.C: + for _, cache := range m.caches { + cache.ClearOutdated() + } + } + } +} diff --git a/client/internal/metacache/hubmeta.go b/client/internal/metacache/hubmeta.go new file mode 100644 index 0000000..b9f94b4 --- /dev/null +++ b/client/internal/metacache/hubmeta.go @@ -0,0 +1,75 @@ +package metacache + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" + coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" +) + +func (m *MetaCacheHost) AddHubMeta() *HubMeta { + meta := &HubMeta{} + meta.cache = NewSimpleMetaCache(SimpleMetaCacheConfig[cdssdk.HubID, cdssdk.Hub]{ + Getter: meta.load, + Expire: time.Minute * 5, + }) + + m.caches = append(m.caches, meta) + return meta +} + +type HubMeta struct { + cache *SimpleMetaCache[cdssdk.HubID, cdssdk.Hub] +} + +func (h *HubMeta) Get(hubID cdssdk.HubID) *cdssdk.Hub { + v, ok := h.cache.Get(hubID) + if ok { + return &v + } + return nil +} + +func (h *HubMeta) GetMany(hubIDs []cdssdk.HubID) []*cdssdk.Hub { + vs, oks := h.cache.GetMany(hubIDs) + ret := make([]*cdssdk.Hub, len(vs)) + for i := range vs { + if oks[i] { + ret[i] = &vs[i] + } + } + return ret +} + +func (h *HubMeta) ClearOutdated() { + h.cache.ClearOutdated() +} + +func (h *HubMeta) load(keys []cdssdk.HubID) ([]cdssdk.Hub, []bool) { + vs := make([]cdssdk.Hub, len(keys)) + oks := make([]bool, len(keys)) + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %v", err) + return vs, oks + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + get, err := coorCli.GetHubs(coormq.NewGetHubs(keys)) + if err != nil { + logger.Warnf("get hubs: %v", err) + return vs, oks + } + + for i := range keys { + if get.Hubs[i] != nil { + vs[i] = *get.Hubs[i] + oks[i] = true + } + } + + return vs, oks +} diff --git a/client/internal/metacache/simple.go b/client/internal/metacache/simple.go new file mode 100644 index 0000000..ff2f780 --- /dev/null +++ b/client/internal/metacache/simple.go @@ -0,0 +1,121 @@ +package metacache + +import ( + "sync" + "time" +) + +type SimpleMetaCacheConfig[K comparable, V any] struct { + Getter Getter[K, V] + Expire time.Duration +} + +type Getter[K comparable, V any] func(keys []K) ([]V, []bool) + +type SimpleMetaCache[K comparable, V any] struct { + lock sync.RWMutex + cache map[K]*CacheEntry[K, V] + cfg SimpleMetaCacheConfig[K, V] +} + +func NewSimpleMetaCache[K comparable, V any](cfg SimpleMetaCacheConfig[K, V]) *SimpleMetaCache[K, V] { + return &SimpleMetaCache[K, V]{ + cache: make(map[K]*CacheEntry[K, V]), + cfg: cfg, + } +} + +func (mc *SimpleMetaCache[K, V]) Get(key K) (V, bool) { + var ret V + var ok bool + + for i := 0; i < 2; i++ { + mc.lock.RLock() + entry, o := mc.cache[key] + if o { + ret = entry.Data + ok = true + } + mc.lock.RUnlock() + + if o { + break + } + + mc.load([]K{key}) + } + + return ret, ok +} + +func (mc *SimpleMetaCache[K, V]) GetMany(keys []K) ([]V, []bool) { + result := make([]V, len(keys)) + oks := make([]bool, len(keys)) + + for i := 0; i < 2; i++ { + allGet := true + mc.lock.RLock() + for i, key := range keys { + entry, ok := mc.cache[key] + if ok { + result[i] = entry.Data + oks[i] = true + } else { + allGet = false + } + } + mc.lock.RUnlock() + + if allGet { + break + } + + mc.load(keys) + } + + return result, oks +} + +func (mc *SimpleMetaCache[K, V]) load(keys []K) { + vs, getOks := mc.cfg.Getter(keys) + + mc.lock.Lock() + defer mc.lock.Unlock() + + for i, key := range keys { + if !getOks[i] { + continue + } + + _, ok := mc.cache[key] + // 缓存中已有key则认为缓存中是最新的,不再更新 + if ok { + continue + } + + entry := &CacheEntry[K, V]{ + Key: key, + Data: vs[i], + UpdateTime: time.Now(), + } + mc.cache[key] = entry + } +} + +func (mc *SimpleMetaCache[K, V]) ClearOutdated() { + mc.lock.Lock() + defer mc.lock.Unlock() + + for key, entry := range mc.cache { + dt := time.Since(entry.UpdateTime) + if dt > mc.cfg.Expire || dt < 0 { + delete(mc.cache, key) + } + } +} + +type CacheEntry[K comparable, V any] struct { + Key K + Data V + UpdateTime time.Time +} diff --git a/client/internal/metacache/storagemeta.go b/client/internal/metacache/storagemeta.go new file mode 100644 index 0000000..6dd684c --- /dev/null +++ b/client/internal/metacache/storagemeta.go @@ -0,0 +1,73 @@ +package metacache + +import ( + "time" + + "gitlink.org.cn/cloudream/storage2/client/types" +) + +func (m *MetaCacheHost) AddStorageMeta() *UserSpaceMeta { + meta := &UserSpaceMeta{} + meta.cache = NewSimpleMetaCache(SimpleMetaCacheConfig[types.UserSpaceID, types.UserSpaceDetail]{ + Getter: meta.load, + Expire: time.Minute * 5, + }) + + m.caches = append(m.caches, meta) + return meta +} + +type UserSpaceMeta struct { + cache *SimpleMetaCache[types.UserSpaceID, types.UserSpaceDetail] +} + +func (s *UserSpaceMeta) Get(spaceID types.UserSpaceID) *types.UserSpaceDetail { + v, ok := s.cache.Get(spaceID) + if ok { + return &v + } + return nil +} + +func (s *UserSpaceMeta) GetMany(spaceIDs []types.UserSpaceID) []*types.UserSpaceDetail { + vs, oks := s.cache.GetMany(spaceIDs) + ret := make([]*types.UserSpaceDetail, len(vs)) + for i := range vs { + if oks[i] { + ret[i] = &vs[i] + } + } + return ret +} + +func (s *UserSpaceMeta) ClearOutdated() { + s.cache.ClearOutdated() +} + +func (s *UserSpaceMeta) load(keys []types.UserSpaceID) ([]types.UserSpaceDetail, []bool) { + // vs := make([]stgmod.StorageDetail, len(keys)) + // oks := make([]bool, len(keys)) + + // coorCli, err := stgglb.CoordinatorMQPool.Acquire() + // if err != nil { + // logger.Warnf("new coordinator client: %v", err) + // return vs, oks + // } + // defer stgglb.CoordinatorMQPool.Release(coorCli) + + // get, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(keys)) + // if err != nil { + // logger.Warnf("get storage details: %v", err) + // return vs, oks + // } + + // for i := range keys { + // if get.Storages[i] != nil { + // vs[i] = *get.Storages[i] + // oks[i] = true + // } + // } + + // return vs, oks + +}