Don't re-request state events that are already in the timeline (#1739)

* Don't request state events if we already have the timeline events (Postgres only)

* Rename variable

* nocyclo

* Add SQLite

* Tweaks

* Revert query change

* Don't dedupe if asking for full state

* Update query
main
Neil Alexander 2021-02-04 12:20:37 +00:00 committed by GitHub
parent 6099379ea4
commit 6e44450cc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 45 additions and 23 deletions

View File

@ -35,7 +35,7 @@ type Database interface {
MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)

View File

@ -84,7 +84,8 @@ const selectCurrentStateSQL = "" +
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" +
" AND ( $6::bool IS NULL OR contains_url = $6 )" +
" LIMIT $7"
" AND (event_id = ANY($7)) IS NOT TRUE" +
" LIMIT $8"
const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
@ -197,6 +198,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *gomatrixserverlib.StateFilter,
excludeEventIDs []string,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
rows, err := stmt.QueryContext(ctx, roomID,
@ -205,6 +207,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
stateFilter.ContainsURL,
pq.StringArray(excludeEventIDs),
stateFilter.Limit,
)
if err != nil {

View File

@ -103,8 +103,8 @@ func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.S
return types.StreamPosition(id), nil
}
func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart)
func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart, excludeEventIDs)
}
func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) {
@ -195,7 +195,7 @@ func (d *Database) GetStateEvent(
func (d *Database) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter,
) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) {
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilter)
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilter, nil)
return
}
@ -870,7 +870,7 @@ func (d *Database) currentStateStreamEventsForRoom(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]types.StreamEvent, error) {
allState, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
allState, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter, nil)
if err != nil {
return nil, err
}

View File

@ -178,6 +178,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *gomatrixserverlib.StateFilter,
excludeEventIDs []string,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectCurrentStateSQL,
@ -186,7 +187,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
},
stateFilter.Senders, stateFilter.NotSenders,
stateFilter.Types, stateFilter.NotTypes,
stateFilter.Limit, FilterOrderNone,
excludeEventIDs, stateFilter.Limit, FilterOrderNone,
)
if err != nil {
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)

View File

@ -23,9 +23,10 @@ const (
// fields might come from either a StateFilter or an EventFilter,
// and it's easier just to have the caller extract the relevant
// parts.
// nolint:gocyclo
func prepareWithFilters(
db *sql.DB, txn *sql.Tx, query string, params []interface{},
senders, notsenders, types, nottypes []string,
senders, notsenders, types, nottypes []string, excludeEventIDs []string,
limit int, order FilterOrder,
) (*sql.Stmt, []interface{}, error) {
offset := len(params)
@ -53,6 +54,12 @@ func prepareWithFilters(
params, offset = append(params, v), offset+1
}
}
if count := len(excludeEventIDs); count > 0 {
query += " AND event_id NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
for _, v := range excludeEventIDs {
params, offset = append(params, v), offset+1
}
}
switch order {
case FilterOrderAsc:
query += " ORDER BY id ASC"

View File

@ -150,7 +150,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
},
stateFilter.Senders, stateFilter.NotSenders,
stateFilter.Types, stateFilter.NotTypes,
stateFilter.Limit, FilterOrderAsc,
nil, stateFilter.Limit, FilterOrderAsc,
)
if err != nil {
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
@ -326,7 +326,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
},
eventFilter.Senders, eventFilter.NotSenders,
eventFilter.Types, eventFilter.NotTypes,
eventFilter.Limit+1, FilterOrderDesc,
nil, eventFilter.Limit+1, FilterOrderDesc,
)
if err != nil {
return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
@ -374,7 +374,7 @@ func (s *outputRoomEventsStatements) SelectEarlyEvents(
},
eventFilter.Senders, eventFilter.NotSenders,
eventFilter.Types, eventFilter.NotTypes,
eventFilter.Limit, FilterOrderAsc,
nil, eventFilter.Limit, FilterOrderAsc,
)
if err != nil {
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)

View File

@ -91,7 +91,7 @@ type CurrentRoomState interface {
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
DeleteRoomStateForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
// SelectCurrentState returns all the current state events for the given room.
SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.

View File

@ -98,7 +98,7 @@ func (p *PDUStreamProvider) CompleteSync(
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -124,7 +124,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device,
ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -254,26 +254,37 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return nil
}
// nolint:gocyclo
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
roomID string,
r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
eventFilter *gomatrixserverlib.RoomEventFilter,
wantFullState bool,
device *userapi.Device,
) (jr *types.JoinResponse, err error) {
var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter)
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
recentStreamEvents, limited, err := p.DB.RecentEvents(
ctx, roomID, r, eventFilter, true, true,
)
if err != nil {
return
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
var limited bool
recentStreamEvents, limited, err = p.DB.RecentEvents(
ctx, roomID, r, eventFilter, true, true,
)
// Get the event IDs of the stream events we fetched. There's no point in us
var excludingEventIDs []string
if !wantFullState {
excludingEventIDs = make([]string, 0, len(recentStreamEvents))
for _, event := range recentStreamEvents {
if event.StateKey() != nil {
excludingEventIDs = append(excludingEventIDs, event.EventID())
}
}
}
stateEvents, err := p.DB.CurrentState(ctx, roomID, stateFilter, excludingEventIDs)
if err != nil {
return
}