| @@ -53,7 +53,7 @@ func populateIssueIndexer() error { | |||
| return err | |||
| } | |||
| for _, issue := range issues { | |||
| if err := batch.Add(issue.update()); err != nil { | |||
| if err := issue.update().AddToFlushingBatch(batch); err != nil { | |||
| return err | |||
| } | |||
| } | |||
| @@ -78,7 +78,7 @@ func processIssueIndexerUpdateQueue() { | |||
| issue, err := GetIssueByID(issueID) | |||
| if err != nil { | |||
| log.Error(4, "GetIssueByID: %v", err) | |||
| } else if err = batch.Add(issue.update()); err != nil { | |||
| } else if err = issue.update().AddToFlushingBatch(batch); err != nil { | |||
| log.Error(4, "IssueIndexer: %v", err) | |||
| } | |||
| } | |||
| @@ -14,6 +14,8 @@ import ( | |||
| "code.gitea.io/gitea/modules/indexer" | |||
| "code.gitea.io/gitea/modules/log" | |||
| "code.gitea.io/gitea/modules/setting" | |||
| "github.com/ethantkoenig/rupture" | |||
| ) | |||
| // RepoIndexerStatus status of a repo's entry in the repo indexer | |||
| @@ -187,7 +189,7 @@ func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) { | |||
| return nonGenesisChanges(repo, revision) | |||
| } | |||
| func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error { | |||
| func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error { | |||
| stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). | |||
| RunInDir(repo.RepoPath()) | |||
| if err != nil { | |||
| @@ -206,24 +208,26 @@ func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error | |||
| } else if !base.IsTextFile(fileContents) { | |||
| return nil | |||
| } | |||
| return batch.Add(indexer.RepoIndexerUpdate{ | |||
| indexerUpdate := indexer.RepoIndexerUpdate{ | |||
| Filepath: update.Filename, | |||
| Op: indexer.RepoIndexerOpUpdate, | |||
| Data: &indexer.RepoIndexerData{ | |||
| RepoID: repo.ID, | |||
| Content: string(fileContents), | |||
| }, | |||
| }) | |||
| } | |||
| return indexerUpdate.AddToFlushingBatch(batch) | |||
| } | |||
| func addDelete(filename string, repo *Repository, batch *indexer.Batch) error { | |||
| return batch.Add(indexer.RepoIndexerUpdate{ | |||
| func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error { | |||
| indexerUpdate := indexer.RepoIndexerUpdate{ | |||
| Filepath: filename, | |||
| Op: indexer.RepoIndexerOpDelete, | |||
| Data: &indexer.RepoIndexerData{ | |||
| RepoID: repo.ID, | |||
| }, | |||
| }) | |||
| } | |||
| return indexerUpdate.AddToFlushingBatch(batch) | |||
| } | |||
| // parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command | |||
| @@ -6,12 +6,17 @@ package indexer | |||
| import ( | |||
| "fmt" | |||
| "os" | |||
| "strconv" | |||
| "code.gitea.io/gitea/modules/setting" | |||
| "github.com/blevesearch/bleve" | |||
| "github.com/blevesearch/bleve/analysis/token/unicodenorm" | |||
| "github.com/blevesearch/bleve/index/upsidedown" | |||
| "github.com/blevesearch/bleve/mapping" | |||
| "github.com/blevesearch/bleve/search/query" | |||
| "github.com/ethantkoenig/rupture" | |||
| ) | |||
| // indexerID a bleve-compatible unique identifier for an integer id | |||
| @@ -53,40 +58,36 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { | |||
| }) | |||
| } | |||
| // Update represents an update to an indexer | |||
| type Update interface { | |||
| addToBatch(batch *bleve.Batch) error | |||
| } | |||
| const maxBatchSize = 16 | |||
| // Batch batch of indexer updates that automatically flushes once it | |||
| // reaches a certain size | |||
| type Batch struct { | |||
| batch *bleve.Batch | |||
| index bleve.Index | |||
| } | |||
| // Add add update to batch, possibly flushing | |||
| func (batch *Batch) Add(update Update) error { | |||
| if err := update.addToBatch(batch.batch); err != nil { | |||
| return err | |||
| // openIndexer opens the index stored at path. It returns (nil, nil) when the | |||
| // caller has to create (or re-create) the index: path does not exist, the | |||
| // stored metadata version is older than latestVersion, or the index was built | |||
| // by an incompatible bleve version. In the last two cases the stale index is | |||
| // removed, and a failure to remove it is returned as the error. | |||
| func openIndexer(path string, latestVersion int) (bleve.Index, error) { | |||
| // Check the index that is actually being opened rather than a fixed | |||
| // indexer path, so the existence test is right for every caller. | |||
| _, err := os.Stat(path) | |||
| if err != nil && os.IsNotExist(err) { | |||
| return nil, nil | |||
| } else if err != nil { | |||
| return nil, err | |||
| } | |||
| return batch.flushIfFull() | |||
| } | |||
| func (batch *Batch) flushIfFull() error { | |||
| if batch.batch.Size() >= maxBatchSize { | |||
| return batch.Flush() | |||
| metadata, err := rupture.ReadIndexMetadata(path) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| if metadata.Version < latestVersion { | |||
| // the indexer is using a previous version, so we should delete it and | |||
| // re-populate | |||
| return nil, os.RemoveAll(path) | |||
| } | |||
| return nil | |||
| } | |||
| // Flush manually flush the batch, regardless of its size | |||
| func (batch *Batch) Flush() error { | |||
| if err := batch.index.Batch(batch.batch); err != nil { | |||
| return err | |||
| index, err := bleve.Open(path) | |||
| if err != nil && err == upsidedown.IncompatibleVersion { | |||
| // the indexer was built with a previous version of bleve, so we should | |||
| // delete it and re-populate | |||
| return nil, os.RemoveAll(path) | |||
| } else if err != nil { | |||
| return nil, err | |||
| } | |||
| batch.batch.Reset() | |||
| return nil | |||
| return index, nil | |||
| } | |||
| @@ -5,8 +5,6 @@ | |||
| package indexer | |||
| import ( | |||
| "os" | |||
| "code.gitea.io/gitea/modules/log" | |||
| "code.gitea.io/gitea/modules/setting" | |||
| @@ -14,12 +12,19 @@ import ( | |||
| "github.com/blevesearch/bleve/analysis/analyzer/custom" | |||
| "github.com/blevesearch/bleve/analysis/token/lowercase" | |||
| "github.com/blevesearch/bleve/analysis/tokenizer/unicode" | |||
| "github.com/blevesearch/bleve/index/upsidedown" | |||
| "github.com/ethantkoenig/rupture" | |||
| ) | |||
| // issueIndexer (thread-safe) index for searching issues | |||
| var issueIndexer bleve.Index | |||
| const ( | |||
| issueIndexerAnalyzer = "issueIndexer" | |||
| issueIndexerDocType = "issueIndexerDocType" | |||
| issueIndexerLatestVersion = 1 | |||
| ) | |||
| // IssueIndexerData data stored in the issue indexer | |||
| type IssueIndexerData struct { | |||
| RepoID int64 | |||
| @@ -28,35 +33,33 @@ type IssueIndexerData struct { | |||
| Comments []string | |||
| } | |||
| // Type reports issueIndexerDocType as the document type of issue data, which | |||
| // satisfies bleve's mapping.Classifier interface. | |||
| func (i *IssueIndexerData) Type() string { | |||
| return issueIndexerDocType | |||
| } | |||
| // IssueIndexerUpdate an update to the issue indexer | |||
| type IssueIndexerUpdate struct { | |||
| IssueID int64 | |||
| Data *IssueIndexerData | |||
| } | |||
| func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error { | |||
| return batch.Index(indexerID(update.IssueID), update.Data) | |||
| // AddToFlushingBatch adds the update to the given flushing batch. | |||
| func (i IssueIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { | |||
| return batch.Index(indexerID(i.IssueID), i.Data) | |||
| } | |||
| const issueIndexerAnalyzer = "issueIndexer" | |||
| // InitIssueIndexer initialize issue indexer | |||
| func InitIssueIndexer(populateIndexer func() error) { | |||
| _, err := os.Stat(setting.Indexer.IssuePath) | |||
| if err != nil && !os.IsNotExist(err) { | |||
| var err error | |||
| issueIndexer, err = openIndexer(setting.Indexer.IssuePath, issueIndexerLatestVersion) | |||
| if err != nil { | |||
| log.Fatal(4, "InitIssueIndexer: %v", err) | |||
| } else if err == nil { | |||
| issueIndexer, err = bleve.Open(setting.Indexer.IssuePath) | |||
| if err == nil { | |||
| return | |||
| } else if err != upsidedown.IncompatibleVersion { | |||
| log.Fatal(4, "InitIssueIndexer, open index: %v", err) | |||
| } | |||
| log.Warn("Incompatible bleve version, deleting and recreating issue indexer") | |||
| if err = os.RemoveAll(setting.Indexer.IssuePath); err != nil { | |||
| log.Fatal(4, "InitIssueIndexer: remove index, %v", err) | |||
| } | |||
| } | |||
| if issueIndexer != nil { | |||
| return | |||
| } | |||
| if err = createIssueIndexer(); err != nil { | |||
| log.Fatal(4, "InitIssuesIndexer: create index, %v", err) | |||
| } | |||
| @@ -70,9 +73,13 @@ func createIssueIndexer() error { | |||
| mapping := bleve.NewIndexMapping() | |||
| docMapping := bleve.NewDocumentMapping() | |||
| docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping()) | |||
| numericFieldMapping := bleve.NewNumericFieldMapping() | |||
| numericFieldMapping.IncludeInAll = false | |||
| docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) | |||
| textFieldMapping := bleve.NewTextFieldMapping() | |||
| textFieldMapping.Store = false | |||
| textFieldMapping.IncludeInAll = false | |||
| docMapping.AddFieldMappingsAt("Title", textFieldMapping) | |||
| docMapping.AddFieldMappingsAt("Content", textFieldMapping) | |||
| docMapping.AddFieldMappingsAt("Comments", textFieldMapping) | |||
| @@ -89,7 +96,8 @@ func createIssueIndexer() error { | |||
| } | |||
| mapping.DefaultAnalyzer = issueIndexerAnalyzer | |||
| mapping.AddDocumentMapping("issues", docMapping) | |||
| mapping.AddDocumentMapping(issueIndexerDocType, docMapping) | |||
| mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) | |||
| var err error | |||
| issueIndexer, err = bleve.New(setting.Indexer.IssuePath, mapping) | |||
| @@ -97,11 +105,8 @@ func createIssueIndexer() error { | |||
| } | |||
| // IssueIndexerBatch batch to add updates to | |||
| func IssueIndexerBatch() *Batch { | |||
| return &Batch{ | |||
| batch: issueIndexer.NewBatch(), | |||
| index: issueIndexer, | |||
| } | |||
| func IssueIndexerBatch() rupture.FlushingBatch { | |||
| return rupture.NewFlushingBatch(issueIndexer, maxBatchSize) | |||
| } | |||
| // SearchIssuesByKeyword searches for issues by given conditions. | |||
| @@ -5,7 +5,6 @@ | |||
| package indexer | |||
| import ( | |||
| "os" | |||
| "strings" | |||
| "code.gitea.io/gitea/modules/log" | |||
| @@ -15,10 +14,17 @@ import ( | |||
| "github.com/blevesearch/bleve/analysis/analyzer/custom" | |||
| "github.com/blevesearch/bleve/analysis/token/camelcase" | |||
| "github.com/blevesearch/bleve/analysis/token/lowercase" | |||
| "github.com/blevesearch/bleve/analysis/token/unique" | |||
| "github.com/blevesearch/bleve/analysis/tokenizer/unicode" | |||
| "github.com/ethantkoenig/rupture" | |||
| ) | |||
| const repoIndexerAnalyzer = "repoIndexerAnalyzer" | |||
| const ( | |||
| repoIndexerAnalyzer = "repoIndexerAnalyzer" | |||
| repoIndexerDocType = "repoIndexerDocType" | |||
| repoIndexerLatestVersion = 1 | |||
| ) | |||
| // repoIndexer (thread-safe) index for repository contents | |||
| var repoIndexer bleve.Index | |||
| @@ -40,6 +46,11 @@ type RepoIndexerData struct { | |||
| Content string | |||
| } | |||
| // Type reports repoIndexerDocType as the document type of repo data, which | |||
| // satisfies bleve's mapping.Classifier interface. | |||
| func (d *RepoIndexerData) Type() string { | |||
| return repoIndexerDocType | |||
| } | |||
| // RepoIndexerUpdate an update to the repo indexer | |||
| type RepoIndexerUpdate struct { | |||
| Filepath string | |||
| @@ -47,13 +58,14 @@ type RepoIndexerUpdate struct { | |||
| Data *RepoIndexerData | |||
| } | |||
| func (update RepoIndexerUpdate) addToBatch(batch *bleve.Batch) error { | |||
| // AddToFlushingBatch adds the update to the given flushing batch. | |||
| func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { | |||
| id := filenameIndexerID(update.Data.RepoID, update.Filepath) | |||
| switch update.Op { | |||
| case RepoIndexerOpUpdate: | |||
| return batch.Index(id, update.Data) | |||
| case RepoIndexerOpDelete: | |||
| batch.Delete(id) | |||
| return batch.Delete(id) | |||
| default: | |||
| log.Error(4, "Unrecognized repo indexer op: %d", update.Op) | |||
| } | |||
| @@ -62,48 +74,50 @@ func (update RepoIndexerUpdate) addToBatch(batch *bleve.Batch) error { | |||
| // InitRepoIndexer initialize repo indexer | |||
| func InitRepoIndexer(populateIndexer func() error) { | |||
| _, err := os.Stat(setting.Indexer.RepoPath) | |||
| var err error | |||
| repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) | |||
| if err != nil { | |||
| if os.IsNotExist(err) { | |||
| if err = createRepoIndexer(); err != nil { | |||
| log.Fatal(4, "CreateRepoIndexer: %v", err) | |||
| } | |||
| if err = populateIndexer(); err != nil { | |||
| log.Fatal(4, "PopulateRepoIndex: %v", err) | |||
| } | |||
| } else { | |||
| log.Fatal(4, "InitRepoIndexer: %v", err) | |||
| } | |||
| } else { | |||
| repoIndexer, err = bleve.Open(setting.Indexer.RepoPath) | |||
| if err != nil { | |||
| log.Fatal(4, "InitRepoIndexer, open index: %v", err) | |||
| } | |||
| log.Fatal(4, "InitRepoIndexer: %v", err) | |||
| } | |||
| if repoIndexer != nil { | |||
| return | |||
| } | |||
| if err = createRepoIndexer(); err != nil { | |||
| log.Fatal(4, "CreateRepoIndexer: %v", err) | |||
| } | |||
| if err = populateIndexer(); err != nil { | |||
| log.Fatal(4, "PopulateRepoIndex: %v", err) | |||
| } | |||
| } | |||
| // createRepoIndexer creates the repo indexer at setting.Indexer.RepoPath and | |||
| // records its schema version alongside it. | |||
| func createRepoIndexer() error { | |||
| var err error | |||
| docMapping := bleve.NewDocumentMapping() | |||
| docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping()) | |||
| numericFieldMapping := bleve.NewNumericFieldMapping() | |||
| numericFieldMapping.IncludeInAll = false | |||
| docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) | |||
| textFieldMapping := bleve.NewTextFieldMapping() | |||
| textFieldMapping.IncludeInAll = false | |||
| docMapping.AddFieldMappingsAt("Content", textFieldMapping) | |||
| mapping := bleve.NewIndexMapping() | |||
| if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { | |||
| if err = addUnicodeNormalizeTokenFilter(mapping); err != nil { | |||
| return err | |||
| } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ | |||
| } else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ | |||
| "type": custom.Name, | |||
| "char_filters": []string{}, | |||
| "tokenizer": unicode.Name, | |||
| "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name}, | |||
| "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name, unique.Name}, | |||
| }); err != nil { | |||
| return err | |||
| } | |||
| mapping.DefaultAnalyzer = repoIndexerAnalyzer | |||
| mapping.AddDocumentMapping("repo", docMapping) | |||
| var err error | |||
| mapping.AddDocumentMapping(repoIndexerDocType, docMapping) | |||
| mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) | |||
| repoIndexer, err = bleve.New(setting.Indexer.RepoPath, mapping) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| // Persist the schema version next to the new index. Without it | |||
| // rupture.ReadIndexMetadata reports version 0, which is below | |||
| // repoIndexerLatestVersion, so the index would be treated as outdated. | |||
| return rupture.WriteIndexMetadata(setting.Indexer.RepoPath, &rupture.IndexMetadata{ | |||
| Version: repoIndexerLatestVersion, | |||
| }) | |||
| } | |||
| @@ -121,11 +135,8 @@ func filenameOfIndexerID(indexerID string) string { | |||
| } | |||
| // RepoIndexerBatch batch to add updates to | |||
| func RepoIndexerBatch() *Batch { | |||
| return &Batch{ | |||
| batch: repoIndexer.NewBatch(), | |||
| index: repoIndexer, | |||
| } | |||
| func RepoIndexerBatch() rupture.FlushingBatch { | |||
| return rupture.NewFlushingBatch(repoIndexer, maxBatchSize) | |||
| } | |||
| // DeleteRepoFromIndexer delete all of a repo's files from indexer | |||
| @@ -138,8 +149,7 @@ func DeleteRepoFromIndexer(repoID int64) error { | |||
| } | |||
| batch := RepoIndexerBatch() | |||
| for _, hit := range result.Hits { | |||
| batch.batch.Delete(hit.ID) | |||
| if err = batch.flushIfFull(); err != nil { | |||
| if err = batch.Delete(hit.ID); err != nil { | |||
| return err | |||
| } | |||
| } | |||
| @@ -0,0 +1,53 @@ | |||
| // Copyright (c) 2018 Couchbase, Inc. | |||
| // | |||
| // Licensed under the Apache License, Version 2.0 (the "License"); | |||
| // you may not use this file except in compliance with the License. | |||
| // You may obtain a copy of the License at | |||
| // | |||
| // http://www.apache.org/licenses/LICENSE-2.0 | |||
| // | |||
| // Unless required by applicable law or agreed to in writing, software | |||
| // distributed under the License is distributed on an "AS IS" BASIS, | |||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| // See the License for the specific language governing permissions and | |||
| // limitations under the License. | |||
| package unique | |||
| import ( | |||
| "github.com/blevesearch/bleve/analysis" | |||
| "github.com/blevesearch/bleve/registry" | |||
| ) | |||
| const Name = "unique" | |||
| // UniqueTermFilter retains only the tokens which mark the first occurrence of | |||
| // a term. Tokens whose term appears in a preceding token are dropped. | |||
| type UniqueTermFilter struct{} | |||
| // NewUniqueTermFilter returns a new UniqueTermFilter; the filter carries no | |||
| // state of its own. | |||
| func NewUniqueTermFilter() *UniqueTermFilter { | |||
| return new(UniqueTermFilter) | |||
| } | |||
| // Filter drops every token whose term already appeared earlier in input, | |||
| // keeping first occurrences in their original order. The surviving tokens are | |||
| // compacted into the front of input, and the result is a prefix of that same | |||
| // slice. | |||
| func (f *UniqueTermFilter) Filter(input analysis.TokenStream) analysis.TokenStream { | |||
| seen := make(map[string]struct{}, len(input)/4) | |||
| kept := 0 | |||
| for i := range input { | |||
| key := string(input[i].Term) | |||
| if _, dup := seen[key]; !dup { | |||
| seen[key] = struct{}{} | |||
| input[kept] = input[i] | |||
| kept++ | |||
| } | |||
| } | |||
| return input[:kept] | |||
| } | |||
| // UniqueTermFilterConstructor builds a UniqueTermFilter. Neither config nor | |||
| // cache is consulted, and the returned error is always nil. | |||
| func UniqueTermFilterConstructor(config map[string]interface{}, cache *registry.Cache) (analysis.TokenFilter, error) { | |||
| filter := NewUniqueTermFilter() | |||
| return filter, nil | |||
| } | |||
| // init registers the filter's constructor with bleve's registry under Name. | |||
| func init() { | |||
| registry.RegisterTokenFilter(Name, UniqueTermFilterConstructor) | |||
| } | |||
| @@ -0,0 +1,173 @@ | |||
| # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. | |||
| [[projects]] | |||
| name = "github.com/RoaringBitmap/roaring" | |||
| packages = ["."] | |||
| revision = "84551f0e309d6f9bafa428ef39b31ab7f16ff7b8" | |||
| version = "v0.4.1" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/Smerity/govarint" | |||
| packages = ["."] | |||
| revision = "7265e41f48f15fd61751e16da866af3c704bb3ab" | |||
| [[projects]] | |||
| name = "github.com/blevesearch/bleve" | |||
| packages = [ | |||
| ".", | |||
| "analysis", | |||
| "analysis/analyzer/standard", | |||
| "analysis/datetime/flexible", | |||
| "analysis/datetime/optional", | |||
| "analysis/lang/en", | |||
| "analysis/token/lowercase", | |||
| "analysis/token/porter", | |||
| "analysis/token/stop", | |||
| "analysis/tokenizer/unicode", | |||
| "document", | |||
| "geo", | |||
| "index", | |||
| "index/scorch", | |||
| "index/scorch/mergeplan", | |||
| "index/scorch/segment", | |||
| "index/scorch/segment/mem", | |||
| "index/scorch/segment/zap", | |||
| "index/store", | |||
| "index/store/boltdb", | |||
| "index/store/gtreap", | |||
| "index/upsidedown", | |||
| "mapping", | |||
| "numeric", | |||
| "registry", | |||
| "search", | |||
| "search/collector", | |||
| "search/facet", | |||
| "search/highlight", | |||
| "search/highlight/format/html", | |||
| "search/highlight/fragmenter/simple", | |||
| "search/highlight/highlighter/html", | |||
| "search/highlight/highlighter/simple", | |||
| "search/query", | |||
| "search/scorer", | |||
| "search/searcher" | |||
| ] | |||
| revision = "a3b125508b4443344b596888ca58467b6c9310b9" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/blevesearch/go-porterstemmer" | |||
| packages = ["."] | |||
| revision = "23a2c8e5cf1f380f27722c6d2ae8896431dc7d0e" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/blevesearch/segment" | |||
| packages = ["."] | |||
| revision = "762005e7a34fd909a84586299f1dd457371d36ee" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/boltdb/bolt" | |||
| packages = ["."] | |||
| revision = "9da31745363232bc1e27dbab3569e77383a51585" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/couchbase/vellum" | |||
| packages = [ | |||
| ".", | |||
| "regexp", | |||
| "utf8" | |||
| ] | |||
| revision = "ed84a675e24ed0a0bf6859b1ddec7e7c858354bd" | |||
| [[projects]] | |||
| name = "github.com/davecgh/go-spew" | |||
| packages = ["spew"] | |||
| revision = "346938d642f2ec3594ed81d874461961cd0faa76" | |||
| version = "v1.1.0" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/edsrzf/mmap-go" | |||
| packages = ["."] | |||
| revision = "0bce6a6887123b67a60366d2c9fe2dfb74289d2e" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/glycerine/go-unsnap-stream" | |||
| packages = ["."] | |||
| revision = "62a9a9eb44fd8932157b1a8ace2149eff5971af6" | |||
| [[projects]] | |||
| name = "github.com/golang/protobuf" | |||
| packages = ["proto"] | |||
| revision = "925541529c1fa6821df4e44ce2723319eb2be768" | |||
| version = "v1.0.0" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/golang/snappy" | |||
| packages = ["."] | |||
| revision = "553a641470496b2327abcac10b36396bd98e45c9" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/mschoch/smat" | |||
| packages = ["."] | |||
| revision = "90eadee771aeab36e8bf796039b8c261bebebe4f" | |||
| [[projects]] | |||
| name = "github.com/philhofer/fwd" | |||
| packages = ["."] | |||
| revision = "bb6d471dc95d4fe11e432687f8b70ff496cf3136" | |||
| version = "v1.0.0" | |||
| [[projects]] | |||
| name = "github.com/pmezard/go-difflib" | |||
| packages = ["difflib"] | |||
| revision = "792786c7400a136282c1664665ae0a8db921c6c2" | |||
| version = "v1.0.0" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/steveyen/gtreap" | |||
| packages = ["."] | |||
| revision = "0abe01ef9be25c4aedc174758ec2d917314d6d70" | |||
| [[projects]] | |||
| name = "github.com/stretchr/testify" | |||
| packages = ["assert"] | |||
| revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71" | |||
| version = "v1.2.1" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/tinylib/msgp" | |||
| packages = ["msgp"] | |||
| revision = "03a79185462ad029a6e7e05b2f3f3e0498d0a6c0" | |||
| [[projects]] | |||
| branch = "master" | |||
| name = "github.com/willf/bitset" | |||
| packages = ["."] | |||
| revision = "1a37ad96e8c1a11b20900a232874843b5174221f" | |||
| [[projects]] | |||
| name = "golang.org/x/net" | |||
| packages = ["context"] | |||
| revision = "309822c5b9b9f80db67f016069a12628d94fad34" | |||
| [[projects]] | |||
| name = "golang.org/x/sys" | |||
| packages = ["unix"] | |||
| revision = "3dbebcf8efb6a5011a60c2b4591c1022a759af8a" | |||
| [solve-meta] | |||
| analyzer-name = "dep" | |||
| analyzer-version = 1 | |||
| inputs-digest = "61c759f0c1136cadf86ae8a30bb78edf33fc844cdcb2316469b4ae14a8d051b0" | |||
| solver-name = "gps-cdcl" | |||
| solver-version = 1 | |||
| @@ -0,0 +1,34 @@ | |||
| # Gopkg.toml example | |||
| # | |||
| # Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md | |||
| # for detailed Gopkg.toml documentation. | |||
| # | |||
| # required = ["github.com/user/thing/cmd/thing"] | |||
| # ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] | |||
| # | |||
| # [[constraint]] | |||
| # name = "github.com/user/project" | |||
| # version = "1.0.0" | |||
| # | |||
| # [[constraint]] | |||
| # name = "github.com/user/project2" | |||
| # branch = "dev" | |||
| # source = "github.com/myfork/project2" | |||
| # | |||
| # [[override]] | |||
| # name = "github.com/x/y" | |||
| # version = "2.4.0" | |||
| # | |||
| # [prune] | |||
| # non-go = false | |||
| # go-tests = true | |||
| # unused-packages = true | |||
| [[constraint]] | |||
| name = "github.com/stretchr/testify" | |||
| version = "1.2.1" | |||
| [prune] | |||
| go-tests = true | |||
| unused-packages = true | |||
| @@ -0,0 +1,21 @@ | |||
| MIT License | |||
| Copyright (c) 2018 Ethan Koenig | |||
| Permission is hereby granted, free of charge, to any person obtaining a copy | |||
| of this software and associated documentation files (the "Software"), to deal | |||
| in the Software without restriction, including without limitation the rights | |||
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |||
| copies of the Software, and to permit persons to whom the Software is | |||
| furnished to do so, subject to the following conditions: | |||
| The above copyright notice and this permission notice shall be included in all | |||
| copies or substantial portions of the Software. | |||
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |||
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |||
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |||
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |||
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |||
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |||
| SOFTWARE. | |||
| @@ -0,0 +1,13 @@ | |||
| # rupture | |||
| [](https://travis-ci.org/ethantkoenig/rupture) [](https://godoc.org/github.com/ethantkoenig/rupture) [](https://goreportcard.com/report/blevesearch/bleve) | |||
| An explosive companion to the [bleve indexing library](https://www.github.com/blevesearch/bleve) | |||
| ## Features | |||
| `rupture` includes the following additions to `bleve`: | |||
| - __Flushing batches__: Batches of operations which automatically flush to the underlying bleve index. | |||
| - __Sharded indices__: An index-like abstraction built on top of several underlying indices. Sharded indices provide lower write latencies for indices with large amounts of data. | |||
| - __Index metadata__: Track index version for easily managing migrations and schema changes. | |||
| @@ -0,0 +1,67 @@ | |||
| package rupture | |||
| import ( | |||
| "github.com/blevesearch/bleve" | |||
| ) | |||
| // FlushingBatch is a batch of operations that automatically flushes to the | |||
| // underlying index once it reaches a certain size. | |||
| type FlushingBatch interface { | |||
| // Index adds the specified index operation to the batch, possibly | |||
| // triggering a flush. | |||
| Index(id string, data interface{}) error | |||
| // Delete adds the specified delete operation to the batch, possibly | |||
| // triggering a flush. | |||
| Delete(id string) error | |||
| // Flush flushes the batch's contents. | |||
| Flush() error | |||
| } | |||
| type singleIndexFlushingBatch struct { | |||
| maxBatchSize int | |||
| batch *bleve.Batch | |||
| index bleve.Index | |||
| } | |||
| // newFlushingBatch returns a flushing batch for index, starting from a fresh | |||
| // bleve batch and flushing once maxBatchSize is reached. | |||
| func newFlushingBatch(index bleve.Index, maxBatchSize int) *singleIndexFlushingBatch { | |||
| b := new(singleIndexFlushingBatch) | |||
| b.maxBatchSize = maxBatchSize | |||
| b.batch = index.NewBatch() | |||
| b.index = index | |||
| return b | |||
| } | |||
| // NewFlushingBatch creates a new flushing batch for the specified index. Once | |||
| // the number of operations in the batch reaches the specified limit, the batch | |||
| // automatically flushes its operations to the index. | |||
| func NewFlushingBatch(index bleve.Index, maxBatchSize int) FlushingBatch { | |||
| return newFlushingBatch(index, maxBatchSize) | |||
| } | |||
| // Index queues an index operation for id, then flushes if the batch is full. | |||
| // A failure to queue the operation is returned without attempting a flush. | |||
| func (b *singleIndexFlushingBatch) Index(id string, data interface{}) error { | |||
| err := b.batch.Index(id, data) | |||
| if err == nil { | |||
| err = b.flushIfFull() | |||
| } | |||
| return err | |||
| } | |||
| // Delete queues a delete operation for id, then flushes if the batch is full. | |||
| func (b *singleIndexFlushingBatch) Delete(id string) error { | |||
| b.batch.Delete(id) | |||
| err := b.flushIfFull() | |||
| return err | |||
| } | |||
| // flushIfFull flushes the batch once its size has reached maxBatchSize and is | |||
| // a no-op otherwise. | |||
| func (b *singleIndexFlushingBatch) flushIfFull() error { | |||
| if b.batch.Size() >= b.maxBatchSize { | |||
| return b.Flush() | |||
| } | |||
| return nil | |||
| } | |||
| // Flush applies the pending operations to the index. The batch is reset only | |||
| // when that succeeds; on failure its contents are left in place. | |||
| func (b *singleIndexFlushingBatch) Flush() error { | |||
| if err := b.index.Batch(b.batch); err != nil { | |||
| return err | |||
| } | |||
| b.batch.Reset() | |||
| return nil | |||
| } | |||
| @@ -0,0 +1,68 @@ | |||
| package rupture | |||
| import ( | |||
| "encoding/json" | |||
| "io/ioutil" | |||
| "os" | |||
| "path/filepath" | |||
| ) | |||
| const metaFilename = "rupture_meta.json" | |||
| // indexMetadataPath returns the location of the metadata file inside dir. | |||
| func indexMetadataPath(dir string) string { | |||
| metaPath := filepath.Join(dir, metaFilename) | |||
| return metaPath | |||
| } | |||
| // IndexMetadata contains metadata about a bleve index. | |||
| type IndexMetadata struct { | |||
| // The version of the data in the index. This can be useful for tracking | |||
| // schema changes or data migrations. | |||
| Version int `json:"version"` | |||
| } | |||
| // in addition to the user-exposed metadata, we keep additional, internal-only | |||
| // metadata for sharded indices. | |||
| const shardedMetadataFilename = "rupture_sharded_meta.json" | |||
| // shardedIndexMetadataPath returns the location of the sharded-index metadata | |||
| // file inside dir. | |||
| func shardedIndexMetadataPath(dir string) string { | |||
| metaPath := filepath.Join(dir, shardedMetadataFilename) | |||
| return metaPath | |||
| } | |||
| type shardedIndexMetadata struct { | |||
| NumShards int `json:"num_shards"` | |||
| } | |||
| // readJSON reads the file at path and JSON-decodes its contents into meta. | |||
| func readJSON(path string, meta interface{}) error { | |||
| raw, err := ioutil.ReadFile(path) | |||
| if err == nil { | |||
| err = json.Unmarshal(raw, meta) | |||
| } | |||
| return err | |||
| } | |||
| // writeJSON JSON-encodes meta and writes the result to the file at path, | |||
| // passing 0666 as the file mode. | |||
| func writeJSON(path string, meta interface{}) error { | |||
| encoded, err := json.Marshal(meta) | |||
| if err == nil { | |||
| err = ioutil.WriteFile(path, encoded, 0666) | |||
| } | |||
| return err | |||
| } | |||
| // ReadIndexMetadata returns the metadata for the index at the specified path. | |||
| // If no such index metadata exists, an empty metadata and a nil error are | |||
| // returned. On any other failure the returned metadata is nil. | |||
| func ReadIndexMetadata(path string) (*IndexMetadata, error) { | |||
| meta := &IndexMetadata{} | |||
| // Read in a single step instead of stat-then-read, so a metadata file | |||
| // that disappears in between is still reported as "no metadata". | |||
| err := readJSON(indexMetadataPath(path), meta) | |||
| if os.IsNotExist(err) { | |||
| return meta, nil | |||
| } | |||
| if err != nil { | |||
| // Do not hand back a partially decoded value together with an error. | |||
| return nil, err | |||
| } | |||
| return meta, nil | |||
| } | |||
| // WriteIndexMetadata stores meta as the metadata of the index located at path. | |||
| func WriteIndexMetadata(path string, meta *IndexMetadata) error { | |||
| metaPath := indexMetadataPath(path) | |||
| return writeJSON(metaPath, meta) | |||
| } | |||
| @@ -0,0 +1,146 @@ | |||
| package rupture | |||
| import ( | |||
| "fmt" | |||
| "hash/fnv" | |||
| "path/filepath" | |||
| "strconv" | |||
| "github.com/blevesearch/bleve" | |||
| "github.com/blevesearch/bleve/document" | |||
| "github.com/blevesearch/bleve/mapping" | |||
| ) | |||
| // ShardedIndex is an index that is built on top of multiple underlying bleve | |||
| // indices (i.e. shards). Similar to bleve's index aliases, some methods may | |||
| // not be supported. | |||
| type ShardedIndex interface { | |||
| bleve.Index | |||
| // shards returns the underlying per-shard indices. | |||
| shards() []bleve.Index | |||
| } | |||
| // bleveIndex is a separately named type for bleve.Index, so that the embedded | |||
| // field of shardedIndex is named bleveIndex and does not conflict with the | |||
| // Index(..) method. | |||
| type bleveIndex bleve.Index | |||
| type shardedIndex struct { | |||
| bleveIndex | |||
| indices []bleve.Index | |||
| } | |||
| // hash maps id to a shard number below n, using the 64-bit FNV-1 hash of id. | |||
| // n must not be zero, since it is used as the modulus. | |||
| func hash(id string, n int) uint64 { | |||
| hasher := fnv.New64() | |||
| hasher.Write([]byte(id)) | |||
| shardCount := uint64(n) | |||
| return hasher.Sum64() % shardCount | |||
| } | |||
| // childIndexerPath returns the directory of shard i beneath rootPath; the | |||
| // directory is named after the decimal shard number. | |||
| func childIndexerPath(rootPath string, i int) string { | |||
| shardDir := strconv.Itoa(i) | |||
| return filepath.Join(rootPath, shardDir) | |||
| } | |||
| // NewShardedIndex creates a sharded index at the specified path, with the | |||
| // specified mapping and number of shards. The shard count is written to the | |||
| // sharded-index metadata file under path before the shards are created. If a | |||
| // shard cannot be created, the shards created so far are closed and the | |||
| // creation error is returned. | |||
| func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) { | |||
| if numShards <= 0 { | |||
| return nil, fmt.Errorf("Invalid number of shards: %d", numShards) | |||
| } | |||
| err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards}) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| s := &shardedIndex{ | |||
| indices: make([]bleve.Index, numShards), | |||
| } | |||
| for i := 0; i < numShards; i++ { | |||
| s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping) | |||
| if err != nil { | |||
| // Do not leak the shards that were already created. Closing is | |||
| // best-effort: the creation error is the one worth reporting. | |||
| for _, created := range s.indices[:i] { | |||
| _ = created.Close() | |||
| } | |||
| return nil, err | |||
| } | |||
| } | |||
| s.bleveIndex = bleve.NewIndexAlias(s.indices...) | |||
| return s, nil | |||
| } | |||
| // OpenShardedIndex opens a sharded index at the specified path. The shard | |||
| // count is read from the sharded-index metadata file under path and must be | |||
| // positive. If a shard cannot be opened, the shards opened so far are closed | |||
| // and the open error is returned. | |||
| func OpenShardedIndex(path string) (ShardedIndex, error) { | |||
| var meta shardedIndexMetadata | |||
| var err error | |||
| if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil { | |||
| return nil, err | |||
| } | |||
| // The count comes from disk: a negative value would make the allocation | |||
| // below panic, and zero would make hash divide by zero on first use. | |||
| if meta.NumShards <= 0 { | |||
| return nil, fmt.Errorf("Invalid number of shards: %d", meta.NumShards) | |||
| } | |||
| s := &shardedIndex{ | |||
| indices: make([]bleve.Index, meta.NumShards), | |||
| } | |||
| for i := 0; i < meta.NumShards; i++ { | |||
| s.indices[i], err = bleve.Open(childIndexerPath(path, i)) | |||
| if err != nil { | |||
| // Do not leak the shards that were already opened. Closing is | |||
| // best-effort: the open error is the one worth reporting. | |||
| for _, opened := range s.indices[:i] { | |||
| _ = opened.Close() | |||
| } | |||
| return nil, err | |||
| } | |||
| } | |||
| s.bleveIndex = bleve.NewIndexAlias(s.indices...) | |||
| return s, nil | |||
| } | |||
| // Index indexes data under id in the shard selected by hashing id. | |||
| func (s *shardedIndex) Index(id string, data interface{}) error { | |||
| shard := s.indices[hash(id, len(s.indices))] | |||
| return shard.Index(id, data) | |||
| } | |||
| // Delete deletes id from the shard selected by hashing id. | |||
| func (s *shardedIndex) Delete(id string) error { | |||
| shard := s.indices[hash(id, len(s.indices))] | |||
| return shard.Delete(id) | |||
| } | |||
| // Document looks up id in the shard selected by hashing id. | |||
| func (s *shardedIndex) Document(id string) (*document.Document, error) { | |||
| shard := s.indices[hash(id, len(s.indices))] | |||
| return shard.Document(id) | |||
| } | |||
| // Close closes the index alias and then every shard. All of them are closed | |||
| // even when one fails, so that a single failure does not leave the remaining | |||
| // shards open; the first error encountered is returned. | |||
| func (s *shardedIndex) Close() error { | |||
| firstErr := s.bleveIndex.Close() | |||
| for _, index := range s.indices { | |||
| if err := index.Close(); err != nil && firstErr == nil { | |||
| firstErr = err | |||
| } | |||
| } | |||
| return firstErr | |||
| } | |||
| // shards returns the underlying shard indices; the slice is not copied. | |||
| func (s *shardedIndex) shards() []bleve.Index { | |||
| return s.indices | |||
| } | |||
| type shardedIndexFlushingBatch struct { | |||
| batches []*singleIndexFlushingBatch | |||
| } | |||
| // NewShardedFlushingBatch returns a flushing batch for the given sharded | |||
| // index. Each shard gets its own batch, and maxBatchSize applies to each of | |||
| // those batches separately. | |||
| func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch { | |||
| shards := index.shards() | |||
| batches := make([]*singleIndexFlushingBatch, 0, len(shards)) | |||
| for _, shard := range shards { | |||
| batches = append(batches, newFlushingBatch(shard, maxBatchSize)) | |||
| } | |||
| return &shardedIndexFlushingBatch{batches: batches} | |||
| } | |||
| // Index adds the index operation to the batch of the shard selected by | |||
| // hashing id. | |||
| func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error { | |||
| shardBatch := b.batches[hash(id, len(b.batches))] | |||
| return shardBatch.Index(id, data) | |||
| } | |||
| // Delete adds the delete operation to the batch of the shard selected by | |||
| // hashing id. | |||
| func (b *shardedIndexFlushingBatch) Delete(id string) error { | |||
| shardBatch := b.batches[hash(id, len(b.batches))] | |||
| return shardBatch.Delete(id) | |||
| } | |||
| // Flush flushes the per-shard batches in order and stops at the first | |||
| // failure, returning its error; later batches are then left unflushed. | |||
| func (b *shardedIndexFlushingBatch) Flush() error { | |||
| for i := range b.batches { | |||
| err := b.batches[i].Flush() | |||
| if err != nil { | |||
| return err | |||
| } | |||
| } | |||
| return nil | |||
| } | |||
| @@ -128,6 +128,12 @@ | |||
| "revision": "174f8ed44a0bf65e7c8fb228b60b58de62654cd2", | |||
| "revisionTime": "2017-06-28T17:18:15Z" | |||
| }, | |||
| { | |||
| "checksumSHA1": "unacAFTLwgpg7wyI/mYf7Zd9eaU=", | |||
| "path": "github.com/blevesearch/bleve/analysis/token/unique", | |||
| "revision": "ff210fbc6d348ad67aa5754eaea11a463fcddafd", | |||
| "revisionTime": "2018-02-01T18:20:06Z" | |||
| }, | |||
| { | |||
| "checksumSHA1": "q7C04nlJLxKmemXLop0oyJhfi5M=", | |||
| "path": "github.com/blevesearch/bleve/analysis/tokenizer/unicode", | |||
| @@ -347,6 +353,12 @@ | |||
| "revision": "57eb5e1fc594ad4b0b1dbea7b286d299e0cb43c2", | |||
| "revisionTime": "2015-12-24T04:54:52Z" | |||
| }, | |||
| { | |||
| "checksumSHA1": "06ofBxeJ9c4LS2p31PCMIj7IjJU=", | |||
| "path": "github.com/ethantkoenig/rupture", | |||
| "revision": "0a76f03a811abcca2e6357329b673e9bb8ef9643", | |||
| "revisionTime": "2018-02-03T18:25:44Z" | |||
| }, | |||
| { | |||
| "checksumSHA1": "imR2wF388/0fBU6RRWx8RvTi8Q8=", | |||
| "path": "github.com/facebookgo/clock", | |||