Implement Typing server (#567)

* update gomatrixserverlib

* Make removeUser public

* Implement api.TypingServerInputAPI

* Integrate the typing server component, create kafka topic

* Add typing server cmd for multiprocess dendrite
main
Anant Prakash 2018-08-02 22:52:44 +05:30 committed by GitHub
parent 9cdd3a66e4
commit 1165b49da7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 236 additions and 28 deletions

View File

@ -86,6 +86,7 @@ kafka:
topics: topics:
output_room_event: roomserverOutput output_room_event: roomserverOutput
output_client_data: clientapiOutput output_client_data: clientapiOutput
output_typing_event: typingServerOutput
user_updates: userUpdates user_updates: userUpdates
# The postgres connection configs for connecting to the databases e.g a postgres:// URI # The postgres connection configs for connecting to the databases e.g a postgres:// URI

View File

@ -18,20 +18,20 @@ import (
"flag" "flag"
"net/http" "net/http"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -56,7 +56,7 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB) keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := roomserver.SetupRoomServerComponent(base) alias, input, query := roomserver.SetupRoomServerComponent(base)
typingInputAPI := typingserver.SetupTypingServerComponent(base) typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,

View File

@ -0,0 +1,36 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
_ "net/http/pprof"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/sirupsen/logrus"
)
func main() {
cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "TypingServerAPI")
defer func() {
if err := base.Close(); err != nil {
logrus.WithError(err).Warn("BaseDendrite close failed")
}
}()
typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
base.SetupAndServeHTTP(string(base.Cfg.Listen.TypingServer))
}

View File

@ -134,6 +134,8 @@ type Dendrite struct {
OutputRoomEvent Topic `yaml:"output_room_event"` OutputRoomEvent Topic `yaml:"output_room_event"`
// Topic for sending account data from client API to sync API // Topic for sending account data from client API to sync API
OutputClientData Topic `yaml:"output_client_data"` OutputClientData Topic `yaml:"output_client_data"`
// Topic for typingserver/api.OutputTypingEvent events.
OutputTypingEvent Topic `yaml:"output_typing_event"`
// Topic for user updates (profile, presence) // Topic for user updates (profile, presence)
UserUpdates Topic `yaml:"user_updates"` UserUpdates Topic `yaml:"user_updates"`
} }
@ -527,6 +529,7 @@ func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) {
} }
checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent))
checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates)) checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
} }

View File

@ -45,6 +45,7 @@ kafka:
topics: topics:
output_room_event: output.room output_room_event: output.room
output_client_data: output.client output_client_data: output.client
output_typing_event: output.typing
user_updates: output.user user_updates: output.user
database: database:
media_api: "postgresql:///media_api" media_api: "postgresql:///media_api"

View File

@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
// Make this configurable somehow? // Make this configurable somehow?
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output"
cfg.Kafka.Topics.UserUpdates = "test.user.output" cfg.Kafka.Topics.UserUpdates = "test.user.output"
// TODO: Use different databases for the different schemas. // TODO: Use different databases for the different schemas.

View File

@ -0,0 +1,31 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
// OutputTypingEvent is an entry in typing server output kafka log.
type OutputTypingEvent struct {
// The Event for the typing edu event.
Event TypingEvent `json:"event"`
}
// TypingEvent represents a matrix edu event of type 'm.typing'.
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
Content TypingEventContent `json:"content"`
}
// TypingEventContent for TypingEvent
type TypingEventContent struct {
UserIDs []string `json:"user_ids"`
}

View File

