You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

queue.go 3.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package code
  5. import (
  6. "os"
  7. "code.gitea.io/gitea/models"
  8. "code.gitea.io/gitea/modules/graceful"
  9. "code.gitea.io/gitea/modules/log"
  10. "code.gitea.io/gitea/modules/setting"
  11. )
  12. type repoIndexerOperation struct {
  13. repoID int64
  14. deleted bool
  15. watchers []chan<- error
  16. }
  17. var repoIndexerOperationQueue chan repoIndexerOperation
  18. func processRepoIndexerOperationQueue(indexer Indexer) {
  19. repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
  20. for {
  21. select {
  22. case op := <-repoIndexerOperationQueue:
  23. var err error
  24. if op.deleted {
  25. if err = indexer.Delete(op.repoID); err != nil {
  26. log.Error("indexer.Delete: %v", err)
  27. }
  28. } else {
  29. if err = indexer.Index(op.repoID); err != nil {
  30. log.Error("indexer.Index: %v", err)
  31. }
  32. }
  33. for _, watcher := range op.watchers {
  34. watcher <- err
  35. }
  36. case <-graceful.GetManager().IsShutdown():
  37. log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
  38. return
  39. }
  40. }
  41. }
  42. // DeleteRepoFromIndexer remove all of a repository's entries from the indexer
  43. func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) {
  44. addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers})
  45. }
  46. // UpdateRepoIndexer update a repository's entries in the indexer
  47. func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
  48. addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers})
  49. }
  50. func addOperationToQueue(op repoIndexerOperation) {
  51. if !setting.Indexer.RepoIndexerEnabled {
  52. return
  53. }
  54. select {
  55. case repoIndexerOperationQueue <- op:
  56. break
  57. default:
  58. go func() {
  59. repoIndexerOperationQueue <- op
  60. }()
  61. }
  62. }
  63. // populateRepoIndexer populate the repo indexer with pre-existing data. This
  64. // should only be run when the indexer is created for the first time.
  65. func populateRepoIndexer() {
  66. log.Info("Populating the repo indexer with existing repositories")
  67. isShutdown := graceful.GetManager().IsShutdown()
  68. exist, err := models.IsTableNotEmpty("repository")
  69. if err != nil {
  70. log.Fatal("System error: %v", err)
  71. } else if !exist {
  72. return
  73. }
  74. // if there is any existing repo indexer metadata in the DB, delete it
  75. // since we are starting afresh. Also, xorm requires deletes to have a
  76. // condition, and we want to delete everything, thus 1=1.
  77. if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
  78. log.Fatal("System error: %v", err)
  79. }
  80. var maxRepoID int64
  81. if maxRepoID, err = models.GetMaxID("repository"); err != nil {
  82. log.Fatal("System error: %v", err)
  83. }
  84. // start with the maximum existing repo ID and work backwards, so that we
  85. // don't include repos that are created after gitea starts; such repos will
  86. // already be added to the indexer, and we don't need to add them again.
  87. for maxRepoID > 0 {
  88. select {
  89. case <-isShutdown:
  90. log.Info("Repository Indexer population shutdown before completion")
  91. return
  92. default:
  93. }
  94. ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50)
  95. if err != nil {
  96. log.Error("populateRepoIndexer: %v", err)
  97. return
  98. } else if len(ids) == 0 {
  99. break
  100. }
  101. for _, id := range ids {
  102. select {
  103. case <-isShutdown:
  104. log.Info("Repository Indexer population shutdown before completion")
  105. return
  106. default:
  107. }
  108. repoIndexerOperationQueue <- repoIndexerOperation{
  109. repoID: id,
  110. deleted: false,
  111. }
  112. maxRepoID = id - 1
  113. }
  114. }
  115. log.Info("Done (re)populating the repo indexer with existing repositories")
  116. }