Browse Source

迁移代码

gitlink
Sydonian 8 months ago
parent
commit
a1a94dde15
5 changed files with 392 additions and 0 deletions
  1. +96
    -0
      client/internal/metacache/connectivity.go
  2. +27
    -0
      client/internal/metacache/host.go
  3. +75
    -0
      client/internal/metacache/hubmeta.go
  4. +121
    -0
      client/internal/metacache/simple.go
  5. +73
    -0
      client/internal/metacache/storagemeta.go

+ 96
- 0
client/internal/metacache/connectivity.go View File

@@ -0,0 +1,96 @@
package metacache

import (
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

func (m *MetaCacheHost) AddConnectivity() *Connectivity {
cache := &Connectivity{
entries: make(map[cdssdk.HubID]*ConnectivityEntry),
}

m.caches = append(m.caches, cache)
return cache
}

type Connectivity struct {
lock sync.RWMutex
entries map[cdssdk.HubID]*ConnectivityEntry
}

func (c *Connectivity) Get(from cdssdk.HubID, to cdssdk.HubID) *time.Duration {
for i := 0; i < 2; i++ {
c.lock.RLock()
entry, ok := c.entries[from]
if ok {
con, ok := entry.To[to]
if ok {
c.lock.RUnlock()

if con.Latency == nil {
return nil
}
l := time.Millisecond * time.Duration(*con.Latency)
return &l
}
}
c.lock.RUnlock()

c.load(from)
}

return nil
}

func (c *Connectivity) ClearOutdated() {
c.lock.Lock()
defer c.lock.Unlock()

for hubID, entry := range c.entries {
if time.Since(entry.UpdateTime) > time.Minute*5 {
delete(c.entries, hubID)
}
}
}

func (c *Connectivity) load(hubID cdssdk.HubID) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %v", err)
return
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

get, err := coorCli.GetHubConnectivities(coormq.ReqGetHubConnectivities([]cdssdk.HubID{hubID}))
if err != nil {
logger.Warnf("get hub connectivities: %v", err)
return
}

c.lock.Lock()
defer c.lock.Unlock()

ce := &ConnectivityEntry{
From: hubID,
To: make(map[cdssdk.HubID]cdssdk.HubConnectivity),
UpdateTime: time.Now(),
}

for _, conn := range get.Connectivities {
ce.To[conn.ToHubID] = conn
}

c.entries[hubID] = ce
}

type ConnectivityEntry struct {
From cdssdk.HubID
To map[cdssdk.HubID]cdssdk.HubConnectivity
UpdateTime time.Time
}

+ 27
- 0
client/internal/metacache/host.go View File

@@ -0,0 +1,27 @@
package metacache

import "time"

type MetaCache interface {
ClearOutdated()
}

type MetaCacheHost struct {
caches []MetaCache
}

func NewHost() *MetaCacheHost {
return &MetaCacheHost{}
}

func (m *MetaCacheHost) Serve() {
ticker := time.NewTicker(time.Minute)
for {
select {
case <-ticker.C:
for _, cache := range m.caches {
cache.ClearOutdated()
}
}
}
}

+ 75
- 0
client/internal/metacache/hubmeta.go View File

@@ -0,0 +1,75 @@
package metacache

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

func (m *MetaCacheHost) AddHubMeta() *HubMeta {
meta := &HubMeta{}
meta.cache = NewSimpleMetaCache(SimpleMetaCacheConfig[cdssdk.HubID, cdssdk.Hub]{
Getter: meta.load,
Expire: time.Minute * 5,
})

m.caches = append(m.caches, meta)
return meta
}

type HubMeta struct {
cache *SimpleMetaCache[cdssdk.HubID, cdssdk.Hub]
}

func (h *HubMeta) Get(hubID cdssdk.HubID) *cdssdk.Hub {
v, ok := h.cache.Get(hubID)
if ok {
return &v
}
return nil
}

func (h *HubMeta) GetMany(hubIDs []cdssdk.HubID) []*cdssdk.Hub {
vs, oks := h.cache.GetMany(hubIDs)
ret := make([]*cdssdk.Hub, len(vs))
for i := range vs {
if oks[i] {
ret[i] = &vs[i]
}
}
return ret
}

func (h *HubMeta) ClearOutdated() {
h.cache.ClearOutdated()
}

func (h *HubMeta) load(keys []cdssdk.HubID) ([]cdssdk.Hub, []bool) {
vs := make([]cdssdk.Hub, len(keys))
oks := make([]bool, len(keys))

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %v", err)
return vs, oks
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

get, err := coorCli.GetHubs(coormq.NewGetHubs(keys))
if err != nil {
logger.Warnf("get hubs: %v", err)
return vs, oks
}

for i := range keys {
if get.Hubs[i] != nil {
vs[i] = *get.Hubs[i]
oks[i] = true
}
}

return vs, oks
}

