| @@ -25,6 +25,7 @@ func InitIssueIndexer() { | |||
| // populateIssueIndexer populate the issue indexer with issue data | |||
| func populateIssueIndexer() error { | |||
| batch := indexer.IssueIndexerBatch() | |||
| for page := 1; ; page++ { | |||
| repos, _, err := Repositories(&SearchRepoOptions{ | |||
| Page: page, | |||
| @@ -34,7 +35,7 @@ func populateIssueIndexer() error { | |||
| return fmt.Errorf("Repositories: %v", err) | |||
| } | |||
| if len(repos) == 0 { | |||
| return nil | |||
| return batch.Flush() | |||
| } | |||
| for _, repo := range repos { | |||
| issues, err := Issues(&IssuesOptions{ | |||
| @@ -42,29 +43,37 @@ func populateIssueIndexer() error { | |||
| IsClosed: util.OptionalBoolNone, | |||
| IsPull: util.OptionalBoolNone, | |||
| }) | |||
| updates := make([]indexer.IssueIndexerUpdate, len(issues)) | |||
| for i, issue := range issues { | |||
| updates[i] = issue.update() | |||
| if err != nil { | |||
| return err | |||
| } | |||
| if err = indexer.BatchUpdateIssues(updates...); err != nil { | |||
| return fmt.Errorf("BatchUpdate: %v", err) | |||
| for _, issue := range issues { | |||
| if err := batch.Add(issue.update()); err != nil { | |||
| return err | |||
| } | |||
| } | |||
| } | |||
| } | |||
| } | |||
| func processIssueIndexerUpdateQueue() { | |||
| batch := indexer.IssueIndexerBatch() | |||
| for { | |||
| var issueID int64 | |||
| select { | |||
| case issueID := <-issueIndexerUpdateQueue: | |||
| issue, err := GetIssueByID(issueID) | |||
| if err != nil { | |||
| log.Error(4, "issuesIndexer.Index: %v", err) | |||
| continue | |||
| } | |||
| if err = indexer.UpdateIssue(issue.update()); err != nil { | |||
| log.Error(4, "issuesIndexer.Index: %v", err) | |||
| case issueID = <-issueIndexerUpdateQueue: | |||
| default: | |||
| // flush whatever updates we currently have, since we | |||
| // might have to wait a while | |||
| if err := batch.Flush(); err != nil { | |||
| log.Error(4, "IssueIndexer: %v", err) | |||
| } | |||
| issueID = <-issueIndexerUpdateQueue | |||
| } | |||
| issue, err := GetIssueByID(issueID) | |||
| if err != nil { | |||
| log.Error(4, "GetIssueByID: %v", err) | |||
| } else if err = batch.Add(issue.update()); err != nil { | |||
| log.Error(4, "IssueIndexer: %v", err) | |||
| } | |||
| } | |||
| } | |||
| @@ -9,6 +9,8 @@ import ( | |||
| "strconv" | |||
| "github.com/blevesearch/bleve" | |||
| "github.com/blevesearch/bleve/analysis/token/unicodenorm" | |||
| "github.com/blevesearch/bleve/mapping" | |||
| "github.com/blevesearch/bleve/search/query" | |||
| ) | |||
| @@ -41,3 +43,50 @@ func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhrase | |||
| q.Analyzer = analyzer | |||
| return q | |||
| } | |||
| const unicodeNormalizeName = "unicodeNormalize" | |||
| func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { | |||
| return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ | |||
| "type": unicodenorm.Name, | |||
| "form": unicodenorm.NFC, | |||
| }) | |||
| } | |||
| // Update represents an update to an indexer | |||
| type Update interface { | |||
| addToBatch(batch *bleve.Batch) error | |||
| } | |||
| const maxBatchSize = 16 | |||
| // Batch batch of indexer updates that automatically flushes once it | |||
| // reaches a certain size | |||
| type Batch struct { | |||
| batch *bleve.Batch | |||
| index bleve.Index | |||
| } | |||
| // Add add update to batch, possibly flushing | |||
| func (batch *Batch) Add(update Update) error { | |||
| if err := update.addToBatch(batch.batch); err != nil { | |||
| return err | |||
| } | |||
| return batch.flushIfFull() | |||
| } | |||
| func (batch *Batch) flushIfFull() error { | |||
| if batch.batch.Size() >= maxBatchSize { | |||
| return batch.Flush() | |||
| } | |||
| return nil | |||
| } | |||
| // Flush manually flush the batch, regardless of its size | |||
| func (batch *Batch) Flush() error { | |||
| if err := batch.index.Batch(batch.batch); err != nil { | |||
| return err | |||
| } | |||
| batch.batch.Reset() | |||
| return nil | |||
| } | |||
| @@ -13,7 +13,6 @@ import ( | |||
| "github.com/blevesearch/bleve" | |||
| "github.com/blevesearch/bleve/analysis/analyzer/custom" | |||
| "github.com/blevesearch/bleve/analysis/token/lowercase" | |||
| "github.com/blevesearch/bleve/analysis/token/unicodenorm" | |||
| "github.com/blevesearch/bleve/analysis/tokenizer/unicode" | |||
| "github.com/blevesearch/bleve/index/upsidedown" | |||
| ) | |||
| @@ -35,6 +34,10 @@ type IssueIndexerUpdate struct { | |||
| Data *IssueIndexerData | |||
| } | |||
| func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error { | |||
| return batch.Index(indexerID(update.IssueID), update.Data) | |||
| } | |||
| const issueIndexerAnalyzer = "issueIndexer" | |||
| // InitIssueIndexer initialize issue indexer | |||
| @@ -74,17 +77,13 @@ func createIssueIndexer() error { | |||
| docMapping.AddFieldMappingsAt("Content", textFieldMapping) | |||
| docMapping.AddFieldMappingsAt("Comments", textFieldMapping) | |||
| const unicodeNormNFC = "unicodeNormNFC" | |||
| if err := mapping.AddCustomTokenFilter(unicodeNormNFC, map[string]interface{}{ | |||
| "type": unicodenorm.Name, | |||
| "form": unicodenorm.NFC, | |||
| }); err != nil { | |||
| if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { | |||
| return err | |||
| } else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{ | |||
| "type": custom.Name, | |||
| "char_filters": []string{}, | |||
| "tokenizer": unicode.Name, | |||
| "token_filters": []string{unicodeNormNFC, lowercase.Name}, | |||
| "token_filters": []string{unicodeNormalizeName, lowercase.Name}, | |||
| }); err != nil { | |||
| return err | |||
| } | |||
| @@ -97,21 +96,12 @@ func createIssueIndexer() error { | |||
| return err | |||
| } | |||
| // UpdateIssue update the issue indexer | |||
| func UpdateIssue(update IssueIndexerUpdate) error { | |||
| return issueIndexer.Index(indexerID(update.IssueID), update.Data) | |||
| } | |||
| // BatchUpdateIssues perform a batch update of the issue indexer | |||
| func BatchUpdateIssues(updates ...IssueIndexerUpdate) error { | |||
| batch := issueIndexer.NewBatch() | |||
| for _, update := range updates { | |||
| err := batch.Index(indexerID(update.IssueID), update.Data) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| // IssueIndexerBatch batch to add updates to | |||
| func IssueIndexerBatch() *Batch { | |||
| return &Batch{ | |||
| batch: issueIndexer.NewBatch(), | |||
| index: issueIndexer, | |||
| } | |||
| return issueIndexer.Batch(batch) | |||
| } | |||
| // SearchIssuesByKeyword searches for issues by given conditions. | |||