package publock

import (
	"fmt"
	"io"
	"sync"
	"time"

	"gitlink.org.cn/cloudream/common/pkgs/async"
	"gitlink.org.cn/cloudream/common/pkgs/logger"
	"gitlink.org.cn/cloudream/common/pkgs/trie"
	"gitlink.org.cn/cloudream/common/pkgs/types"
	"gitlink.org.cn/cloudream/common/utils/lo2"
	"gitlink.org.cn/cloudream/common/utils/serder"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/lockprovider"
	pubtypes "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
	"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
)

// Core is the lock state machine. It keeps the lock requests that are still
// waiting (acquirings) and the ones that have been granted (acquireds), and
// reports outcomes through an event channel.
type Core struct {
	cfg Config
	// clster may be nil; Start then applies timeout/expiry commands locally
	// instead of submitting them to the cluster.
	clster       *cluster.Cluster
	fsm          *ClusterFSM
	lock         *sync.Mutex
	provdersTrie *trie.Trie[pubtypes.LockProvider]

	// Must be a slice because ordering has to be preserved: the execution
	// result on every node of the cluster should be strictly identical.
	acquirings []*acquiring
	acquireds  map[pubtypes.RequestID]*acquired

	eventCh *async.UnboundChannel[CoreEvent]
	doneCh  chan any
}

// NewCore builds a Core with the user-space and package lock providers
// registered under their path prefixes. clster may be nil.
func NewCore(cfg Config, clster *cluster.Cluster) *Core {
	c := &Core{
		cfg:          cfg,
		clster:       clster,
		lock:         &sync.Mutex{},
		provdersTrie: trie.NewTrie[pubtypes.LockProvider](),
		acquireds:    make(map[pubtypes.RequestID]*acquired),
		eventCh:      async.NewUnboundChannel[CoreEvent](),
		doneCh:       make(chan any, 1),
	}
	c.fsm = &ClusterFSM{
		core: c,
	}

	c.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock()
	c.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock()
	return c
}

// Apply dispatches a command to its handler. Command types that are not
// listed here are ignored.
func (c *Core) Apply(cmd Command) {
	switch cmd := cmd.(type) {
	case *Acquire:
		c.acquire(*cmd)
	case *AcquireTimeout:
		c.acquireTimeout(*cmd)
	case *Release:
		c.release(*cmd)
	case *LeaseExpired:
		c.leaseExpired(*cmd)
	case *Renew:
		c.renew(*cmd)
	}
}

// EventChan returns the channel on which AcquireResult and Released events
// are published.
func (c *Core) EventChan() *async.UnboundChannel[CoreEvent] {
	return c.eventCh
}

// FSM returns the cluster state-machine adapter bound to this Core.
func (c *Core) FSM() cluster.FSM {
	return c.fsm
}