+ 121
- 0
client/internal/metacache/simple.go View File

@@ -0,0 +1,121 @@
package metacache

import (
"sync"
"time"
)

type SimpleMetaCacheConfig[K comparable, V any] struct {
Getter Getter[K, V]
Expire time.Duration
}

type Getter[K comparable, V any] func(keys []K) ([]V, []bool)

type SimpleMetaCache[K comparable, V any] struct {
lock sync.RWMutex
cache map[K]*CacheEntry[K, V]
cfg SimpleMetaCacheConfig[K, V]
}

func NewSimpleMetaCache[K comparable, V any](cfg SimpleMetaCacheConfig[K, V]) *SimpleMetaCache[K, V] {
return &SimpleMetaCache[K, V]{
cache: make(map[K]*CacheEntry[K, V]),
cfg: cfg,
}
}

func (mc *SimpleMetaCache[K, V]) Get(key K) (V, bool) {
var ret V
var ok bool

for i := 0; i < 2; i++ {
mc.lock.RLock()
entry, o := mc.cache[key]
if o {
ret = entry.Data
ok = true
}
mc.lock.RUnlock()

if o {
break
}

mc.load([]K{key})
}

return ret, ok
}

func (mc *SimpleMetaCache[K, V]) GetMany(keys []K) ([]V, []bool) {
result := make([]V, len(keys))
oks := make([]bool, len(keys))

for i := 0; i < 2; i++ {
allGet := true
mc.lock.RLock()
for i, key := range keys {
entry, ok := mc.cache[key]
if ok {
result[i] = entry.Data
oks[i] = true
} else {
allGet = false
}
}
mc.lock.RUnlock()

if allGet {
break
}

mc.load(keys)
}

return result, oks
}

func (mc *SimpleMetaCache[K, V]) load(keys []K) {
vs, getOks := mc.cfg.Getter(keys)

mc.lock.Lock()
defer mc.lock.Unlock()

for i, key := range keys {
if !getOks[i] {
continue
}

_, ok := mc.cache[key]
// 缓存中已有key则认为缓存中是最新的,不再更新
if ok {
continue
}

entry := &CacheEntry[K, V]{
Key: key,
Data: vs[i],
UpdateTime: time.Now(),
}
mc.cache[key] = entry
}
}

func (mc *SimpleMetaCache[K, V]) ClearOutdated() {
mc.lock.Lock()
defer mc.lock.Unlock()

for key, entry := range mc.cache {
dt := time.Since(entry.UpdateTime)
if dt > mc.cfg.Expire || dt < 0 {
delete(mc.cache, key)
}
}
}

type CacheEntry[K comparable, V any] struct {
Key K
Data V
UpdateTime time.Time
}

+ 73
- 0
client/internal/metacache/storagemeta.go View File

@@ -0,0 +1,73 @@
package metacache

import (
"time"

"gitlink.org.cn/cloudream/storage2/client/types"
)

func (m *MetaCacheHost) AddStorageMeta() *UserSpaceMeta {
meta := &UserSpaceMeta{}
meta.cache = NewSimpleMetaCache(SimpleMetaCacheConfig[types.UserSpaceID, types.UserSpaceDetail]{
Getter: meta.load,
Expire: time.Minute * 5,
})

m.caches = append(m.caches, meta)
return meta
}

type UserSpaceMeta struct {
cache *SimpleMetaCache[types.UserSpaceID, types.UserSpaceDetail]
}

func (s *UserSpaceMeta) Get(spaceID types.UserSpaceID) *types.UserSpaceDetail {
v, ok := s.cache.Get(spaceID)
if ok {
return &v
}
return nil
}

func (s *UserSpaceMeta) GetMany(spaceIDs []types.UserSpaceID) []*types.UserSpaceDetail {
vs, oks := s.cache.GetMany(spaceIDs)
ret := make([]*types.UserSpaceDetail, len(vs))
for i := range vs {
if oks[i] {
ret[i] = &vs[i]
}
}
return ret
}

func (s *UserSpaceMeta) ClearOutdated() {
s.cache.ClearOutdated()
}

func (s *UserSpaceMeta) load(keys []types.UserSpaceID) ([]types.UserSpaceDetail, []bool) {
// vs := make([]stgmod.StorageDetail, len(keys))
// oks := make([]bool, len(keys))

// coorCli, err := stgglb.CoordinatorMQPool.Acquire()
// if err != nil {
// logger.Warnf("new coordinator client: %v", err)
// return vs, oks
// }
// defer stgglb.CoordinatorMQPool.Release(coorCli)

// get, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(keys))
// if err != nil {
// logger.Warnf("get storage details: %v", err)
// return vs, oks
// }

// for i := range keys {
// if get.Storages[i] != nil {
// vs[i] = *get.Storages[i]
// oks[i] = true
// }
// }

// return vs, oks

}

Loading…
Cancel
Save