Change UpdateRepoIndex api to include watchers (#7012)

* Change UpdateRepoIndex api to include watchers

* Add timeout
release/v1.15
zeripath 2019-05-23 17:00:07 +01:00 committed by GitHub
parent 6eb53ac570
commit 54bd63cd5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 24 deletions

View File

@ -5,7 +5,6 @@
package integrations package integrations
import ( import (
"log"
"net/http" "net/http"
"testing" "testing"
"time" "time"
@ -34,22 +33,14 @@ func TestSearchRepo(t *testing.T) {
repo, err := models.GetRepositoryByOwnerAndName("user2", "repo1") repo, err := models.GetRepositoryByOwnerAndName("user2", "repo1")
assert.NoError(t, err) assert.NoError(t, err)
models.UpdateRepoIndexer(repo) waiter := make(chan error, 1)
models.UpdateRepoIndexer(repo, waiter)
log.Printf("Waiting for indexing\n") select {
case err := <-waiter:
i := 0 assert.NoError(t, err)
for i < 60 { case <-time.After(1 * time.Minute):
if repo.IndexerStatus != nil && len(repo.IndexerStatus.CommitSha) != 0 { assert.Fail(t, "UpdateRepoIndexer took too long")
break
}
time.Sleep(1 * time.Second)
i++
}
if i < 60 {
log.Printf("Indexing took: %ds\n", i)
} else {
log.Printf("Waited the limit: %ds for indexing, continuing\n", i)
} }
req := NewRequestf(t, "GET", "/user2/repo1/search?q=Description&page=1") req := NewRequestf(t, "GET", "/user2/repo1/search?q=Description&page=1")

View File

@ -59,6 +59,7 @@ func (repo *Repository) updateIndexerStatus(sha string) error {
type repoIndexerOperation struct { type repoIndexerOperation struct {
repo *Repository repo *Repository
deleted bool deleted bool
watchers []chan<- error
} }
var repoIndexerOperationQueue chan repoIndexerOperation var repoIndexerOperationQueue chan repoIndexerOperation
@ -312,26 +313,30 @@ func nonGenesisChanges(repo *Repository, revision string) (*repoChanges, error)
func processRepoIndexerOperationQueue() { func processRepoIndexerOperationQueue() {
for { for {
op := <-repoIndexerOperationQueue op := <-repoIndexerOperationQueue
var err error
if op.deleted { if op.deleted {
if err := indexer.DeleteRepoFromIndexer(op.repo.ID); err != nil { if err = indexer.DeleteRepoFromIndexer(op.repo.ID); err != nil {
log.Error("DeleteRepoFromIndexer: %v", err) log.Error("DeleteRepoFromIndexer: %v", err)
} }
} else { } else {
if err := updateRepoIndexer(op.repo); err != nil { if err = updateRepoIndexer(op.repo); err != nil {
log.Error("updateRepoIndexer: %v", err) log.Error("updateRepoIndexer: %v", err)
} }
} }
for _, watcher := range op.watchers {
watcher <- err
}
} }
} }
// DeleteRepoFromIndexer remove all of a repository's entries from the indexer // DeleteRepoFromIndexer remove all of a repository's entries from the indexer
func DeleteRepoFromIndexer(repo *Repository) { func DeleteRepoFromIndexer(repo *Repository, watchers ...chan<- error) {
addOperationToQueue(repoIndexerOperation{repo: repo, deleted: true}) addOperationToQueue(repoIndexerOperation{repo: repo, deleted: true, watchers: watchers})
} }
// UpdateRepoIndexer update a repository's entries in the indexer // UpdateRepoIndexer update a repository's entries in the indexer
func UpdateRepoIndexer(repo *Repository) { func UpdateRepoIndexer(repo *Repository, watchers ...chan<- error) {
addOperationToQueue(repoIndexerOperation{repo: repo, deleted: false}) addOperationToQueue(repoIndexerOperation{repo: repo, deleted: false, watchers: watchers})
} }
func addOperationToQueue(op repoIndexerOperation) { func addOperationToQueue(op repoIndexerOperation) {