Add tables for tracking the state of invites to the room server. (#165)
* Storage functions for invite events * Add table for tracking membership state * More stuff * More stuff * Use utility methods from gomatrixserverlib, rather than reimplementing them * More stuff * Return string rather than pointer to string * Update gomatrixserverlib * Use HTTP API for roomserver input. * Use synchronous HTTP API for writing events to the roomserver * Remove unused config for kafka topic * Add new output types to roomserver for invites * Write membership updates * Separate filtering from pairing up changes in membershipChanges * Fix SQL * Fix SQL * Namespace the tables * Fix SQL * Use clearer names for some of the variables * Rename senderID for consistency * Restructure update membership * Comments * More comment * Fix SQL * More comments * Assign state keys inside the transaction * Comment on the purpose of the latestEventsUpdater * Comment on the purpose of updateMembership * Remove duplicate fields from stateChange * Attempt to rewrite comment in 'english' * More comments * Fix comment * Comment * more commentsmain
parent
c35803c9d8
commit
2071387f3c
|
@ -21,8 +21,14 @@ import (
|
||||||
// An OutputType is a type of roomserver output.
|
// An OutputType is a type of roomserver output.
|
||||||
type OutputType string
|
type OutputType string
|
||||||
|
|
||||||
|
const (
|
||||||
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
|
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
|
||||||
const OutputTypeNewRoomEvent OutputType = "new_room_event"
|
OutputTypeNewRoomEvent OutputType = "new_room_event"
|
||||||
|
// OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent
|
||||||
|
OutputTypeNewInviteEvent OutputType = "new_invite_event"
|
||||||
|
// OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent
|
||||||
|
OutputTypeRetireInviteEvent OutputType = "retire_invite_event"
|
||||||
|
)
|
||||||
|
|
||||||
// An OutputEvent is an entry in the roomserver output kafka log.
|
// An OutputEvent is an entry in the roomserver output kafka log.
|
||||||
// Consumers should check the type field when consuming this event.
|
// Consumers should check the type field when consuming this event.
|
||||||
|
@ -31,6 +37,10 @@ type OutputEvent struct {
|
||||||
Type OutputType `json:"type"`
|
Type OutputType `json:"type"`
|
||||||
// The content of event with type OutputTypeNewRoomEvent
|
// The content of event with type OutputTypeNewRoomEvent
|
||||||
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
|
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
|
||||||
|
// The content of event with type OutputTypeNewInviteEvent
|
||||||
|
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
|
||||||
|
// The content of event with type OutputTypeRetireInviteEvent
|
||||||
|
RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// An OutputNewRoomEvent is written when the roomserver receives a new event.
|
// An OutputNewRoomEvent is written when the roomserver receives a new event.
|
||||||
|
@ -98,3 +108,26 @@ type OutputNewRoomEvent struct {
|
||||||
// future proof the API for virtual hosting.
|
// future proof the API for virtual hosting.
|
||||||
SendAsServer string `json:"send_as_server"`
|
SendAsServer string `json:"send_as_server"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// An OutputNewInviteEvent is written whenever an invite becomes active.
|
||||||
|
// Invite events can be received outside of an existing room so have to be
|
||||||
|
// tracked separately from the room events themselves.
|
||||||
|
type OutputNewInviteEvent struct {
|
||||||
|
// The "m.room.member" invite event.
|
||||||
|
Event gomatrixserverlib.Event `json:"event"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// An OutputRetireInviteEvent is written whenever an existing invite is no longer
|
||||||
|
// active. An invite stops being active if the user joins the room or if the
|
||||||
|
// invite is rejected by the user.
|
||||||
|
type OutputRetireInviteEvent struct {
|
||||||
|
// The ID of the "m.room.member" invite event.
|
||||||
|
EventID string
|
||||||
|
// Optional event ID of the event that replaced the invite.
|
||||||
|
// This can be empty if the invite was rejected locally and we were unable
|
||||||
|
// to reach the server that originally sent the invite.
|
||||||
|
RetiredByEventID string
|
||||||
|
// The "membership" of the user after retiring the invite. One of "join"
|
||||||
|
// "leave" or "ban".
|
||||||
|
Membership string
|
||||||
|
}
|
||||||
|
|
|
@ -43,8 +43,8 @@ type RoomEventDatabase interface {
|
||||||
|
|
||||||
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
|
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
|
||||||
type OutputRoomEventWriter interface {
|
type OutputRoomEventWriter interface {
|
||||||
// Write an event.
|
// Write a list of events for a room
|
||||||
WriteOutputRoomEvent(output api.OutputNewRoomEvent) error
|
WriteOutputEvents(roomID string, updates []api.OutputEvent) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error {
|
func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error {
|
||||||
|
|
|
@ -46,22 +46,21 @@ type RoomserverInputAPI struct {
|
||||||
processed int64
|
processed int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOutputRoomEvent implements OutputRoomEventWriter
|
// WriteOutputEvents implements OutputRoomEventWriter
|
||||||
func (r *RoomserverInputAPI) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error {
|
func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
|
||||||
var m sarama.ProducerMessage
|
messages := make([]*sarama.ProducerMessage, len(updates))
|
||||||
oe := api.OutputEvent{
|
for i := range updates {
|
||||||
Type: api.OutputTypeNewRoomEvent,
|
value, err := json.Marshal(updates[i])
|
||||||
NewRoomEvent: &output,
|
|
||||||
}
|
|
||||||
value, err := json.Marshal(oe)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.Topic = r.OutputRoomEventTopic
|
messages[i] = &sarama.ProducerMessage{
|
||||||
m.Key = sarama.StringEncoder("")
|
Topic: r.OutputRoomEventTopic,
|
||||||
m.Value = sarama.ByteEncoder(value)
|
Key: sarama.StringEncoder(roomID),
|
||||||
_, _, err = r.Producer.SendMessage(&m)
|
Value: sarama.ByteEncoder(value),
|
||||||
return err
|
}
|
||||||
|
}
|
||||||
|
return r.Producer.SendMessages(messages)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InputRoomEvents implements api.RoomserverInputAPI
|
// InputRoomEvents implements api.RoomserverInputAPI
|
||||||
|
|
|
@ -66,69 +66,88 @@ func updateLatestEvents(
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = doUpdateLatestEvents(db, updater, ow, roomNID, stateAtEvent, event, sendAsServer)
|
u := latestEventsUpdater{
|
||||||
return
|
db: db, updater: updater, ow: ow, roomNID: roomNID,
|
||||||
|
stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
|
||||||
|
}
|
||||||
|
return u.doUpdateLatestEvents()
|
||||||
}
|
}
|
||||||
|
|
||||||
func doUpdateLatestEvents(
|
// latestEventsUpdater tracks the state used to update the latest events in the
|
||||||
db RoomEventDatabase,
|
// room. It mostly just ferries state between the various function calls.
|
||||||
updater types.RoomRecentEventsUpdater,
|
// The state could be passed using function arguments, but it becomes impractical
|
||||||
ow OutputRoomEventWriter,
|
// when there are so many variables to pass around.
|
||||||
roomNID types.RoomNID,
|
type latestEventsUpdater struct {
|
||||||
stateAtEvent types.StateAtEvent,
|
db RoomEventDatabase
|
||||||
event gomatrixserverlib.Event,
|
updater types.RoomRecentEventsUpdater
|
||||||
sendAsServer string,
|
ow OutputRoomEventWriter
|
||||||
) error {
|
roomNID types.RoomNID
|
||||||
|
stateAtEvent types.StateAtEvent
|
||||||
|
event gomatrixserverlib.Event
|
||||||
|
// Which server to send this event as.
|
||||||
|
sendAsServer string
|
||||||
|
// The eventID of the event that was processed before this one.
|
||||||
|
lastEventIDSent string
|
||||||
|
// The latest events in the room after processing this event.
|
||||||
|
latest []types.StateAtEventAndReference
|
||||||
|
// The state entries removed from and added to the current state of the
|
||||||
|
// room as a result of processing this event. They are sorted lists.
|
||||||
|
removed []types.StateEntry
|
||||||
|
added []types.StateEntry
|
||||||
|
// The state entries that are removed and added to recover the state before
|
||||||
|
// the event being processed. They are sorted lists.
|
||||||
|
stateBeforeEventRemoves []types.StateEntry
|
||||||
|
stateBeforeEventAdds []types.StateEntry
|
||||||
|
// The snapshots of current state before and after processing this event
|
||||||
|
oldStateNID types.StateSnapshotNID
|
||||||
|
newStateNID types.StateSnapshotNID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
var err error
|
var err error
|
||||||
var prevEvents []gomatrixserverlib.EventReference
|
var prevEvents []gomatrixserverlib.EventReference
|
||||||
prevEvents = event.PrevEvents()
|
prevEvents = u.event.PrevEvents()
|
||||||
oldLatest := updater.LatestEvents()
|
oldLatest := u.updater.LatestEvents()
|
||||||
lastEventIDSent := updater.LastEventIDSent()
|
u.lastEventIDSent = u.updater.LastEventIDSent()
|
||||||
oldStateNID := updater.CurrentStateSnapshotNID()
|
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
|
||||||
|
|
||||||
if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil {
|
if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if hasBeenSent {
|
} else if hasBeenSent {
|
||||||
// Already sent this event so we can stop processing
|
// Already sent this event so we can stop processing
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
|
if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
eventReference := event.EventReference()
|
eventReference := u.event.EventReference()
|
||||||
// Check if this event is already referenced by another event in the room.
|
// Check if this event is already referenced by another event in the room.
|
||||||
var alreadyReferenced bool
|
var alreadyReferenced bool
|
||||||
if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
|
if alreadyReferenced, err = u.updater.IsReferenced(eventReference); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
|
u.latest = calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
|
||||||
EventReference: eventReference,
|
EventReference: eventReference,
|
||||||
StateAtEvent: stateAtEvent,
|
StateAtEvent: u.stateAtEvent,
|
||||||
})
|
})
|
||||||
|
|
||||||
latestStateAtEvents := make([]types.StateAtEvent, len(newLatest))
|
if err = u.latestState(); err != nil {
|
||||||
for i := range newLatest {
|
return err
|
||||||
latestStateAtEvents[i] = newLatest[i].StateAtEvent
|
|
||||||
}
|
}
|
||||||
newStateNID, err := state.CalculateAndStoreStateAfterEvents(db, roomNID, latestStateAtEvents)
|
|
||||||
|
updates, err := updateMemberships(u.db, u.updater, u.removed, u.added)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
removed, added, err := state.DifferenceBetweeenStateSnapshots(db, oldStateNID, newStateNID)
|
update, err := u.makeOutputNewRoomEvent()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
stateBeforeEventRemoves, stateBeforeEventAdds, err := state.DifferenceBetweeenStateSnapshots(
|
|
||||||
db, newStateNID, stateAtEvent.BeforeStateSnapshotNID,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
updates = append(updates, *update)
|
||||||
|
|
||||||
// Send the event to the output logs.
|
// Send the event to the output logs.
|
||||||
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
||||||
|
@ -138,24 +157,47 @@ func doUpdateLatestEvents(
|
||||||
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
||||||
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
||||||
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
||||||
if err = writeEvent(
|
if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
|
||||||
db, ow, lastEventIDSent, event, newLatest, removed, added,
|
|
||||||
stateBeforeEventRemoves, stateBeforeEventAdds, sendAsServer,
|
|
||||||
); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID, newStateNID); err != nil {
|
if err = u.updater.SetLatestEvents(u.roomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil {
|
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (u *latestEventsUpdater) latestState() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
|
||||||
|
for i := range u.latest {
|
||||||
|
latestStateAtEvents[i] = u.latest[i].StateAtEvent
|
||||||
|
}
|
||||||
|
u.newStateNID, err = state.CalculateAndStoreStateAfterEvents(u.db, u.roomNID, latestStateAtEvents)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
u.removed, u.added, err = state.DifferenceBetweeenStateSnapshots(u.db, u.oldStateNID, u.newStateNID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = state.DifferenceBetweeenStateSnapshots(
|
||||||
|
u.db, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference {
|
func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference {
|
||||||
var alreadyInLatest bool
|
var alreadyInLatest bool
|
||||||
var newLatest []types.StateAtEventAndReference
|
var newLatest []types.StateAtEventAndReference
|
||||||
|
@ -189,57 +231,55 @@ func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenc
|
||||||
return newLatest
|
return newLatest
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeEvent(
|
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
|
||||||
db RoomEventDatabase, ow OutputRoomEventWriter, lastEventIDSent string,
|
|
||||||
event gomatrixserverlib.Event, latest []types.StateAtEventAndReference,
|
|
||||||
removed, added []types.StateEntry,
|
|
||||||
stateBeforeEventRemoves, stateBeforeEventAdds []types.StateEntry,
|
|
||||||
sendAsServer string,
|
|
||||||
) error {
|
|
||||||
|
|
||||||
latestEventIDs := make([]string, len(latest))
|
latestEventIDs := make([]string, len(u.latest))
|
||||||
for i := range latest {
|
for i := range u.latest {
|
||||||
latestEventIDs[i] = latest[i].EventID
|
latestEventIDs[i] = u.latest[i].EventID
|
||||||
}
|
}
|
||||||
|
|
||||||
ore := api.OutputNewRoomEvent{
|
ore := api.OutputNewRoomEvent{
|
||||||
Event: event,
|
Event: u.event,
|
||||||
LastSentEventID: lastEventIDSent,
|
LastSentEventID: u.lastEventIDSent,
|
||||||
LatestEventIDs: latestEventIDs,
|
LatestEventIDs: latestEventIDs,
|
||||||
}
|
}
|
||||||
|
|
||||||
var stateEventNIDs []types.EventNID
|
var stateEventNIDs []types.EventNID
|
||||||
for _, entry := range added {
|
for _, entry := range u.added {
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
for _, entry := range removed {
|
for _, entry := range u.removed {
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
for _, entry := range stateBeforeEventRemoves {
|
for _, entry := range u.stateBeforeEventRemoves {
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
for _, entry := range stateBeforeEventAdds {
|
for _, entry := range u.stateBeforeEventAdds {
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
|
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
|
||||||
eventIDMap, err := db.EventIDs(stateEventNIDs)
|
eventIDMap, err := u.db.EventIDs(stateEventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, entry := range added {
|
for _, entry := range u.added {
|
||||||
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
|
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
|
||||||
}
|
}
|
||||||
for _, entry := range removed {
|
for _, entry := range u.removed {
|
||||||
ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
|
ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
|
||||||
}
|
}
|
||||||
for _, entry := range stateBeforeEventRemoves {
|
for _, entry := range u.stateBeforeEventRemoves {
|
||||||
ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID])
|
ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID])
|
||||||
}
|
}
|
||||||
for _, entry := range stateBeforeEventAdds {
|
for _, entry := range u.stateBeforeEventAdds {
|
||||||
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
|
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
|
||||||
}
|
}
|
||||||
ore.SendAsServer = sendAsServer
|
ore.SendAsServer = u.sendAsServer
|
||||||
return ow.WriteOutputRoomEvent(ore)
|
|
||||||
|
return &api.OutputEvent{
|
||||||
|
Type: api.OutputTypeNewRoomEvent,
|
||||||
|
NewRoomEvent: &ore,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventNIDSorter []types.EventNID
|
type eventNIDSorter []types.EventNID
|
||||||
|
|
|
@ -0,0 +1,297 @@
|
||||||
|
// Copyright 2017 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package input
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
// updateMembership updates the current membership and the invites for each
|
||||||
|
// user affected by a change in the current state of the room.
|
||||||
|
// Returns a list of output events to write to the kafka log to inform the
|
||||||
|
// consumers about the invites added or retired by the change in current state.
|
||||||
|
func updateMemberships(
|
||||||
|
db RoomEventDatabase, updater types.RoomRecentEventsUpdater, removed, added []types.StateEntry,
|
||||||
|
) ([]api.OutputEvent, error) {
|
||||||
|
changes := membershipChanges(removed, added)
|
||||||
|
var eventNIDs []types.EventNID
|
||||||
|
for _, change := range changes {
|
||||||
|
if change.addedEventNID != 0 {
|
||||||
|
eventNIDs = append(eventNIDs, change.addedEventNID)
|
||||||
|
}
|
||||||
|
if change.removedEventNID != 0 {
|
||||||
|
eventNIDs = append(eventNIDs, change.removedEventNID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load the event JSON so we can look up the "membership" key.
|
||||||
|
// TODO: Maybe add a membership key to the events table so we can load that
|
||||||
|
// key without having to load the entire event JSON?
|
||||||
|
events, err := db.Events(eventNIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var updates []api.OutputEvent
|
||||||
|
|
||||||
|
for _, change := range changes {
|
||||||
|
var ae *gomatrixserverlib.Event
|
||||||
|
var re *gomatrixserverlib.Event
|
||||||
|
targetUserNID := change.EventStateKeyNID
|
||||||
|
if change.removedEventNID != 0 {
|
||||||
|
ev, _ := eventMap(events).lookup(change.removedEventNID)
|
||||||
|
if ev != nil {
|
||||||
|
re = &ev.Event
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if change.addedEventNID != 0 {
|
||||||
|
ev, _ := eventMap(events).lookup(change.addedEventNID)
|
||||||
|
if ev != nil {
|
||||||
|
ae = &ev.Event
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateMembership(
|
||||||
|
updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID,
|
||||||
|
remove, add *gomatrixserverlib.Event,
|
||||||
|
updates []api.OutputEvent,
|
||||||
|
) ([]api.OutputEvent, error) {
|
||||||
|
var err error
|
||||||
|
// Default the membership to "leave" if no event was added or removed.
|
||||||
|
old := "leave"
|
||||||
|
new := "leave"
|
||||||
|
|
||||||
|
if remove != nil {
|
||||||
|
old, err = remove.Membership()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if add != nil {
|
||||||
|
new, err = add.Membership()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if old == new {
|
||||||
|
// If the membership is the same then nothing changed and we can return
|
||||||
|
// immediately. This should help speed up processing for display name
|
||||||
|
// changes where the membership is "join" both before and after.
|
||||||
|
return updates, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
mu, err := updater.MembershipUpdater(targetUserNID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch new {
|
||||||
|
case "invite":
|
||||||
|
return updateToInviteMembership(mu, add, updates)
|
||||||
|
case "join":
|
||||||
|
return updateToJoinMembership(mu, add, updates)
|
||||||
|
case "leave", "ban":
|
||||||
|
return updateToLeaveMembership(mu, add, new, updates)
|
||||||
|
default:
|
||||||
|
panic(fmt.Errorf(
|
||||||
|
"input: membership %q is not one of the allowed values", new,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateToInviteMembership(
|
||||||
|
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||||
|
) ([]api.OutputEvent, error) {
|
||||||
|
// We may have already sent the invite to the user, either because we are
|
||||||
|
// reprocessing this event, or because the we received this invite from a
|
||||||
|
// remote server via the federation invite API. In those cases we don't need
|
||||||
|
// to send the event.
|
||||||
|
needsSending, err := mu.SetToInvite(*add)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if needsSending {
|
||||||
|
// We notify the consumers using a special event even though we will
|
||||||
|
// notify them about the change in current state as part of the normal
|
||||||
|
// room event stream. This ensures that the consumers only have to
|
||||||
|
// consider a single stream of events when determining whether a user
|
||||||
|
// is invited, rather than having to combine multiple streams themselves.
|
||||||
|
onie := api.OutputNewInviteEvent{
|
||||||
|
Event: *add,
|
||||||
|
}
|
||||||
|
updates = append(updates, api.OutputEvent{
|
||||||
|
Type: api.OutputTypeNewInviteEvent,
|
||||||
|
NewInviteEvent: &onie,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return updates, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateToJoinMembership(
|
||||||
|
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||||
|
) ([]api.OutputEvent, error) {
|
||||||
|
// If the user is already marked as being joined then we can return immediately.
|
||||||
|
// TODO: Is this code reachable given the "old != new" guard in updateMembership?
|
||||||
|
if mu.IsJoin() {
|
||||||
|
return updates, nil
|
||||||
|
}
|
||||||
|
// When we mark a user as being joined we will invalidate any invites that
|
||||||
|
// are active for that user. We notify the consumers that the invites have
|
||||||
|
// been retired using a special event, even though they could infer this
|
||||||
|
// by studying the state changes in the room event stream.
|
||||||
|
retired, err := mu.SetToJoin(add.Sender())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, eventID := range retired {
|
||||||
|
orie := api.OutputRetireInviteEvent{
|
||||||
|
EventID: eventID,
|
||||||
|
Membership: "join",
|
||||||
|
}
|
||||||
|
if add != nil {
|
||||||
|
orie.RetiredByEventID = add.EventID()
|
||||||
|
}
|
||||||
|
updates = append(updates, api.OutputEvent{
|
||||||
|
Type: api.OutputTypeRetireInviteEvent,
|
||||||
|
RetireInviteEvent: &orie,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return updates, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateToLeaveMembership(
|
||||||
|
mu types.MembershipUpdater, add *gomatrixserverlib.Event,
|
||||||
|
newMembership string, updates []api.OutputEvent,
|
||||||
|
) ([]api.OutputEvent, error) {
|
||||||
|
// If the user is already neither joined, nor invited to the room then we
|
||||||
|
// can return immediately.
|
||||||
|
if mu.IsLeave() {
|
||||||
|
return updates, nil
|
||||||
|
}
|
||||||
|
// When we mark a user as having left we will invalidate any invites that
|
||||||
|
// are active for that user. We notify the consumers that the invites have
|
||||||
|
// been retired using a special event, even though they could infer this
|
||||||
|
// by studying the state changes in the room event stream.
|
||||||
|
retired, err := mu.SetToLeave(add.Sender())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, eventID := range retired {
|
||||||
|
orie := api.OutputRetireInviteEvent{
|
||||||
|
EventID: eventID,
|
||||||
|
Membership: newMembership,
|
||||||
|
}
|
||||||
|
if add != nil {
|
||||||
|
orie.RetiredByEventID = add.EventID()
|
||||||
|
}
|
||||||
|
updates = append(updates, api.OutputEvent{
|
||||||
|
Type: api.OutputTypeRetireInviteEvent,
|
||||||
|
RetireInviteEvent: &orie,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return updates, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// membershipChanges pairs up the membership state changes from a sorted list
|
||||||
|
// of state removed and a sorted list of state added.
|
||||||
|
func membershipChanges(removed, added []types.StateEntry) []stateChange {
|
||||||
|
changes := pairUpChanges(removed, added)
|
||||||
|
var result []stateChange
|
||||||
|
for _, c := range changes {
|
||||||
|
if c.EventTypeNID == types.MRoomMemberNID {
|
||||||
|
result = append(result, c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
type stateChange struct {
|
||||||
|
types.StateKeyTuple
|
||||||
|
removedEventNID types.EventNID
|
||||||
|
addedEventNID types.EventNID
|
||||||
|
}
|
||||||
|
|
||||||
|
// pairUpChanges pairs up the state events added and removed for each type,
|
||||||
|
// state key tuple. Assumes that removed and added are sorted.
|
||||||
|
func pairUpChanges(removed, added []types.StateEntry) []stateChange {
|
||||||
|
var ai int
|
||||||
|
var ri int
|
||||||
|
var result []stateChange
|
||||||
|
for {
|
||||||
|
switch {
|
||||||
|
case ai == len(added):
|
||||||
|
// We've reached the end of the added entries.
|
||||||
|
// The rest of the removed list are events that were removed without
|
||||||
|
// an event with the same state key being added.
|
||||||
|
for _, s := range removed[ri:] {
|
||||||
|
result = append(result, stateChange{
|
||||||
|
StateKeyTuple: s.StateKeyTuple,
|
||||||
|
removedEventNID: s.EventNID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
case ri == len(removed):
|
||||||
|
// We've reached the end of the removed entries.
|
||||||
|
// The rest of the added list are events that were added without
|
||||||
|
// an event with the same state key being removed.
|
||||||
|
for _, s := range added[ai:] {
|
||||||
|
result = append(result, stateChange{
|
||||||
|
StateKeyTuple: s.StateKeyTuple,
|
||||||
|
addedEventNID: s.EventNID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
case added[ai].StateKeyTuple == removed[ri].StateKeyTuple:
|
||||||
|
// The tuple is in both lists so an event with that key is being
|
||||||
|
// removed and another event with the same key is being added.
|
||||||
|
result = append(result, stateChange{
|
||||||
|
StateKeyTuple: added[ai].StateKeyTuple,
|
||||||
|
removedEventNID: removed[ri].EventNID,
|
||||||
|
addedEventNID: added[ai].EventNID,
|
||||||
|
})
|
||||||
|
ai++
|
||||||
|
ri++
|
||||||
|
case added[ai].StateKeyTuple.LessThan(removed[ri].StateKeyTuple):
|
||||||
|
// The lists are sorted so the added entry being less than the
|
||||||
|
// removed entry means that the added event was added without an
|
||||||
|
// event with the same key being removed.
|
||||||
|
result = append(result, stateChange{
|
||||||
|
StateKeyTuple: added[ai].StateKeyTuple,
|
||||||
|
addedEventNID: added[ai].EventNID,
|
||||||
|
})
|
||||||
|
ai++
|
||||||
|
default:
|
||||||
|
// Reaching the default case implies that the removed entry is less
|
||||||
|
// than the added entry. Since the lists are sorted this means that
|
||||||
|
// the removed event was removed without an event with the same
|
||||||
|
// key being added.
|
||||||
|
result = append(result, stateChange{
|
||||||
|
StateKeyTuple: removed[ai].StateKeyTuple,
|
||||||
|
removedEventNID: removed[ri].EventNID,
|
||||||
|
})
|
||||||
|
ri++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -76,15 +76,23 @@ func (s *eventStateKeyStatements) prepare(db *sql.DB) (err error) {
|
||||||
}.prepare(db)
|
}.prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *eventStateKeyStatements) insertEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
|
func (s *eventStateKeyStatements) insertEventStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) {
|
||||||
var eventStateKeyNID int64
|
var eventStateKeyNID int64
|
||||||
err := s.insertEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
|
stmt := s.insertEventStateKeyNIDStmt
|
||||||
|
if txn != nil {
|
||||||
|
stmt = txn.Stmt(stmt)
|
||||||
|
}
|
||||||
|
err := stmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
|
||||||
return types.EventStateKeyNID(eventStateKeyNID), err
|
return types.EventStateKeyNID(eventStateKeyNID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *eventStateKeyStatements) selectEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
|
func (s *eventStateKeyStatements) selectEventStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) {
|
||||||
var eventStateKeyNID int64
|
var eventStateKeyNID int64
|
||||||
err := s.selectEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
|
stmt := s.selectEventStateKeyNIDStmt
|
||||||
|
if txn != nil {
|
||||||
|
stmt = txn.Stmt(stmt)
|
||||||
|
}
|
||||||
|
err := stmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
|
||||||
return types.EventStateKeyNID(eventStateKeyNID), err
|
return types.EventStateKeyNID(eventStateKeyNID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
// Copyright 2017 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const inviteSchema = `
|
||||||
|
CREATE TABLE IF NOT EXISTS roomserver_invites (
|
||||||
|
-- The string ID of the invite event itself.
|
||||||
|
-- We can't use a numeric event ID here because we don't always have
|
||||||
|
-- enough information to store an invite in the event table.
|
||||||
|
-- In particular we don't always have a chain of auth_events for invites
|
||||||
|
-- received over federation.
|
||||||
|
invite_event_id TEXT PRIMARY KEY,
|
||||||
|
-- The numeric ID of the room the invite m.room.member event is in.
|
||||||
|
room_nid BIGINT NOT NULL,
|
||||||
|
-- The numeric ID for the state key of the invite m.room.member event.
|
||||||
|
-- This tells us who the invite is for.
|
||||||
|
-- This is used to query the active invites for a user.
|
||||||
|
target_nid BIGINT NOT NULL,
|
||||||
|
-- The numeric ID for the sender of the invite m.room.member event.
|
||||||
|
-- This tells us who sent the invite.
|
||||||
|
-- This is used to work out which matrix server we should talk to when
|
||||||
|
-- we try to join the room.
|
||||||
|
sender_nid BIGINT NOT NULL DEFAULT 0,
|
||||||
|
-- This is used to track whether the invite is still active.
|
||||||
|
-- This is set implicitly when processing new join and leave events and
|
||||||
|
-- explicitly when rejecting events over federation.
|
||||||
|
retired BOOLEAN NOT NULL DEFAULT FALSE,
|
||||||
|
-- The invite event JSON.
|
||||||
|
invite_event_json TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS roomserver_invites_active_idx ON roomserver_invites (target_nid, room_nid)
|
||||||
|
WHERE NOT retired;
|
||||||
|
`
|
||||||
|
const insertInviteEventSQL = "" +
|
||||||
|
"INSERT INTO roomserver_invites (invite_event_id, room_nid, target_nid," +
|
||||||
|
" sender_nid, invite_event_json) VALUES ($1, $2, $3, $4, $5)" +
|
||||||
|
" ON CONFLICT DO NOTHING"
|
||||||
|
|
||||||
|
const selectInviteActiveForUserInRoomSQL = "" +
|
||||||
|
"SELECT sender_nid FROM roomserver_invites" +
|
||||||
|
" WHERE target_nid = $1 AND room_nid = $2" +
|
||||||
|
" AND NOT retired"
|
||||||
|
|
||||||
|
// Retire every active invite for a user in a room.
|
||||||
|
// Ideally we'd know which invite events were retired by a given update so we
|
||||||
|
// wouldn't need to remove every active invite.
|
||||||
|
// However the matrix protocol doesn't give us a way to reliably identify the
|
||||||
|
// invites that were retired, so we are forced to retire all of them.
|
||||||
|
const updateInviteRetiredSQL = "" +
|
||||||
|
"UPDATE roomserver_invites SET retired = TRUE" +
|
||||||
|
" WHERE room_nid = $1 AND target_nid = $2 AND NOT retired" +
|
||||||
|
" RETURNING invite_event_id"
|
||||||
|
|
||||||
|
type inviteStatements struct {
|
||||||
|
insertInviteEventStmt *sql.Stmt
|
||||||
|
selectInviteActiveForUserInRoomStmt *sql.Stmt
|
||||||
|
updateInviteRetiredStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inviteStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(inviteSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return statementList{
|
||||||
|
{&s.insertInviteEventStmt, insertInviteEventSQL},
|
||||||
|
{&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL},
|
||||||
|
{&s.updateInviteRetiredStmt, updateInviteRetiredSQL},
|
||||||
|
}.prepare(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inviteStatements) insertInviteEvent(
|
||||||
|
txn *sql.Tx, inviteEventID string, roomNID types.RoomNID,
|
||||||
|
targetUserNID, senderUserNID types.EventStateKeyNID,
|
||||||
|
inviteEventJSON []byte,
|
||||||
|
) (bool, error) {
|
||||||
|
result, err := txn.Stmt(s.insertInviteEventStmt).Exec(
|
||||||
|
inviteEventID, roomNID, targetUserNID, senderUserNID, inviteEventJSON,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
count, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return count != 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inviteStatements) updateInviteRetired(
|
||||||
|
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||||
|
) ([]string, error) {
|
||||||
|
rows, err := txn.Stmt(s.updateInviteRetiredStmt).Query(roomNID, targetUserNID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var result []string
|
||||||
|
for rows.Next() {
|
||||||
|
var inviteEventID string
|
||||||
|
if err := rows.Scan(&inviteEventID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, inviteEventID)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// selectInviteActiveForUserInRoom returns a list of sender state key NIDs
|
||||||
|
func (s *inviteStatements) selectInviteActiveForUserInRoom(
|
||||||
|
targetUserNID types.EventStateKeyNID, roomNID types.RoomNID,
|
||||||
|
) ([]types.EventStateKeyNID, error) {
|
||||||
|
rows, err := s.selectInviteActiveForUserInRoomStmt.Query(
|
||||||
|
targetUserNID, roomNID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var result []types.EventStateKeyNID
|
||||||
|
for rows.Next() {
|
||||||
|
var senderUserNID int64
|
||||||
|
if err := rows.Scan(&senderUserNID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, types.EventStateKeyNID(senderUserNID))
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
|
@ -0,0 +1,111 @@
|
||||||
|
// Copyright 2017 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type membershipState int64
|
||||||
|
|
||||||
|
const (
|
||||||
|
membershipStateLeaveOrBan membershipState = 1
|
||||||
|
membershipStateInvite membershipState = 2
|
||||||
|
membershipStateJoin membershipState = 3
|
||||||
|
)
|
||||||
|
|
||||||
|
const membershipSchema = `
|
||||||
|
-- The membership table is used to coordinate updates between the invite table
|
||||||
|
-- and the room state tables.
|
||||||
|
-- This table is updated in one of 3 ways:
|
||||||
|
-- 1) The membership of a user changes within the current state of the room.
|
||||||
|
-- 2) An invite is received outside of a room over federation.
|
||||||
|
-- 3) An invite is rejected outside of a room over federation.
|
||||||
|
CREATE TABLE IF NOT EXISTS roomserver_membership (
|
||||||
|
room_nid BIGINT NOT NULL,
|
||||||
|
-- Numeric state key ID for the user ID this state is for.
|
||||||
|
target_nid BIGINT NOT NULL,
|
||||||
|
-- Numeric state key ID for the user ID who changed the state.
|
||||||
|
-- This may be 0 since it is not always possible to identify the user that
|
||||||
|
-- changed the state.
|
||||||
|
sender_nid BIGINT NOT NULL DEFAULT 0,
|
||||||
|
-- The state the user is in within this room.
|
||||||
|
-- Default value is "membershipStateLeaveOrBan"
|
||||||
|
membership_nid BIGINT NOT NULL DEFAULT 1,
|
||||||
|
UNIQUE (room_nid, target_nid)
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
// Insert a row in to membership table so that it can be locked by the
|
||||||
|
// SELECT FOR UPDATE
|
||||||
|
const insertMembershipSQL = "" +
|
||||||
|
"INSERT INTO roomserver_membership (room_nid, target_nid)" +
|
||||||
|
" VALUES ($1, $2)" +
|
||||||
|
" ON CONFLICT DO NOTHING"
|
||||||
|
|
||||||
|
const selectMembershipForUpdateSQL = "" +
|
||||||
|
"SELECT membership_nid FROM roomserver_membership" +
|
||||||
|
" WHERE room_nid = $1 AND target_nid = $2 FOR UPDATE"
|
||||||
|
|
||||||
|
const updateMembershipSQL = "" +
|
||||||
|
"UPDATE roomserver_membership SET sender_nid = $3, membership_nid = $4" +
|
||||||
|
" WHERE room_nid = $1 AND target_nid = $2"
|
||||||
|
|
||||||
|
type membershipStatements struct {
|
||||||
|
insertMembershipStmt *sql.Stmt
|
||||||
|
selectMembershipForUpdateStmt *sql.Stmt
|
||||||
|
updateMembershipStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(membershipSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return statementList{
|
||||||
|
{&s.insertMembershipStmt, insertMembershipSQL},
|
||||||
|
{&s.selectMembershipForUpdateStmt, selectMembershipForUpdateSQL},
|
||||||
|
{&s.updateMembershipStmt, updateMembershipSQL},
|
||||||
|
}.prepare(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *membershipStatements) insertMembership(
|
||||||
|
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||||
|
) error {
|
||||||
|
_, err := txn.Stmt(s.insertMembershipStmt).Exec(roomNID, targetUserNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *membershipStatements) selectMembershipForUpdate(
|
||||||
|
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||||
|
) (membership membershipState, err error) {
|
||||||
|
err = txn.Stmt(s.selectMembershipForUpdateStmt).QueryRow(
|
||||||
|
roomNID, targetUserNID,
|
||||||
|
).Scan(&membership)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *membershipStatements) updateMembership(
|
||||||
|
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||||
|
senderUserNID types.EventStateKeyNID, membership membershipState,
|
||||||
|
) error {
|
||||||
|
_, err := txn.Stmt(s.updateMembershipStmt).Exec(
|
||||||
|
roomNID, targetUserNID, senderUserNID, membership,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
|
@ -28,6 +28,8 @@ type statements struct {
|
||||||
stateBlockStatements
|
stateBlockStatements
|
||||||
previousEventStatements
|
previousEventStatements
|
||||||
roomAliasesStatements
|
roomAliasesStatements
|
||||||
|
inviteStatements
|
||||||
|
membershipStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *statements) prepare(db *sql.DB) error {
|
func (s *statements) prepare(db *sql.DB) error {
|
||||||
|
@ -43,6 +45,8 @@ func (s *statements) prepare(db *sql.DB) error {
|
||||||
s.stateBlockStatements.prepare,
|
s.stateBlockStatements.prepare,
|
||||||
s.previousEventStatements.prepare,
|
s.previousEventStatements.prepare,
|
||||||
s.roomAliasesStatements.prepare,
|
s.roomAliasesStatements.prepare,
|
||||||
|
s.inviteStatements.prepare,
|
||||||
|
s.membershipStatements.prepare,
|
||||||
} {
|
} {
|
||||||
if err = prepare(db); err != nil {
|
if err = prepare(db); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -16,6 +16,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
@ -64,7 +65,7 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ
|
||||||
// Assigned a numeric ID for the state_key if there is one present.
|
// Assigned a numeric ID for the state_key if there is one present.
|
||||||
// Otherwise set the numeric ID for the state_key to 0.
|
// Otherwise set the numeric ID for the state_key to 0.
|
||||||
if eventStateKey != nil {
|
if eventStateKey != nil {
|
||||||
if eventStateKeyNID, err = d.assignStateKeyNID(*eventStateKey); err != nil {
|
if eventStateKeyNID, err = d.assignStateKeyNID(nil, *eventStateKey); err != nil {
|
||||||
return 0, types.StateAtEvent{}, err
|
return 0, types.StateAtEvent{}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,15 +132,15 @@ func (d *Database) assignEventTypeNID(eventType string) (types.EventTypeNID, err
|
||||||
return eventTypeNID, err
|
return eventTypeNID, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) assignStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
|
func (d *Database) assignStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) {
|
||||||
// Check if we already have a numeric ID in the database.
|
// Check if we already have a numeric ID in the database.
|
||||||
eventStateKeyNID, err := d.statements.selectEventStateKeyNID(eventStateKey)
|
eventStateKeyNID, err := d.statements.selectEventStateKeyNID(txn, eventStateKey)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// We don't have a numeric ID so insert one into the database.
|
// We don't have a numeric ID so insert one into the database.
|
||||||
eventStateKeyNID, err = d.statements.insertEventStateKeyNID(eventStateKey)
|
eventStateKeyNID, err = d.statements.insertEventStateKeyNID(txn, eventStateKey)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// We raced with another insert so run the select again.
|
// We raced with another insert so run the select again.
|
||||||
eventStateKeyNID, err = d.statements.selectEventStateKeyNID(eventStateKey)
|
eventStateKeyNID, err = d.statements.selectEventStateKeyNID(txn, eventStateKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return eventStateKeyNID, err
|
return eventStateKeyNID, err
|
||||||
|
@ -249,12 +250,15 @@ func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) (types.RoomRe
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &roomRecentEventsUpdater{txn, d, stateAndRefs, lastEventIDSent, currentStateSnapshotNID}, nil
|
return &roomRecentEventsUpdater{
|
||||||
|
transaction{txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type roomRecentEventsUpdater struct {
|
type roomRecentEventsUpdater struct {
|
||||||
txn *sql.Tx
|
transaction
|
||||||
d *Database
|
d *Database
|
||||||
|
roomNID types.RoomNID
|
||||||
latestEvents []types.StateAtEventAndReference
|
latestEvents []types.StateAtEventAndReference
|
||||||
lastEventIDSent string
|
lastEventIDSent string
|
||||||
currentStateSnapshotNID types.StateSnapshotNID
|
currentStateSnapshotNID types.StateSnapshotNID
|
||||||
|
@ -319,14 +323,8 @@ func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error
|
||||||
return u.d.statements.updateEventSentToOutput(u.txn, eventNID)
|
return u.d.statements.updateEventSentToOutput(u.txn, eventNID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit implements types.RoomRecentEventsUpdater
|
func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (types.MembershipUpdater, error) {
|
||||||
func (u *roomRecentEventsUpdater) Commit() error {
|
return u.d.membershipUpdaterTxn(u.txn, u.roomNID, targetUserNID)
|
||||||
return u.txn.Commit()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Rollback implements types.RoomRecentEventsUpdater
|
|
||||||
func (u *roomRecentEventsUpdater) Rollback() error {
|
|
||||||
return u.txn.Rollback()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RoomNID implements query.RoomserverQueryAPIDB
|
// RoomNID implements query.RoomserverQueryAPIDB
|
||||||
|
@ -381,3 +379,124 @@ func (d *Database) StateEntriesForTuples(
|
||||||
) ([]types.StateEntryList, error) {
|
) ([]types.StateEntryList, error) {
|
||||||
return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples)
|
return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type membershipUpdater struct {
|
||||||
|
transaction
|
||||||
|
d *Database
|
||||||
|
roomNID types.RoomNID
|
||||||
|
targetUserNID types.EventStateKeyNID
|
||||||
|
membership membershipState
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) membershipUpdaterTxn(
|
||||||
|
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||||
|
) (types.MembershipUpdater, error) {
|
||||||
|
|
||||||
|
if err := d.statements.insertMembership(txn, roomNID, targetUserNID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
membership, err := d.statements.selectMembershipForUpdate(txn, roomNID, targetUserNID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &membershipUpdater{
|
||||||
|
transaction{txn}, d, roomNID, targetUserNID, membership,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsInvite implements types.MembershipUpdater
|
||||||
|
func (u *membershipUpdater) IsInvite() bool {
|
||||||
|
return u.membership == membershipStateInvite
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsJoin implements types.MembershipUpdater
|
||||||
|
func (u *membershipUpdater) IsJoin() bool {
|
||||||
|
return u.membership == membershipStateJoin
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsLeave implements types.MembershipUpdater
|
||||||
|
func (u *membershipUpdater) IsLeave() bool {
|
||||||
|
return u.membership == membershipStateLeaveOrBan
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetToInvite implements types.MembershipUpdater
|
||||||
|
func (u *membershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) {
|
||||||
|
senderUserNID, err := u.d.assignStateKeyNID(u.txn, event.Sender())
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
inserted, err := u.d.statements.insertInviteEvent(
|
||||||
|
u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if u.membership != membershipStateInvite {
|
||||||
|
if err = u.d.statements.updateMembership(
|
||||||
|
u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateInvite,
|
||||||
|
); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return inserted, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetToJoin implements types.MembershipUpdater
|
||||||
|
func (u *membershipUpdater) SetToJoin(senderUserID string) ([]string, error) {
|
||||||
|
senderUserNID, err := u.d.assignStateKeyNID(u.txn, senderUserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
inviteEventIDs, err := u.d.statements.updateInviteRetired(
|
||||||
|
u.txn, u.roomNID, u.targetUserNID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if u.membership != membershipStateJoin {
|
||||||
|
if err = u.d.statements.updateMembership(
|
||||||
|
u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateJoin,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return inviteEventIDs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetToLeave implements types.MembershipUpdater
|
||||||
|
func (u *membershipUpdater) SetToLeave(senderUserID string) ([]string, error) {
|
||||||
|
senderUserNID, err := u.d.assignStateKeyNID(u.txn, senderUserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
inviteEventIDs, err := u.d.statements.updateInviteRetired(
|
||||||
|
u.txn, u.roomNID, u.targetUserNID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if u.membership != membershipStateLeaveOrBan {
|
||||||
|
if err = u.d.statements.updateMembership(
|
||||||
|
u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateLeaveOrBan,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return inviteEventIDs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type transaction struct {
|
||||||
|
txn *sql.Tx
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit implements types.Transaction
|
||||||
|
func (t *transaction) Commit() error {
|
||||||
|
return t.txn.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rollback implements types.Transaction
|
||||||
|
func (t *transaction) Rollback() error {
|
||||||
|
return t.txn.Rollback()
|
||||||
|
}
|
||||||
|
|
|
@ -135,9 +135,17 @@ type StateEntryList struct {
|
||||||
StateEntries []StateEntry
|
StateEntries []StateEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A Transaction is something that can be committed or rolledback.
|
||||||
|
type Transaction interface {
|
||||||
|
// Commit the transaction
|
||||||
|
Commit() error
|
||||||
|
// Rollback the transaction.
|
||||||
|
Rollback() error
|
||||||
|
}
|
||||||
|
|
||||||
// A RoomRecentEventsUpdater is used to update the recent events in a room.
|
// A RoomRecentEventsUpdater is used to update the recent events in a room.
|
||||||
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
|
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
|
||||||
// lock on the row holding the latest events for the room.)
|
// lock on the row in the rooms table holding the latest events for the room.)
|
||||||
type RoomRecentEventsUpdater interface {
|
type RoomRecentEventsUpdater interface {
|
||||||
// The latest event IDs and state in the room.
|
// The latest event IDs and state in the room.
|
||||||
LatestEvents() []StateAtEventAndReference
|
LatestEvents() []StateAtEventAndReference
|
||||||
|
@ -163,10 +171,36 @@ type RoomRecentEventsUpdater interface {
|
||||||
HasEventBeenSent(eventNID EventNID) (bool, error)
|
HasEventBeenSent(eventNID EventNID) (bool, error)
|
||||||
// Mark the event as having been sent to the output logs.
|
// Mark the event as having been sent to the output logs.
|
||||||
MarkEventAsSent(eventNID EventNID) error
|
MarkEventAsSent(eventNID EventNID) error
|
||||||
// Commit the transaction
|
// Build a membership updater for the target user in this room.
|
||||||
Commit() error
|
// It will share the same transaction as this updater.
|
||||||
// Rollback the transaction.
|
MembershipUpdater(targetUserNID EventStateKeyNID) (MembershipUpdater, error)
|
||||||
Rollback() error
|
// Implements Transaction so it can be committed or rolledback
|
||||||
|
Transaction
|
||||||
|
}
|
||||||
|
|
||||||
|
// A MembershipUpdater is used to update the membership of a user in a room.
|
||||||
|
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
|
||||||
|
// lock on the row in the membership table for this user in the room)
|
||||||
|
// The caller should call one of SetToInvite, SetToJoin or SetToLeave once to
|
||||||
|
// make the update, or none of them if no update is required.
|
||||||
|
type MembershipUpdater interface {
|
||||||
|
// True if the target user is invited to the room before updating.
|
||||||
|
IsInvite() bool
|
||||||
|
// True if the target user is joined to the room before updating.
|
||||||
|
IsJoin() bool
|
||||||
|
// True if the target user is not invited or joined to the room before updating.
|
||||||
|
IsLeave() bool
|
||||||
|
// Set the state to invite.
|
||||||
|
// Returns whether this invite needs to be sent
|
||||||
|
SetToInvite(event gomatrixserverlib.Event) (needsSending bool, err error)
|
||||||
|
// Set the state to join.
|
||||||
|
// Returns a list of invite event IDs that this state change retired.
|
||||||
|
SetToJoin(senderUserID string) (inviteEventIDs []string, err error)
|
||||||
|
// Set the state to leave.
|
||||||
|
// Returns a list of invite event IDs that this state change retired.
|
||||||
|
SetToLeave(senderUserID string) (inviteEventIDs []string, err error)
|
||||||
|
// Implements Transaction so it can be committed or rolledback.
|
||||||
|
Transaction
|
||||||
}
|
}
|
||||||
|
|
||||||
// A MissingEventError is an error that happened because the roomserver was
|
// A MissingEventError is an error that happened because the roomserver was
|
||||||
|
|
Loading…
Reference in New Issue