Deep checking of forward extremities (#1491)

* Deep forward extremity calculation

* Use updater txn

* Update error

* Update error

* Create previous event references in StoreEvent

* Use latest events updater to row-lock prev events

* Fix unexpected fallthrough

* Fix deadlock

* Don't roll back

* Update comments in calculateLatest

* Don't include events that we can't find references for in the forward extremities

* Add another passing test
main
Neil Alexander 2020-10-07 14:05:33 +01:00 committed by GitHub
parent f7c15071de
commit d821f9d3c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 81 additions and 74 deletions

View File

@ -32,9 +32,6 @@ const (
// there was a new event that references an event that we don't
// have a copy of.
KindNew = 2
// KindBackfill event extend the contiguous graph going backwards.
// They always have state.
KindBackfill = 3
)
// DoNotSendToOtherServers tells us not to send the event to other matrix

View File

@ -54,7 +54,7 @@ func (r *Inputer) processRoomEvent(
}
var softfail bool
if input.Kind == api.KindBackfill || input.Kind == api.KindNew {
if input.Kind == api.KindNew {
// Check that the event passes authentication checks based on the
// current room state.
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)

View File

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// updateLatestEvents updates the list of latest events for this room in the database and writes the
@ -116,7 +117,6 @@ type latestEventsUpdater struct {
}
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
prevEvents := u.event.PrevEvents()
u.lastEventIDSent = u.updater.LastEventIDSent()
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
@ -140,30 +140,12 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
return nil
}
// Update the roomserver_previous_events table with references. This
// is effectively tracking the structure of the DAG.
if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil {
return fmt.Errorf("u.updater.StorePreviousEvents: %w", err)
}
// Get the event reference for our new event. This will be used when
// determining if the event is referenced by an existing event.
eventReference := u.event.EventReference()
// Check if our new event is already referenced by an existing event
// in the room. If it is then it isn't a latest event.
alreadyReferenced, err := u.updater.IsReferenced(eventReference)
if err != nil {
return fmt.Errorf("u.updater.IsReferenced: %w", err)
}
// Work out what the latest events are.
u.latest = calculateLatest(
// Work out what the latest events are. This will include the new
// event if it is not already referenced.
u.calculateLatest(
oldLatest,
alreadyReferenced,
prevEvents,
types.StateAtEventAndReference{
EventReference: eventReference,
EventReference: u.event.EventReference(),
StateAtEvent: u.stateAtEvent,
},
)
@ -215,27 +197,12 @@ func (u *latestEventsUpdater) latestState() error {
var err error
roomState := state.NewStateResolution(u.api.DB, *u.roomInfo)
// Get a list of the current room state events if available.
var currentState []types.StateEntry
if u.roomInfo.StateSnapshotNID != 0 {
currentState, _ = roomState.LoadStateAtSnapshot(u.ctx, u.roomInfo.StateSnapshotNID)
}
// Get a list of the current latest events. This will include both
// the current room state and the latest events after the input event.
// The idea is that we will perform state resolution on this set and
// any conflicting events will be resolved properly.
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)+len(currentState))
offset := 0
for i := range currentState {
latestStateAtEvents[i] = types.StateAtEvent{
BeforeStateSnapshotNID: u.roomInfo.StateSnapshotNID,
StateEntry: currentState[i],
}
offset++
}
// Get a list of the current latest events. This may or may not
// include the new event from the input path, depending on whether
// it is a forward extremity or not.
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
for i := range u.latest {
latestStateAtEvents[offset+i] = u.latest[i].StateAtEvent
latestStateAtEvents[i] = u.latest[i].StateAtEvent
}
// Takes the NIDs of the latest events and creates a state snapshot
@ -266,6 +233,14 @@ func (u *latestEventsUpdater) latestState() error {
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
}
if len(u.removed) > len(u.added) {
// This really shouldn't happen.
// TODO: What is ultimately the best way to handle this situation?
return fmt.Errorf(
"invalid state delta wants to remove %d state but only add %d state (between state snapshots %d and %d)",
len(u.removed), len(u.added), u.oldStateNID, u.newStateNID,
)
}
// Also work out the state before the event removes and the event
// adds.
@ -279,42 +254,49 @@ func (u *latestEventsUpdater) latestState() error {
return nil
}
func calculateLatest(
func (u *latestEventsUpdater) calculateLatest(
oldLatest []types.StateAtEventAndReference,
alreadyReferenced bool,
prevEvents []gomatrixserverlib.EventReference,
newEvent types.StateAtEventAndReference,
) []types.StateAtEventAndReference {
var alreadyInLatest bool
) {
var newLatest []types.StateAtEventAndReference
// First of all, let's see if any of the existing forward extremities
// now have entries in the previous events table. If they do then we
// will no longer include them as forward extremities.
for _, l := range oldLatest {
keep := true
for _, prevEvent := range prevEvents {
if l.EventID == prevEvent.EventID && bytes.Equal(l.EventSHA256, prevEvent.EventSHA256) {
// This event can be removed from the latest events cause we've found an event that references it.
// (If an event is referenced by another event then it can't be one of the latest events in the room
// because we have an event that comes after it)
keep = false
break
}
}
if l.EventNID == newEvent.EventNID {
alreadyInLatest = true
}
if keep {
// Keep the event in the latest events.
referenced, err := u.updater.IsReferenced(l.EventReference)
if err != nil {
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID)
} else if !referenced {
newLatest = append(newLatest, l)
}
}
if !alreadyReferenced && !alreadyInLatest {
// This event is not referenced by any of the events in the room
// and the event is not already in the latest events.
// Add it to the latest events
// Then check and see if our new event is already included in that set.
// This ordinarily won't happen but it covers the edge-case that we've
// already seen this event before and it's a forward extremity, so rather
// than adding a duplicate, we'll just return the set as complete.
for _, l := range newLatest {
if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) {
// We've already referenced this new event so we can just return
// the newly completed extremities at this point.
u.latest = newLatest
return
}
}
// At this point we've processed the old extremities, and we've checked
// that our new event isn't already in that set. Therefore now we can
// check if our *new* event is a forward extremity, and if it is, add
// it in.
referenced, err := u.updater.IsReferenced(newEvent.EventReference)
if err != nil {
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID)
} else if !referenced {
newLatest = append(newLatest, newEvent)
}
return newLatest
u.latest = newLatest
}
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {

View File

@ -474,6 +474,32 @@ func (d *Database) StoreEvent(
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
}
// We should attempt to update the previous events table with any
// references that this new event makes. We do this using a latest
// events updater because it somewhat works as a mutex, ensuring
// that there's a row-level lock on the latest room events (well,
// on Postgres at least).
var roomInfo *types.RoomInfo
var updater *LatestEventsUpdater
if prevEvents := event.PrevEvents(); len(prevEvents) > 0 {
roomInfo, err = d.RoomInfo(ctx, event.RoomID())
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
}
if roomInfo == nil && len(prevEvents) > 0 {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
}
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
}
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
}
succeeded := true
err = sqlutil.EndTransaction(updater, &succeeded)
}
return roomNID, types.StateAtEvent{
BeforeStateSnapshotNID: stateNID,
StateEntry: types.StateEntry{
@ -483,7 +509,7 @@ func (d *Database) StoreEvent(
},
EventNID: eventNID,
},
}, redactionEvent, redactedEventID, nil
}, redactionEvent, redactedEventID, err
}
func (d *Database) PublishRoom(ctx context.Context, roomID string, publish bool) error {

View File

@ -473,4 +473,6 @@ Inbound federation rejects invites which include invalid JSON for room version 6
Inbound federation rejects invite rejections which include invalid JSON for room version 6
GET /capabilities is present and well formed for registered user
m.room.history_visibility == "joined" allows/forbids appropriately for Guest users
m.room.history_visibility == "joined" allows/forbids appropriately for Real users
m.room.history_visibility == "joined" allows/forbids appropriately for Real users
Users cannot kick users who have already left a room
A prev_batch token from incremental sync can be used in the v1 messages API