// Start launches the background goroutine that, once per second, collects
// pending requests whose timeout has elapsed and granted locks whose lease
// counter has reached cfg.LeaseExpiredSeconds, and then issues the matching
// AcquireTimeout / LeaseExpired commands. The goroutine exits after Stop.
func (c *Core) Start() {
	log := logger.WithField("Mod", "Publock.Core")

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

	loop:
		for {
			select {
			case <-c.doneCh:
				break loop

			case <-ticker.C:
				var acTimeout []pubtypes.RequestID
				var leExpired []pubtypes.RequestID

				c.lock.Lock()

				// Periodically collect lock requests that have timed out.
				for _, req := range c.acquirings {
					if time.Since(req.StartTime) < req.Cmd.Timeout {
						continue
					}
					acTimeout = append(acTimeout, req.Cmd.ID)
				}

				// Periodically collect locks whose lease has expired. A lock
				// that is not yet expired has its counter advanced instead.
				for reqID, ac := range c.acquireds {
					if ac.ExpireCounter < c.cfg.LeaseExpiredSeconds {
						ac.ExpireCounter++
						continue
					}
					leExpired = append(leExpired, reqID)
				}

				c.lock.Unlock()

				// The commands are issued outside the lock because the local
				// handlers take the lock themselves.
				for _, reqID := range acTimeout {
					cmd := AcquireTimeout{
						ID: reqID,
					}

					if c.clster == nil {
						c.acquireTimeout(cmd)
						continue
					}

					data, err := serder.ObjectToJSONEx(cmd)
					if err != nil {
						log.Warnf("cmd %T to json: %v", cmd, err)
						continue
					}

					// The result is ignored: the next tick retries anyway.
					c.clster.Apply(c.fsm.ID(), data, time.Second*3)
				}

				for _, reqID := range leExpired {
					cmd := LeaseExpired{
						ID: reqID,
					}

					if c.clster == nil {
						c.leaseExpired(cmd)
						continue
					}

					data, err := serder.ObjectToJSONEx(cmd)
					if err != nil {
						log.Warnf("cmd %T to json: %v", cmd, err)
						continue
					}

					// The result is ignored: the next tick retries anyway.
					c.clster.Apply(c.fsm.ID(), data, time.Second*3)
				}
			}
		}
	}()
}

// Stop signals the background goroutine to exit (without blocking if a stop
// signal is already pending) and closes the event channel.
func (c *Core) Stop() {
	select {
	case c.doneCh <- nil:
	default:
	}
	c.eventCh.Close()
}

// acquiring is a lock request that could not be granted immediately and is
// waiting in the queue.
type acquiring struct {
	Cmd     Acquire
	LastErr *ecode.CodeError
	// This value is taken from each node's own clock, so it may differ between
	// nodes. It is only used to decide whether acquiring the lock has timed
	// out, so that is not a big problem.
	StartTime time.Time
}

// acquired is a lock request that has been granted.
type acquired struct {
	ID  pubtypes.RequestID
	Req pubtypes.LockRequest
	// Counts how many expiry checks the lock has gone through; once it reaches
	// a configured number the lock is considered expired. Releasing a lock
	// without notifying the service that took it is dangerous, which is why
	// expiry is implemented with a counter.
	ExpireCounter int
}

// acquire tries to grant the request immediately. On success an AcquireResult
// with a nil error is published. Otherwise the request is queued and a timer
// is started that issues an AcquireTimeout once cmd.Timeout has elapsed.
func (c *Core) acquire(cmd Acquire) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Check right away whether the lock is available.
	cerr := c.tryAcquireOne(cmd.ID, cmd.Request)
	if cerr == nil {
		c.eventCh.Send(&AcquireResult{
			Raw:   cmd,
			Error: nil,
		})
		return
	}

	// Not available: put the request on the waiting list.
	info := &acquiring{
		Cmd:       cmd,
		LastErr:   cerr,
		StartTime: time.Now(),
	}
	c.acquirings = append(c.acquirings, info)

	go func() {
		log := logger.WithField("Mod", "Publock.Core")

		<-time.After(cmd.Timeout)

		ac := AcquireTimeout{
			ID: cmd.ID,
		}

		// Without a cluster, apply the timeout locally, as Start does.
		// Calling c.clster.Apply here would use a nil cluster.
		if c.clster == nil {
			c.acquireTimeout(ac)
			return
		}

		data, err := serder.ObjectToJSONEx(ac)
		if err != nil {
			log.Warnf("cmd %T to json: %v", ac, err)
			return
		}

		// The result is ignored: the periodic task in Start is the fallback.
		c.clster.Apply(c.fsm.ID(), data, cmd.Timeout)
	}()
}

// release unlocks a granted request, publishes a Released event and then
// retries the waiting requests. Unknown request IDs are ignored.
func (c *Core) release(cmd Release) {
	reqID := cmd.ID

	c.lock.Lock()
	defer c.lock.Unlock()

	ac, ok := c.acquireds[reqID]
	if !ok {
		return
	}

	c.releaseRequest(reqID, ac.Req)
	c.eventCh.Send(&Released{
		ID: reqID,
	})
	c.tryAcquirings()
}

