6942ee1de0
* Upgrade gomatrixserverlib dependency Signed-off-by: Thibaut CHARLES cromfr@gmail.com * Added missing passing sytest Signed-off-by: Thibaut CHARLES cromfr@gmail.com * Fix login using identifier key Not a full fix, it only really supports logging in with the localpart of an mxid. Signed-off-by: Serra Allgood <serra@allgood.dev> * Replace deprecated prometheus.InstrumentHandler and unsafe time.Ticker * goimports * re-add temporarily missing deps? * Refactor InstrumentHandlerCounter definition * URL decode args * Return server names (#833) * Remove unnecessary map->array processing * Return server names in room federation directory query * Knock off a TODO * Fix /send_join and /send_leave (#821) Fix the /send_join and /send_leave endpoints, so that they use the v2 endpoints as mandated by MSC1802. Also comment out the SyTest tests that are failing because of lack of support for the v1 endpoints. * Refuse /send_join without m.room.create (#824) Signed-off-by: Abhishek Kumar <abhishekkumar2718@gmail.com> * AS should use the v1 endpoint, rather than r0 (#827) * docker: Passthrough parameters to dendrite-monolith-server * Fix copy & paste error (#812) * Use gomatrixserverlib.Transaction instead of local type (#590) (#811) * Move files back if linting fails (#810) * replaced gometalinter description with golangci-lint (#837) * Amend syncapi SQL queries to return missing columns (#840) * This commit updates a couple of the syncapi SQL queries to return additional columns that are required/expected by rowsToStreamEvents in output_room_events_table.go. It's not exactly clear to me yet what transaction_id and session_id do, but these being added n #367 results in state events breaking the /sync endpoint. This is a temporary fix. We need to come up with a better solution. 
* gomatrix to gomatrixserverlib on some weird line change * Tweaks from @babolivier review comments * Implement storage interfaces (#841) * Implement interfaces for federationsender storage * Implement interfaces for mediaapi storage * Implement interfaces for publicroomsapi storage * Implement interfaces for roomserver storage * Implement interfaces for syncapi storage * Implement interfaces for keydb storage * common.PartitionStorer in publicroomsapi interface * Update copyright notices * make cmd directory path absolute in build.sh (#830) * Resync testfile with current sytest pass/fail (#832) * Resync testfile with current sytest pass/fail * Add displayname test * Fall back to postgres when database connection string parsing fails (#842) * Fall back to postgres when parsing the database connection string for a URI schema fails * Fix behaviour so that it really tries postgres when URL parsing fails and it complains about unknown schema if it succeeds * Fix #842 * Fix #842 - again... * Federation fixes (#845) * Update gomatrixserverlib to p2p commit 92c0338, other tweaks * Update gomatrixserverlib to p2p commit e5dcc65 * Rewrite getAuthChain * Update gomatrixserverlib in go.mod/go.sum * Correct a couple of package refs for updated gmsl/gomatrix * Update gomatrixserverlib ref in go.mod/go.sum * Update getAuthChain comments following @babolivier review * Add a Sytest blacklist file (#849) * Add more passing tests to the testfile, add test blacklist file (#848) * CS API: Support for /messages, fixes for /sync (#847) * Merge forward * Tidy up a bit * TODO: What to do with NextBatch here? 
* Replace SyncPosition with PaginationToken throughout syncapi * Fix PaginationTokens * Fix lint errors * Add a couple of missing functions into the syncapi external storage interface * Some updates based on review comments from @babolivier * Some updates based on review comments from @babolivier * argh whitespacing * Fix opentracing span * Remove dead code * Don't overshadow err (fix lint issue) * Handle extremities after inserting event into topology * Try insert event topology as ON CONFLICT DO NOTHING * Prevent OOB error in addRoomDeltaToResponse * Thwarted by gocyclo again * Fix NewPaginationTokenFromString, define unit test for it * Update pagination token test * Update sytest-whitelist * Hopefully fix some of the sync batch tokens * Remove extraneous sync position func * Revert to topology tokens in addRoomDeltaToResponse etc * Fix typo * Remove prevPDUPos as dead now that backwardTopologyPos is used instead * Fix selectEventsWithEventIDsSQL * Update sytest-blacklist * Update sytest-whitelist * Some fixes for #847 (#850) * Fix a couple of cases where backfilling events we already had causes panics, hopefully fix ordering of events, update GMSL dependency for backfill URL fixes * Remove commented out lines from output_room_events_table schema * Wire up publicroomsapi for roomserver events (#851) * Wire up publicroomsapi to roomserver events * Remove parameter that was incorrectly brought over from p2p work * nolint containsBackwardExtremity for now * Store our own keys in the keydb (#853) * Store our own keys in the keydb The DirectKeyFetcher makes the assumption that you can always reach the key/v2/server endpoint of any server, including our own. We previously haven't bothered to store our own keys in the keydb so this would mean we end up making key requests to ourselves. In the libp2p world as an example, self-dialling is not possible, therefore this would render it impossible to get our own keys. 
This commit adds our own keys into the keydb so that we don't create unnecessarily (and maybe impossible) requests. * Use golang.org/x/crypto/ed25519 instead of crypto/ed25519 for pre-Go 1.13 * More sync fixes (#854) * Further sync tweaks * Remove unnecessary blank line * getBackwardTopologyPos always returns a usable value * Revert order fixing * Implement GET endpoints for account_data in clientapi (#861) * Implement GET endpoints for account_data in clientapi * Fix accountDB parameter * Remove fmt.Println * Add empty push rules into account data on account creation (#862) * Handle kind=guest query parameter on /register (#860) * Handle kind=guest query parameter on /register * Reorganized imports * Pass device_id as nil * Added tests to systest-whitelist * Update sytest-whitelist * Blacklist 'displayname updates affect room member events' (#859) * Room version abstractions (#865) * Rough first pass at adding room version abstractions * Define newer room versions * Update room version metadata * Fix roomserver/versions * Try to fix whitespace in roomsSchema * Implement room version capabilities in CS API (#866) * Add wiring for querying the roomserver for the default room version * Try to implement /capabilities for room versions * Update copyright notices * Update sytests, add /capabilities endpoint into CS API * Update sytest-whitelist * Add GetDefaultRoomVersion * Fix cases where state package was shadowed * Fix version formatting * Update Dockerfile to Go 1.13.6 * oh yes types I remember * And fix the default too * Update documentation for Go 1.13 (#867) * Pass cfg by reference around the codebase (#819) * Pass cfg by reference around the codebase * Merge branch 'master' into pass-cfg-by-ref Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> * Implement missing device management features (#835) * Implement missing device management features Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Add a little more documentation * Undo 
changes * Use non-anonymous struct to decode devices list * Update sytest-whitelist * Update sytest-whitelist * Update sytest-blacklist Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> * Adding sslmode: disable to sytest server config (#813) Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> * Fix AppService bind addrs in test (#805) Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> * Always defer *sql.Rows.Close and consult with Err (#844) * Always defer *sql.Rows.Close and consult with Err database/sql.Rows.Next() makes sure to call Close only after exhausting result rows which would NOT happen when returning early from a bad Scan. Close being idempotent makes it a great candidate to get always deferred regardless of what happens later on the result set. This change also makes sure call Err() after exhausting Next() and propagate non-nil results from it as the documentation advises. Closes #764 Signed-off-by: Kiril Vladimiroff <kiril@vladimiroff.org> * Override named result parameters in last returns Signed-off-by: Kiril Vladimiroff <kiril@vladimiroff.org> * Do the same over new changes that got merged Signed-off-by: Kiril Vladimiroff <kiril@vladimiroff.org> Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> * Clean up Co-authored-by: Serra Allgood <serra@allgood.dev> Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> Co-authored-by: Brendan Abolivier <github@brendanabolivier.com> Co-authored-by: Abhishek Kumar <31231064+abhishekkumar2718@users.noreply.github.com> Co-authored-by: Will Hunt <will@half-shot.uk> Co-authored-by: S7evinK <tfaelligen@gmail.com> Co-authored-by: Arshpreet <30545756+arsh-7@users.noreply.github.com> Co-authored-by: Prateek Sachan <42961174+prateek2211@users.noreply.github.com> Co-authored-by: Behouba Manassé <behouba@gmail.com> Co-authored-by: aditsachde 
<23707194+aditsachde@users.noreply.github.com> Co-authored-by: Kiril Vladimiroff <kiril@vladimiroff.org>
396 lines
14 KiB
Go
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package postgres

import (
	"context"
	"database/sql"
	"encoding/json"
	"sort"

	"github.com/matrix-org/dendrite/roomserver/api"
	"github.com/matrix-org/dendrite/syncapi/types"

	"github.com/lib/pq"
	"github.com/matrix-org/dendrite/common"
	"github.com/matrix-org/gomatrixserverlib"
	log "github.com/sirupsen/logrus"
)

const outputRoomEventsSchema = `
-- This sequence is shared between all the tables generated from kafka logs.
CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;

-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
  -- An incrementing ID which denotes the position in the log that this event resides at.
  -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
  -- This isn't a problem for us since we just want to order by this field.
  id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
  -- The event ID for the event
  event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE,
  -- The 'room_id' key for the event.
  room_id TEXT NOT NULL,
  -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
  event_json TEXT NOT NULL,
  -- The event type e.g 'm.room.member'.
  type TEXT NOT NULL,
  -- The 'sender' property of the event.
  sender TEXT NOT NULL,
  -- true if the event content contains a url key.
  contains_url BOOL NOT NULL,
  -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
  -- if there is no delta.
  add_state_ids TEXT[],
  remove_state_ids TEXT[],
  -- The client session that sent the event, if any
  session_id BIGINT,
  -- The transaction id used to send the event, if any
  transaction_id TEXT,
  -- Should the event be excluded from responses to /sync requests. Useful for
  -- events retrieved through backfilling that have a position in the stream
  -- that relates to the moment these were retrieved rather than the moment these
  -- were emitted.
  exclude_from_sync BOOL DEFAULT FALSE
);
`

const insertEventSQL = "" +
	"INSERT INTO syncapi_output_room_events (" +
	"room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
	") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " +
	"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = $11 " +
	"RETURNING id"

const selectEventsSQL = "" +
	"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"

const selectRecentEventsSQL = "" +
	"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
	" WHERE room_id = $1 AND id > $2 AND id <= $3" +
	" ORDER BY id DESC LIMIT $4"

const selectRecentEventsForSyncSQL = "" +
	"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
	" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
	" ORDER BY id DESC LIMIT $4"

const selectEarlyEventsSQL = "" +
	"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
	" WHERE room_id = $1 AND id > $2 AND id <= $3" +
	" ORDER BY id ASC LIMIT $4"

const selectMaxEventIDSQL = "" +
	"SELECT MAX(id) FROM syncapi_output_room_events"

// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
	"SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
	" FROM syncapi_output_room_events" +
	" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
	" AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
	" AND ( $4::text[] IS NULL OR NOT(sender = ANY($4)) )" +
	" AND ( $5::text[] IS NULL OR type LIKE ANY($5) )" +
	" AND ( $6::text[] IS NULL OR NOT(type LIKE ANY($6)) )" +
	" AND ( $7::bool IS NULL OR contains_url = $7 )" +
	" ORDER BY id ASC" +
	" LIMIT $8"

type outputRoomEventsStatements struct {
	insertEventStmt               *sql.Stmt
	selectEventsStmt              *sql.Stmt
	selectMaxEventIDStmt          *sql.Stmt
	selectRecentEventsStmt        *sql.Stmt
	selectRecentEventsForSyncStmt *sql.Stmt
	selectEarlyEventsStmt         *sql.Stmt
	selectStateInRangeStmt        *sql.Stmt
}

func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
	_, err = db.Exec(outputRoomEventsSchema)
	if err != nil {
		return
	}
	if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
		return
	}
	if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
		return
	}
	if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
		return
	}
	if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
		return
	}
	if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
		return
	}
	if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
		return
	}
	if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
		return
	}
	return
}

// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) selectStateInRange(
	ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
	stateFilter *gomatrixserverlib.StateFilter,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
	stmt := common.TxStmt(txn, s.selectStateInRangeStmt)

	rows, err := stmt.QueryContext(
		ctx, oldPos, newPos,
		pq.StringArray(stateFilter.Senders),
		pq.StringArray(stateFilter.NotSenders),
		pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
		pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
		stateFilter.ContainsURL,
		stateFilter.Limit,
	)
	if err != nil {
		return nil, nil, err
	}
	defer rows.Close() // nolint: errcheck
	// Fetch all the state change events for all rooms between the two positions, then loop over each event and:
	//  - Keep a cache of the event by ID (99% of state change events are for the event itself)
	//  - For each room ID, build up an array of event IDs which represents cumulative adds/removes
	// For each room, map cumulative event IDs to events and return. This may need a batch SELECT based on event ID
	// if they aren't in the event ID cache. We don't handle state deletion yet.
	eventIDToEvent := make(map[string]types.StreamEvent)

	// RoomID => A set (map[string]bool) of state event IDs which are between the two positions
	stateNeeded := make(map[string]map[string]bool)

	for rows.Next() {
		var (
			streamPos       types.StreamPosition
			eventBytes      []byte
			excludeFromSync bool
			addIDs          pq.StringArray
			delIDs          pq.StringArray
		)
		if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
			return nil, nil, err
		}
		// Sanity check for deleted state and whine if we see it. We don't need to do anything
		// since it'll just mark the event as not being needed.
		if len(addIDs) < len(delIDs) {
			log.WithFields(log.Fields{
				"since":   oldPos,
				"current": newPos,
				"adds":    addIDs,
				"dels":    delIDs,
			}).Warn("StateBetween: ignoring deleted state")
		}

		// TODO: Handle redacted events
		ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
		if err != nil {
			return nil, nil, err
		}
		needSet := stateNeeded[ev.RoomID()]
		if needSet == nil { // make set if required
			needSet = make(map[string]bool)
		}
		for _, id := range delIDs {
			needSet[id] = false
		}
		for _, id := range addIDs {
			needSet[id] = true
		}
		stateNeeded[ev.RoomID()] = needSet

		eventIDToEvent[ev.EventID()] = types.StreamEvent{
			Event:           ev,
			StreamPosition:  streamPos,
			ExcludeFromSync: excludeFromSync,
		}
	}

	return stateNeeded, eventIDToEvent, rows.Err()
}

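// Illustration (not part of this package): a minimal, database-free sketch of how
// selectStateInRange folds per-row add/remove deltas into one set per room.
// The stateDelta type and the accumulate function are hypothetical names made up
// for this example; rows are assumed to arrive in ascending stream order.

```go
package main

import "fmt"

// stateDelta is a hypothetical stand-in for one result row: the room the
// event belongs to and the state event IDs it adds and removes.
type stateDelta struct {
	roomID    string
	addIDs    []string
	removeIDs []string
}

// accumulate applies the deltas in order. For each room it builds a set of
// event IDs where true means "still needed" and false means "removed by a
// later delta". Removals are applied before additions within a row, so an
// ID that a row both removes and adds ends up as needed.
func accumulate(deltas []stateDelta) map[string]map[string]bool {
	needed := make(map[string]map[string]bool)
	for _, d := range deltas {
		set := needed[d.roomID]
		if set == nil {
			set = make(map[string]bool)
			needed[d.roomID] = set
		}
		for _, id := range d.removeIDs {
			set[id] = false
		}
		for _, id := range d.addIDs {
			set[id] = true
		}
	}
	return needed
}

func main() {
	needed := accumulate([]stateDelta{
		{roomID: "!a:example.org", addIDs: []string{"$1"}},
		{roomID: "!a:example.org", addIDs: []string{"$2"}, removeIDs: []string{"$1"}},
	})
	// "$1" was superseded by "$2", so only "$2" is still needed.
	fmt.Println(needed["!a:example.org"]["$1"], needed["!a:example.org"]["$2"])
}
```
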
// selectMaxEventID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
// then this function should only ever be used at startup, as it will race with inserting events if it is
// done afterwards. If there are no inserted events, 0 is returned.
func (s *outputRoomEventsStatements) selectMaxEventID(
	ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
	var nullableID sql.NullInt64
	stmt := common.TxStmt(txn, s.selectMaxEventIDStmt)
	err = stmt.QueryRowContext(ctx).Scan(&nullableID)
	if nullableID.Valid {
		id = nullableID.Int64
	}
	return
}

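// Illustration (not part of this package): MAX(id) over an empty table yields SQL NULL,
// which is why the result is scanned into a sql.NullInt64 and mapped to 0 when invalid.
// The sketch below shows that mapping without a database by calling NullInt64.Scan
// directly; maxIDOrZero is a hypothetical helper name used only here.

```go
package main

import (
	"database/sql"
	"fmt"
)

// maxIDOrZero converts a raw driver value (nil for SQL NULL, or an int64)
// into a plain int64, returning 0 when the value is NULL.
func maxIDOrZero(raw interface{}) (int64, error) {
	var nullable sql.NullInt64
	if err := nullable.Scan(raw); err != nil {
		return 0, err
	}
	if nullable.Valid {
		return nullable.Int64, nil
	}
	return 0, nil
}

func main() {
	empty, _ := maxIDOrZero(nil)        // empty table: MAX(id) is NULL
	latest, _ := maxIDOrZero(int64(42)) // table whose highest id is 42
	fmt.Println(empty, latest)
}
```
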
// insertEvent inserts an event into the output_room_events table. addState and removeState are optional
// lists of state event IDs. Returns the position of the inserted event.
func (s *outputRoomEventsStatements) insertEvent(
	ctx context.Context, txn *sql.Tx,
	event *gomatrixserverlib.Event, addState, removeState []string,
	transactionID *api.TransactionID, excludeFromSync bool,
) (streamPos types.StreamPosition, err error) {
	var txnID *string
	var sessionID *int64
	if transactionID != nil {
		sessionID = &transactionID.SessionID
		txnID = &transactionID.TransactionID
	}

	// Parse content as JSON and search for a "url" key
	containsURL := false
	var content map[string]interface{}
	if json.Unmarshal(event.Content(), &content) == nil {
		// The content parsed successfully: set containsURL to true if url is present
		_, containsURL = content["url"]
	}

	stmt := common.TxStmt(txn, s.insertEventStmt)
	err = stmt.QueryRowContext(
		ctx,
		event.RoomID(),
		event.EventID(),
		event.JSON(),
		event.Type(),
		event.Sender(),
		containsURL,
		pq.StringArray(addState),
		pq.StringArray(removeState),
		sessionID,
		txnID,
		excludeFromSync,
	).Scan(&streamPos)
	return
}

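// Illustration (not part of this package): a standalone sketch of the check that the
// contains_url column is meant to capture. The top-level "url" key is only looked up
// when the content parses as a JSON object; on a parse error the flag stays false.
// contentHasURL is a hypothetical helper name used only in this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// contentHasURL reports whether the event content is a JSON object with a
// top-level "url" key. Content that fails to parse is treated as having no URL.
func contentHasURL(content []byte) bool {
	var fields map[string]interface{}
	if err := json.Unmarshal(content, &fields); err != nil {
		return false
	}
	_, ok := fields["url"]
	return ok
}

func main() {
	fmt.Println(contentHasURL([]byte(`{"msgtype":"m.image","url":"mxc://example.org/abc"}`)))
	fmt.Println(contentHasURL([]byte(`{"msgtype":"m.text","body":"hello"}`)))
	fmt.Println(contentHasURL([]byte(`not json`)))
}
```
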
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
// If onlySyncEvents is true, only the events that aren't marked as excluded from sync are returned.
func (s *outputRoomEventsStatements) selectRecentEvents(
	ctx context.Context, txn *sql.Tx,
	roomID string, fromPos, toPos types.StreamPosition, limit int,
	chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, error) {
	var stmt *sql.Stmt
	if onlySyncEvents {
		stmt = common.TxStmt(txn, s.selectRecentEventsForSyncStmt)
	} else {
		stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
	}

	rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close() // nolint: errcheck
	events, err := rowsToStreamEvents(rows)
	if err != nil {
		return nil, err
	}
	if chronologicalOrder {
		// The events need to be returned from oldest to latest, which isn't
		// necessarily the way the SQL query returns them, so a sort is necessary to
		// ensure the events are in the right order in the slice.
		sort.SliceStable(events, func(i int, j int) bool {
			return events[i].StreamPosition < events[j].StreamPosition
		})
	}
	return events, nil
}

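// Illustration (not part of this package): the recent-events queries order by id DESC so
// that LIMIT keeps the newest rows, which means callers wanting oldest-first output must
// re-sort. A minimal sketch of that re-sort follows; positionedEvent and toChronological
// are hypothetical names standing in for types.StreamEvent and the inline sort.

```go
package main

import (
	"fmt"
	"sort"
)

// positionedEvent is a hypothetical stand-in for types.StreamEvent, keeping
// only the fields needed to demonstrate the ordering.
type positionedEvent struct {
	ID       string
	Position int64
}

// toChronological sorts events in place from oldest to newest stream position.
// The sort is stable, so events sharing a position keep their relative order.
func toChronological(events []positionedEvent) {
	sort.SliceStable(events, func(i, j int) bool {
		return events[i].Position < events[j].Position
	})
}

func main() {
	// Newest first, as an "ORDER BY id DESC" query would return them.
	events := []positionedEvent{
		{ID: "$c", Position: 30},
		{ID: "$b", Position: 20},
		{ID: "$a", Position: 10},
	}
	toChronological(events)
	fmt.Println(events[0].ID, events[1].ID, events[2].ID)
}
```
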
// selectEarlyEvents returns the earliest events in the given room, starting
// from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) selectEarlyEvents(
	ctx context.Context, txn *sql.Tx,
	roomID string, fromPos, toPos types.StreamPosition, limit int,
) ([]types.StreamEvent, error) {
	stmt := common.TxStmt(txn, s.selectEarlyEventsStmt)
	rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close() // nolint: errcheck
	events, err := rowsToStreamEvents(rows)
	if err != nil {
		return nil, err
	}
	// The events need to be returned from oldest to latest, which isn't
	// necessarily the way the SQL query returns them, so a sort is necessary to
	// ensure the events are in the right order in the slice.
	sort.SliceStable(events, func(i int, j int) bool {
		return events[i].StreamPosition < events[j].StreamPosition
	})
	return events, nil
}

// selectEvents returns the events for the given event IDs. If an event is
// missing from the database, it will be omitted.
func (s *outputRoomEventsStatements) selectEvents(
	ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]types.StreamEvent, error) {
	stmt := common.TxStmt(txn, s.selectEventsStmt)
	rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
	if err != nil {
		return nil, err
	}
	defer rows.Close() // nolint: errcheck
	return rowsToStreamEvents(rows)
}

func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
	var result []types.StreamEvent
	for rows.Next() {
		var (
			streamPos       types.StreamPosition
			eventBytes      []byte
			excludeFromSync bool
			sessionID       *int64
			txnID           *string
			transactionID   *api.TransactionID
		)
		if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
			return nil, err
		}
		// TODO: Handle redacted events
		ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
		if err != nil {
			return nil, err
		}

		if sessionID != nil && txnID != nil {
			transactionID = &api.TransactionID{
				SessionID:     *sessionID,
				TransactionID: *txnID,
			}
		}

		result = append(result, types.StreamEvent{
			Event:           ev,
			StreamPosition:  streamPos,
			TransactionID:   transactionID,
			ExcludeFromSync: excludeFromSync,
		})
	}
	return result, rows.Err()
}