@ -85,12 +85,12 @@ func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) {
// This removes the user. // This removes the user.
func (t *TypingCache) timeoutCallback(userID, roomID string) func() { func (t *TypingCache) timeoutCallback(userID, roomID string) func() {
return func() { return func() {
t.removeUser(userID, roomID) t.RemoveUser(userID, roomID)
} }
} }
// removeUser with mutex lock & stop the timer. // RemoveUser with mutex lock & stop the timer.
func (t *TypingCache) removeUser(userID, roomID string) { func (t *TypingCache) RemoveUser(userID, roomID string) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()

View File

@ -33,7 +33,7 @@ func TestTypingCache(t *testing.T) {
testGetTypingUsers(t, tCache) testGetTypingUsers(t, tCache)
}) })
t.Run("removeUser", func(t *testing.T) { t.Run("RemoveUser", func(t *testing.T) {
testRemoveUser(t, tCache) testRemoveUser(t, tCache)
}) })
} }
@ -90,7 +90,7 @@ func testRemoveUser(t *testing.T, tCache *TypingCache) {
} }
length := len(tt.userIDs) length := len(tt.userIDs)
tCache.removeUser(tt.userIDs[length-1], tt.roomID) tCache.RemoveUser(tt.userIDs[length-1], tt.roomID)
expLeftUsers := tt.userIDs[:length-1] expLeftUsers := tt.userIDs[:length-1]
if leftUsers := tCache.GetTypingUsers(tt.roomID); !test.UnsortedStringSliceEqual(leftUsers, expLeftUsers) { if leftUsers := tCache.GetTypingUsers(tt.roomID); !test.UnsortedStringSliceEqual(leftUsers, expLeftUsers) {
t.Errorf("Response after removal is unexpected. Want = %s, got = %s", leftUsers, expLeftUsers) t.Errorf("Response after removal is unexpected. Want = %s, got = %s", leftUsers, expLeftUsers)

View File

@ -0,0 +1,96 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package input
import (
"context"
"encoding/json"
"net/http"
"time"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"gopkg.in/Shopify/sarama.v1"
)
// TypingServerInputAPI implements api.TypingServerInputAPI
type TypingServerInputAPI struct {
// Cache to store the current typing members in each room.
Cache *cache.TypingCache
// The kafka topic to output new typing events to.
OutputTypingEventTopic string
// kafka producer
Producer sarama.SyncProducer
}
// InputTypingEvent implements api.TypingServerInputAPI
func (t *TypingServerInputAPI) InputTypingEvent(
ctx context.Context,
request *api.InputTypingEventRequest,
response *api.InputTypingEventResponse,
) error {
ite := &request.InputTypingEvent
if ite.Typing {
// user is typing, update our current state of users typing.
expireTime := ite.OriginServerTS.Time().Add(
time.Duration(ite.Timeout) * time.Millisecond,
)
t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
} else {
t.Cache.RemoveUser(ite.UserID, ite.RoomID)
}
return t.sendUpdateForRoom(ite.RoomID)
}
func (t *TypingServerInputAPI) sendUpdateForRoom(roomID string) error {
userIDs := t.Cache.GetTypingUsers(roomID)
event := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: roomID,
Content: api.TypingEventContent{UserIDs: userIDs},
}
eventJSON, err := json.Marshal(api.OutputTypingEvent{Event: *event})
if err != nil {
return err
}
m := &sarama.ProducerMessage{
Topic: string(t.OutputTypingEventTopic),
Key: sarama.StringEncoder(roomID),
Value: sarama.ByteEncoder(eventJSON),
}
_, _, err = t.Producer.SendMessage(m)
return err
}
// SetupHTTP adds the TypingServerInputAPI handlers to the http.ServeMux.
func (t *TypingServerInputAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(api.TypingServerInputTypingEventPath,
common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse {
var request api.InputTypingEventRequest
var response api.InputTypingEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View File

@ -13,8 +13,12 @@
package typingserver package typingserver
import ( import (
"net/http"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/dendrite/typingserver/input"
) )
// SetupTypingServerComponent sets up and registers HTTP handlers for the // SetupTypingServerComponent sets up and registers HTTP handlers for the
@ -23,7 +27,14 @@ import (
// APIs directly instead of having to use HTTP. // APIs directly instead of having to use HTTP.
func SetupTypingServerComponent( func SetupTypingServerComponent(
base *basecomponent.BaseDendrite, base *basecomponent.BaseDendrite,
typingCache *cache.TypingCache,
) api.TypingServerInputAPI { ) api.TypingServerInputAPI {
// TODO: implement typing server inputAPI := &input.TypingServerInputAPI{
return base.CreateHTTPTypingServerAPIs() Cache: typingCache,
Producer: base.KafkaProducer,
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
}
inputAPI.SetupHTTP(http.DefaultServeMux)
return inputAPI
} }

28
vendor/manifest vendored
View File

@ -108,6 +108,19 @@
"revision": "392c28fe23e1c45ddba891b0320b3b5df220beea", "revision": "392c28fe23e1c45ddba891b0320b3b5df220beea",
"branch": "master" "branch": "master"
}, },
{
"importpath": "github.com/jaegertracing/jaeger-client-go",
"repository": "https://github.com/jaegertracing/jaeger-client-go",
"revision": "3ad49a1d839b517923a6fdac36d81cbf7b744f37",
"branch": "master"
},
{
"importpath": "github.com/jaegertracing/jaeger-lib/metrics",
"repository": "https://github.com/jaegertracing/jaeger-lib",
"revision": "21a3da6d66fe0e278072676fdc84cd4c9ccb9b67",
"branch": "master",
"path": "/metrics"
},
{ {
"importpath": "github.com/klauspost/crc32", "importpath": "github.com/klauspost/crc32",
"repository": "https://github.com/klauspost/crc32", "repository": "https://github.com/klauspost/crc32",
@ -135,7 +148,7 @@
{ {
"importpath": "github.com/matrix-org/gomatrixserverlib", "importpath": "github.com/matrix-org/gomatrixserverlib",
"repository": "https://github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib",
"revision": "929828872b51e6733166553d6b1a20155b6ab829", "revision": "677bbe93ffc9ad9ba5de615cd81185d0493f5d25",
"branch": "master" "branch": "master"
}, },
{ {
@ -304,19 +317,6 @@
"revision": "54f72d32435d760d5604f17a82e2435b28dc4ba5", "revision": "54f72d32435d760d5604f17a82e2435b28dc4ba5",
"branch": "master" "branch": "master"
}, },
{
"importpath": "github.com/jaegertracing/jaeger-client-go",
"repository": "https://github.com/jaegertracing/jaeger-client-go",
"revision": "3ad49a1d839b517923a6fdac36d81cbf7b744f37",
"branch": "master"
},
{
"importpath": "github.com/jaegertracing/jaeger-lib/metrics",
"repository": "https://github.com/jaegertracing/jaeger-lib",
"revision": "21a3da6d66fe0e278072676fdc84cd4c9ccb9b67",
"branch": "master",
"path": "/metrics"
},
{ {
"importpath": "github.com/uber/tchannel-go", "importpath": "github.com/uber/tchannel-go",
"repository": "https://github.com/uber/tchannel-go", "repository": "https://github.com/uber/tchannel-go",

View File

@ -0,0 +1,23 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gomatrixserverlib
// EDU represents a EDU received via federation
// https://matrix.org/docs/spec/server_server/unstable.html#edus
type EDU struct {
Type string `json:"edu_type"`
Origin string `json:"origin"`
Destination string `json:"destination"`
Content RawJSON `json:"content"`
}

View File

@ -47,6 +47,8 @@ const (
MRoomHistoryVisibility = "m.room.history_visibility" MRoomHistoryVisibility = "m.room.history_visibility"
// MRoomRedaction https://matrix.org/docs/spec/client_server/r0.2.0.html#id21 // MRoomRedaction https://matrix.org/docs/spec/client_server/r0.2.0.html#id21
MRoomRedaction = "m.room.redaction" MRoomRedaction = "m.room.redaction"
// MTyping https://matrix.org/docs/spec/client_server/r0.3.0.html#m-typing
MTyping = "m.typing"
) )
// StateNeeded lists the event types and state_keys needed to authenticate an event. // StateNeeded lists the event types and state_keys needed to authenticate an event.

0
vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/install.sh vendored Executable file → Normal file
View File

0
vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/pre-commit vendored Executable file → Normal file
View File

View File

@ -11,7 +11,7 @@
"structcheck", "structcheck",
"maligned", "maligned",
"ineffassign", "ineffassign",
"gas", "gosec",
"misspell", "misspell",
"gosimple", "gosimple",
"megacheck", "megacheck",

View File

@ -16,11 +16,14 @@ type Transaction struct {
// the destination server. Multiple transactions can be sent by the origin // the destination server. Multiple transactions can be sent by the origin
// server to the destination server in parallel so there may be more than // server to the destination server in parallel so there may be more than
// one previous transaction. // one previous transaction.
PreviousIDs []TransactionID `json:"previous_ids"` PreviousIDs []TransactionID `json:"previous_ids,omitempty"`
// The room events pushed from the origin server to the destination server // The room events pushed from the origin server to the destination server
// by this transaction. The events should either be events that originate // by this transaction. The events should either be events that originate
// on the origin server or be join m.room.member events. // on the origin server or be join m.room.member events.
PDUs []Event `json:"pdus"` PDUs []Event `json:"pdus"`
// The ephemeral events pushed from origin server to destination server
// by this transaction. The events must orginate at the origin server.
EDUs []EDU `json:"edus,omitempty"`
} }
// A TransactionID identifies a transaction sent by a matrix server to another // A TransactionID identifies a transaction sent by a matrix server to another

0
vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh vendored Executable file → Normal file
View File