// acquireTimeout removes the waiting request with the given ID, if it is
// still queued, and publishes an AcquireResult carrying its last error.
func (c *Core) acquireTimeout(cmd AcquireTimeout) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for i, req := range c.acquirings {
		if req.Cmd.ID != cmd.ID {
			continue
		}

		c.eventCh.Send(&AcquireResult{
			Raw:   req.Cmd,
			Error: req.LastErr,
		})
		c.acquirings = lo2.RemoveAt(c.acquirings, i)
		return
	}
}

// leaseExpired unlocks a granted request whose lease ran out and retries the
// waiting requests. No Released event is published on this path.
func (c *Core) leaseExpired(cmd LeaseExpired) {
	log := logger.WithField("Mod", "Publock.Core")

	c.lock.Lock()
	defer c.lock.Unlock()

	ac, ok := c.acquireds[cmd.ID]
	if !ok {
		return
	}

	log.Debugf("lock request %v lease expired", ac.ID)

	c.releaseRequest(ac.ID, ac.Req)
	c.tryAcquirings()
}

// renew resets the expiry counter of every listed request that is currently
// granted; unknown IDs are skipped.
func (c *Core) renew(cmd Renew) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, reqID := range cmd.IDs {
		if ac, ok := c.acquireds[reqID]; ok {
			ac.ExpireCounter = 0
		}
	}
}

// tryAcquirings walks the waiting list in order and grants every request that
// can now be satisfied, publishing an AcquireResult for each. Requests that
// still fail keep their place and record the latest error. The caller must
// hold c.lock.
func (c *Core) tryAcquirings() {
	for i := 0; i < len(c.acquirings); i++ {
		req := c.acquirings[i]

		err := c.tryAcquireOne(req.Cmd.ID, req.Cmd.Request)
		if err != nil {
			req.LastErr = err
			continue
		}

		c.eventCh.Send(&AcquireResult{
			Raw:   req.Cmd,
			Error: nil,
		})
		// Mark the slot; marked slots are dropped in one pass below.
		c.acquirings[i] = nil
	}

	c.acquirings = lo2.RemoveAllDefault(c.acquirings)
}

// tryAcquireOne grants the request if all of its locks can be taken, recording
// it in acquireds with a zero expiry counter. It returns the reason when the
// request cannot be granted. The caller must hold c.lock.
func (c *Core) tryAcquireOne(reqID pubtypes.RequestID, req pubtypes.LockRequest) *ecode.CodeError {
	if cerr := c.testOneRequest(req); cerr != nil {
		return cerr
	}

	c.applyRequest(reqID, req)
	c.acquireds[reqID] = &acquired{
		ID:            reqID,
		Req:           req,
		ExpireCounter: 0,
	}
	return nil
}

// testOneRequest checks every lock of the request against its provider
// without taking anything. It returns DataNotFound when a path has no
// provider and LockConflict when a provider refuses the lock.
func (c *Core) testOneRequest(req pubtypes.LockRequest) *ecode.CodeError {
	for _, lock := range req.Locks {
		n, ok := c.provdersTrie.WalkEnd(lock.Path)
		if !ok || n.Value == nil {
			return ecode.Newf(ecode.DataNotFound, "lock provider not found for path %v", lock.Path)
		}

		if err := n.Value.CanLock(lock); err != nil {
			return ecode.Newf(ecode.LockConflict, "%v", err)
		}
	}

	return nil
}

// applyRequest takes every lock of the request on its provider. The provider
// lookup result is not checked here, so callers are expected to have
// validated the paths (as testOneRequest does).
func (c *Core) applyRequest(reqID pubtypes.RequestID, req pubtypes.LockRequest) {
	for _, lock := range req.Locks {
		p, _ := c.provdersTrie.WalkEnd(lock.Path)
		p.Value.Lock(reqID, lock)
	}
}

