Fix a race with sync server integration tests (#95)
parent
42564e8ed6
commit
e226d564ec
|
@ -22,6 +22,7 @@ import (
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common/test"
|
"github.com/matrix-org/dendrite/common/test"
|
||||||
|
@ -60,6 +61,23 @@ var exe = test.KafkaExecutor{
|
||||||
OutputWriter: os.Stderr,
|
OutputWriter: os.Stderr,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
lastRequestMutex sync.Mutex
|
||||||
|
lastRequestErr error
|
||||||
|
)
|
||||||
|
|
||||||
|
func setLastRequestError(err error) {
|
||||||
|
lastRequestMutex.Lock()
|
||||||
|
defer lastRequestMutex.Unlock()
|
||||||
|
lastRequestErr = err
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLastRequestError() error {
|
||||||
|
lastRequestMutex.Lock()
|
||||||
|
defer lastRequestMutex.Unlock()
|
||||||
|
return lastRequestErr
|
||||||
|
}
|
||||||
|
|
||||||
var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"]
|
var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"]
|
||||||
roomserver_topic: "` + inputTopic + `"
|
roomserver_topic: "` + inputTopic + `"
|
||||||
database: "` + testDatabase + `"
|
database: "` + testDatabase + `"
|
||||||
|
@ -107,53 +125,56 @@ func canonicalJSONInput(jsonData []string) []string {
|
||||||
return jsonData
|
return jsonData
|
||||||
}
|
}
|
||||||
|
|
||||||
func doSyncRequest(done chan error, want []string, since string) func() {
|
// doSyncRequest does a /sync request and returns an error if it fails or doesn't
|
||||||
return func() {
|
// return the wanted string.
|
||||||
cli := &http.Client{
|
func doSyncRequest(syncServerURL, want string) error {
|
||||||
Timeout: 5 * time.Second,
|
cli := &http.Client{
|
||||||
}
|
Timeout: 5 * time.Second,
|
||||||
res, err := cli.Get("http://" + syncserverAddr + "/api/_matrix/client/r0/sync?access_token=@alice:localhost&since=" + since)
|
}
|
||||||
|
res, err := cli.Get(syncServerURL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if res.StatusCode != 200 {
|
||||||
|
return fmt.Errorf("/sync returned HTTP status %d", res.StatusCode)
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
resBytes, err := ioutil.ReadAll(res.Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
jsonBytes, err := gomatrixserverlib.CanonicalJSON(resBytes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if string(jsonBytes) != want {
|
||||||
|
return fmt.Errorf("/sync returned wrong bytes. Expected:\n%s\n\nGot:\n%s", want, string(jsonBytes))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncRequestUntilSuccess blocks and performs the same /sync request over and over until
|
||||||
|
// the response returns the wanted string, where it will close the given channel and return.
|
||||||
|
// It will keep track of the last error in `lastRequestErr`.
|
||||||
|
func syncRequestUntilSuccess(done chan error, want string, since string) {
|
||||||
|
for {
|
||||||
|
err := doSyncRequest(
|
||||||
|
"http://"+syncserverAddr+"/api/_matrix/client/r0/sync?access_token=@alice:localhost&since="+since,
|
||||||
|
want,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
done <- err
|
setLastRequestError(err)
|
||||||
return
|
time.Sleep(1 * time.Second) // don't tightloop
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
if res.StatusCode != 200 {
|
|
||||||
done <- fmt.Errorf("/sync returned HTTP status %d", res.StatusCode)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
resBytes, err := ioutil.ReadAll(res.Body)
|
|
||||||
if err != nil {
|
|
||||||
done <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
jsonBytes, err := gomatrixserverlib.CanonicalJSON(resBytes)
|
|
||||||
if err != nil {
|
|
||||||
done <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if string(jsonBytes) != want[0] {
|
|
||||||
fmt.Println("Expected:")
|
|
||||||
fmt.Println(want[0])
|
|
||||||
fmt.Println("Got:")
|
|
||||||
fmt.Println(string(jsonBytes))
|
|
||||||
done <- fmt.Errorf("/sync returned wrong bytes")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// all good, clean up
|
|
||||||
close(done)
|
close(done)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSyncServer(input, want []string, since string) {
|
// prepareSyncServer creates the database and config file needed for the sync server to run.
|
||||||
exe.DeleteTopic(inputTopic)
|
// It also prepares the CLI command to execute.
|
||||||
if err := exe.CreateTopic(inputTopic); err != nil {
|
func prepareSyncServer() *exec.Cmd {
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := createDatabase(testDatabaseName); err != nil {
|
if err := createDatabase(testDatabaseName); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -171,36 +192,58 @@ func testSyncServer(input, want []string, since string) {
|
||||||
)
|
)
|
||||||
cmd.Stderr = os.Stderr
|
cmd.Stderr = os.Stderr
|
||||||
cmd.Stdout = os.Stderr
|
cmd.Stdout = os.Stderr
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func testSyncServer(input, want []string, since string) {
|
||||||
|
// Write the logs to kafka so the sync server has some data to work with.
|
||||||
|
exe.DeleteTopic(inputTopic)
|
||||||
|
if err := exe.CreateTopic(inputTopic); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := prepareSyncServer()
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
panic("failed to start sync server: " + err.Error())
|
panic("failed to start sync server: " + err.Error())
|
||||||
}
|
}
|
||||||
done := make(chan error, 1)
|
done := make(chan error, 1)
|
||||||
|
|
||||||
// TODO: Waiting 1s is racey. Maybe keep hitting it until it doesn't get Connection Refused?
|
// We need to wait for the sync server to:
|
||||||
time.AfterFunc(1*time.Second, doSyncRequest(done, want, since))
|
// - have created the tables
|
||||||
|
// - be listening on the given port
|
||||||
|
// - have consumed the kafka logs
|
||||||
|
// before we begin hitting it with /sync requests. We don't get told when it has done
|
||||||
|
// all these things, so we just continually hit /sync until it returns the right bytes.
|
||||||
|
// We can't even wait for the first valid 200 OK response because it's possible to race
|
||||||
|
// with consuming the kafka logs (so the /sync response will be missing events and
|
||||||
|
// therefore fail the test).
|
||||||
|
go syncRequestUntilSuccess(done, want[0], since)
|
||||||
|
|
||||||
// wait for it to die or timeout
|
// wait for the sync server to exit or our test timeout to expire
|
||||||
go func() {
|
go func() {
|
||||||
cmdErr := cmd.Wait()
|
done <- cmd.Wait()
|
||||||
if cmdErr != nil {
|
|
||||||
exitErr, ok := cmdErr.(*exec.ExitError)
|
|
||||||
if ok {
|
|
||||||
fmt.Println("\nSYNC SERVER ERROR: (", exitErr, ")")
|
|
||||||
fmt.Println("sync server failed to run. If failing with 'pq: password authentication failed for user' try:")
|
|
||||||
fmt.Println(" export PGHOST=/var/run/postgresql\n")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
done <- cmdErr
|
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
|
if reqErr := getLastRequestError(); reqErr != nil {
|
||||||
|
fmt.Println("Last /sync request error:")
|
||||||
|
fmt.Println(reqErr)
|
||||||
|
}
|
||||||
|
|
||||||
if err := cmd.Process.Kill(); err != nil {
|
if err := cmd.Process.Kill(); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
panic("dendrite-sync-api-server timed out")
|
panic("dendrite-sync-api-server timed out")
|
||||||
case err := <-done:
|
case err, open := <-done:
|
||||||
cmd.Process.Kill() // ensure server is dead, only cleaning up so don't care about errors this returns.
|
cmd.Process.Kill() // ensure server is dead, only cleaning up so don't care about errors this returns.
|
||||||
if err != nil {
|
if open { // channel is closed on success
|
||||||
|
fmt.Println("=============================================================================================")
|
||||||
|
fmt.Println("sync server failed to run. If failing with 'pq: password authentication failed for user' try:")
|
||||||
|
fmt.Println(" export PGHOST=/var/run/postgresql\n")
|
||||||
|
fmt.Println("=============================================================================================")
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue