You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

clientManager.go 1.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package socketwrap
  2. import (
  3. "os"
  4. "os/signal"
  5. "syscall"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/log"
  8. "github.com/elliotchance/orderedmap"
  9. )
  10. type ClientsManager struct {
  11. Clients *orderedmap.OrderedMap
  12. Register chan *Client
  13. Unregister chan *Client
  14. }
  15. func NewClientsManager() *ClientsManager {
  16. return &ClientsManager{
  17. Register: make(chan *Client),
  18. Unregister: make(chan *Client),
  19. Clients: orderedmap.NewOrderedMap(),
  20. }
  21. }
  22. const MaxClients = 100
  23. var LastActionsQueue = NewSyncQueue(20)
  24. func (h *ClientsManager) Run() {
  25. initActionQueue()
  26. sig := make(chan os.Signal, 1)
  27. signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
  28. var signalsReceived uint
  29. for {
  30. select {
  31. case client := <-h.Register:
  32. h.Clients.Set(client, true)
  33. if h.Clients.Len() > MaxClients {
  34. h.Clients.Delete(h.Clients.Front().Key)
  35. }
  36. case client := <-h.Unregister:
  37. if _, ok := h.Clients.Get(client); ok {
  38. h.Clients.Delete(client)
  39. close(client.Send)
  40. }
  41. case message := <-models.ActionChan:
  42. LastActionsQueue.Push(message)
  43. for _, client := range h.Clients.Keys() {
  44. select {
  45. case client.(*Client).Send <- message:
  46. default:
  47. close(client.(*Client).Send)
  48. h.Clients.Delete(client)
  49. }
  50. }
  51. case s := <-sig:
  52. log.Info("received signal", s)
  53. signalsReceived++
  54. if signalsReceived < 2 {
  55. for _, client := range h.Clients.Keys() {
  56. h.Clients.Delete(client)
  57. client.(*Client).Close()
  58. }
  59. break
  60. }
  61. }
  62. }
  63. }
  64. func initActionQueue() {
  65. actions, err := models.GetLast20PublicFeeds()
  66. if err == nil {
  67. for i := len(actions) - 1; i >= 0; i-- {
  68. LastActionsQueue.Push(actions[i])
  69. }
  70. }
  71. }