// releaseRequest unlocks every lock of the request on its provider and removes
// the request from acquireds.
func (c *Core) releaseRequest(reqID pubtypes.RequestID, req pubtypes.LockRequest) {
	for _, lock := range req.Locks {
		p, _ := c.provdersTrie.WalkEnd(lock.Path)
		p.Value.Unlock(reqID, lock)
	}

	delete(c.acquireds, reqID)
}

// LockedRequest pairs a lock request with its request ID.
type LockedRequest struct {
	Req   pubtypes.LockRequest
	ReqID pubtypes.RequestID
}

// Command is the union of operations that can be applied to a Core.
type Command interface {
	IsCommand() bool
}

// Registers the Command union with serder as an externally tagged type union.
var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[Command](
	(*Acquire)(nil),
	(*Release)(nil),
	(*AcquireTimeout)(nil),
	(*LeaseExpired)(nil),
	(*Renew)(nil),
)))

// Acquire asks for the locks in Request, waiting up to Timeout.
type Acquire struct {
	ID      pubtypes.RequestID
	Request pubtypes.LockRequest
	Timeout time.Duration
	Reason  string
}

// IsCommand marks Acquire as a Command.
func (a *Acquire) IsCommand() bool {
	return true
}

// Release gives back the locks held by request ID.
type Release struct {
	ID pubtypes.RequestID
}

// IsCommand marks Release as a Command.
func (r *Release) IsCommand() bool {
	return true
}

// AcquireTimeout gives up on the waiting request ID.
type AcquireTimeout struct {
	ID pubtypes.RequestID
}

// IsCommand marks AcquireTimeout as a Command.
func (a *AcquireTimeout) IsCommand() bool {
	return true
}

// LeaseExpired force-releases the locks held by request ID.
type LeaseExpired struct {
	ID pubtypes.RequestID
}

// IsCommand marks LeaseExpired as a Command.
func (l *LeaseExpired) IsCommand() bool {
	return true
}

// Renew resets the lease of the listed requests.
type Renew struct {
	IDs []pubtypes.RequestID
}

// IsCommand marks Renew as a Command.
func (r *Renew) IsCommand() bool {
	return true
}

// CoreEvent is the union of events published on Core.EventChan.
type CoreEvent interface {
	IsCoreEvent() bool
}

// AcquireResult reports the outcome of an Acquire; Error is nil on success.
type AcquireResult struct {
	Raw   Acquire
	Error *ecode.CodeError
}

// IsCoreEvent marks AcquireResult as a CoreEvent.
func (a *AcquireResult) IsCoreEvent() bool {
	return true
}

// Released reports that request ID was released through a Release command.
type Released struct {
	ID pubtypes.RequestID
}

// IsCoreEvent marks Released as a CoreEvent.
func (r *Released) IsCoreEvent() bool {
	return true
}

// ClusterFSM adapts a Core to the cluster.FSM interface.
type ClusterFSM struct {
	core *Core
}

// ID returns the fixed identifier of this state machine.
func (f *ClusterFSM) ID() string {
	return "Publock"
}

// Apply decodes a serialized Command and applies it to the Core.
//
// NOTE(review): the timeout and lease commands produced in this file are
// serialized from their concrete struct types, while this method decodes
// them as the Command union. Confirm that serder emits the union tag in
// that case.
func (f *ClusterFSM) Apply(cmdData []byte) ([]byte, error) {
	cmd, err := serder.JSONToObjectEx[Command](cmdData)
	if err != nil {
		return nil, fmt.Errorf("parse command: %v", err)
	}

	f.core.Apply(cmd)
	return nil, nil
}

// Snapshot copies the granted and waiting requests, under the Core lock, into
// an FSMSnapshot.
func (f *ClusterFSM) Snapshot() (cluster.FSMSnapshot, error) {
	log := logger.WithField("Mod", "Publock.ClusterFSM")
	log.Debugf("make snapshot")

	f.core.lock.Lock()
	defer f.core.lock.Unlock()

	acquireds := make([]*acquired, 0, len(f.core.acquireds))
	for _, ac := range f.core.acquireds {
		acquireds = append(acquireds, &acquired{
			ID:            ac.ID,
			Req:           ac.Req,
			ExpireCounter: ac.ExpireCounter,
		})
	}

	acquirings := make([]*acquiring, 0, len(f.core.acquirings))
	for _, ac := range f.core.acquirings {
		acquirings = append(acquirings, &acquiring{
			Cmd:       ac.Cmd,
			LastErr:   ac.LastErr,
			StartTime: ac.StartTime,
		})
	}

	return &FSMSnapshot{
		Acquireds:  acquireds,
		Acquirings: acquirings,
	}, nil
}

// Restore replaces the Core state with the snapshot read from input: all
// providers are cleared, every granted request is re-locked and recorded in
// acquireds, and the waiting list is replaced. A timeout timer is restarted
// for each waiting request that has not already timed out.
func (f *ClusterFSM) Restore(input io.Reader) error {
	log := logger.WithField("Mod", "Publock.ClusterFSM")
	log.Debugf("restore from input")

	snap := &FSMSnapshot{}
	if err := serder.JSONToObjectStream(input, snap); err != nil {
		return err
	}

	f.core.lock.Lock()
	defer f.core.lock.Unlock()

	f.core.provdersTrie.Walk(nil, func(word string, wordIndex int, node *trie.Node[pubtypes.LockProvider], isWordNode bool) {
		if node.Value != nil {
			node.Value.Clear()
		}
	})

	f.core.acquireds = make(map[pubtypes.RequestID]*acquired, len(snap.Acquireds))
	for _, a := range snap.Acquireds {
		f.core.applyRequest(a.ID, a.Req)
		// The entry must be recorded as well: release, renew and lease expiry
		// all look the request up in acquireds, so a lock that is only applied
		// to the provider could never be released again.
		f.core.acquireds[a.ID] = a
	}

	f.core.acquirings = snap.Acquirings

	for _, req := range f.core.acquirings {
		// Requests that have already timed out get no precise timer; the
		// periodic task in Start handles them.
		if time.Since(req.StartTime) > req.Cmd.Timeout {
			continue
		}

		// Copy what the goroutine needs into per-iteration variables so it
		// does not depend on the loop variable, which is shared between
		// iterations before Go 1.22.
		id := req.Cmd.ID
		timeout := req.Cmd.Timeout
		startTime := req.StartTime

		go func() {
			<-time.After(timeout - time.Since(startTime))

			cmd := AcquireTimeout{
				ID: id,
			}

			// Without a cluster, apply the timeout locally, as Start does.
			if f.core.clster == nil {
				f.core.acquireTimeout(cmd)
				return
			}

			data, err := serder.ObjectToJSONEx(cmd)
			if err != nil {
				log.Warnf("cmd %T to json: %v", cmd, err)
				return
			}

			// The result is ignored: the periodic task in Start is the fallback.
			f.core.clster.Apply(f.core.fsm.ID(), data, timeout)
		}()
	}

	return nil
}

// FSMSnapshot is the serializable state of a Core.
type FSMSnapshot struct {
	Acquireds  []*acquired
	Acquirings []*acquiring
}

// Persist streams the snapshot to output as JSON.
func (s *FSMSnapshot) Persist(output io.Writer) error {
	rc := serder.ObjectToJSONStream(s)
	defer rc.Close()

	_, err := io.Copy(output, rc)
	return err
}

// Release is a no-op; the snapshot holds no resources to free.
func (s *FSMSnapshot) Release() {}