2020-02-13 17:27:33 +00:00
|
|
|
// Copyright 2017-2018 New Vector Ltd
|
|
|
|
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package sqlite3
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"database/sql"
|
Add peer-to-peer support into Dendrite via libp2p and fetch (#880)
* Use a fork of pq which supports userCurrent on wasm
* Use sqlite3_js driver when running in JS
* Add cmd/dendritejs to pull in sqlite3_js driver for wasm only
* Update to latest go-sqlite-js version
* Replace prometheus with a stub. sigh
* Hard-code a config and don't use opentracing
* Latest go-sqlite3-js version
* Generate a key for now
* Listen for fetch traffic rather than HTTP
* Latest hacks for js
* libp2p support
* More libp2p
* Fork gjson to allow us to enforce auth checks as before
Previously, all events would come down redacted because the hash
checks would fail. They would fail because sjson.DeleteBytes didn't
remove keys not used for hashing. This didn't work because of a build
tag which included a file which no-oped the index returned.
See https://github.com/tidwall/gjson/issues/157
When it's resolved, let's go back to mainline.
* Use gjson@1.6.0 as it fixes https://github.com/tidwall/gjson/issues/157
* Use latest gomatrixserverlib for sig checks
* Fix a bug which could cause exclude_from_sync to not be set
Caused when sending events over federation.
* Use query variadic to make lookups actually work!
* Latest gomatrixserverlib
* Add notes on getting p2p up and running
Partly so I don't forget myself!
* refactor: Move p2p specific stuff to cmd/dendritejs
This is important or else the normal build of dendrite will fail
because the p2p libraries depend on syscall/js which doesn't work
on normal builds.
Also, clean up main.go to read a bit better.
* Update ho-http-js-libp2p to return errors from RoundTrip
* Add an LRU cache around the key DB
We actually need this for P2P because otherwise we can *segfault*
with things like: "runtime: unexpected return pc for runtime.handleEvent"
where the event is a `syscall/js` event, caused by spamming sql.js
caused by "Checking event signatures for 14 events of room state" which
hammers the key DB repeatedly in quick succession.
Using a cache fixes this, though the underlying cause is probably a bug
in the version of Go I'm on (1.13.7)
* breaking: Add Tracing.Enabled to toggle whether we do opentracing
Defaults to false, which is why this is a breaking change. We need
this flag because WASM builds cannot do opentracing.
* Start adding conditional builds for wasm to handle lib/pq
The general idea here is to have the wasm build have a `NewXXXDatabase`
that doesn't import any postgres package and hence we never import
`lib/pq`, which doesn't work under WASM (undefined `userCurrent`).
* Remove lib/pq for wasm for syncapi
* Add conditional building to remaining storage APIs
* Update build script to set env vars correctly for dendritejs
* sqlite bug fixes
* Docs
* Add a no-op main for dendritejs when not building under wasm
* Use the real prometheus, even for WASM
Instead, the dendrite-sw.js must mock out `process.pid` and
`fs.stat` - which must invoke the callback with an error (e.g `EINVAL`)
in order for it to work:
```
global.process = {
pid: 1,
};
global.fs.stat = function(path, cb) {
cb({
code: "EINVAL",
});
}
```
* Linting
2020-03-06 10:23:55 +00:00
|
|
|
"encoding/json"
|
2020-02-13 17:27:33 +00:00
|
|
|
"fmt"
|
2021-04-26 12:25:57 +00:00
|
|
|
"sort"
|
2020-02-13 17:27:33 +00:00
|
|
|
"strings"
|
|
|
|
|
2020-05-21 13:40:13 +00:00
|
|
|
"github.com/matrix-org/dendrite/internal"
|
2020-06-12 13:55:57 +00:00
|
|
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
2020-05-27 10:03:47 +00:00
|
|
|
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
2020-05-26 15:45:28 +00:00
|
|
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
2020-02-13 17:27:33 +00:00
|
|
|
"github.com/matrix-org/dendrite/roomserver/types"
|
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
	// eventsSchema creates the roomserver_events table if it does not exist yet.
	// Each row carries the numeric IDs of the event, its room, its type and its
	// state key, plus the string event ID (unique), the reference hash, the
	// auth event NIDs stored as text (default '[]'), the depth and the
	// sent_to_output / is_rejected flags.
	eventsSchema = `
CREATE TABLE IF NOT EXISTS roomserver_events (
event_nid INTEGER PRIMARY KEY AUTOINCREMENT,
room_nid INTEGER NOT NULL,
event_type_nid INTEGER NOT NULL,
event_state_key_nid INTEGER NOT NULL,
sent_to_output BOOLEAN NOT NULL DEFAULT FALSE,
state_snapshot_nid INTEGER NOT NULL DEFAULT 0,
depth INTEGER NOT NULL,
event_id TEXT NOT NULL UNIQUE,
reference_sha256 BLOB NOT NULL,
auth_event_nids TEXT NOT NULL DEFAULT '[]',
is_rejected BOOLEAN NOT NULL DEFAULT FALSE
);
`

	// insertEventSQL adds a single event row. A conflicting row (for example a
	// duplicate event_id) is left untouched because of ON CONFLICT DO NOTHING.
	insertEventSQL = `
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT DO NOTHING;
`
)
|
|
|
|
|
|
|
|
// Query strings for the roomserver_events table. Every query containing
// "IN ($1)" is a template: the caller substitutes "($1)" with a variadic
// placeholder list sized to the number of arguments before preparing it.
const (
	// Look up the numeric ID and state snapshot of one event by string ID.
	selectEventSQL = "SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1"

	// Bulk lookup of events by string ID.
	// Results are ordered by the numeric IDs for event type and state key,
	// which lets callers binary-search entries by type and state key.
	bulkSelectStateEventByIDSQL = "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
		" WHERE event_id IN ($1)" +
		" ORDER BY event_type_nid, event_state_key_nid ASC"

	// Bulk lookup of events by numeric ID.
	// The remainder of this query (optional type/state-key filters and the
	// ORDER BY clause) is appended by BulkSelectStateEventByNID.
	bulkSelectStateEventByNIDSQL = "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
		" WHERE event_nid IN ($1)"

	// Bulk lookup of the state snapshot and rejection flag by string ID.
	bulkSelectStateAtEventByIDSQL = "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
		" WHERE event_id IN ($1)"

	// Record the state snapshot for an event.
	updateEventStateSQL = "UPDATE roomserver_events SET state_snapshot_nid = $1 WHERE event_nid = $2"

	// Read the sent_to_output flag of an event.
	selectEventSentToOutputSQL = "SELECT sent_to_output FROM roomserver_events WHERE event_nid = $1"

	// Set the sent_to_output flag of an event.
	updateEventSentToOutputSQL = "UPDATE roomserver_events SET sent_to_output = TRUE WHERE event_nid = $1"

	// Translate one numeric event ID into its string event ID.
	selectEventIDSQL = "SELECT event_id FROM roomserver_events WHERE event_nid = $1"

	// Bulk lookup of state snapshot, string ID and reference hash by numeric ID.
	bulkSelectStateAtEventAndReferenceSQL = "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" +
		" FROM roomserver_events WHERE event_nid IN ($1)"

	// Bulk lookup of string ID and reference hash by numeric ID.
	bulkSelectEventReferenceSQL = "SELECT event_id, reference_sha256 FROM roomserver_events WHERE event_nid IN ($1)"

	// Bulk translation of numeric event IDs into string event IDs.
	bulkSelectEventIDSQL = "SELECT event_nid, event_id FROM roomserver_events WHERE event_nid IN ($1)"

	// Bulk translation of string event IDs into numeric event IDs.
	bulkSelectEventNIDSQL = "SELECT event_id, event_nid FROM roomserver_events WHERE event_id IN ($1)"

	// One more than the largest depth among the given events, or 0 when no row matches.
	selectMaxEventDepthSQL = "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)"

	// Bulk lookup of the room each event belongs to.
	selectRoomNIDsForEventNIDsSQL = "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)"
)
|
2020-03-27 16:28:22 +00:00
|
|
|
|
2020-02-13 17:27:33 +00:00
|
|
|
// eventStatements holds the database handle and the prepared statements
// used to read and write the roomserver_events table.
type eventStatements struct {
	// db is kept so that queries with a variable number of placeholders can
	// be prepared on demand.
	db *sql.DB

	// Single-row statements.
	insertEventStmt             *sql.Stmt
	selectEventStmt             *sql.Stmt
	updateEventStateStmt        *sql.Stmt
	selectEventSentToOutputStmt *sql.Stmt
	updateEventSentToOutputStmt *sql.Stmt
	selectEventIDStmt           *sql.Stmt

	// Bulk statements, prepared from the "IN ($1)" query templates.
	bulkSelectStateEventByIDStmt           *sql.Stmt
	bulkSelectStateAtEventByIDStmt         *sql.Stmt
	bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
	bulkSelectEventReferenceStmt           *sql.Stmt
	bulkSelectEventIDStmt                  *sql.Stmt
	bulkSelectEventNIDStmt                 *sql.Stmt
	//selectRoomNIDsForEventNIDsStmt *sql.Stmt
}
|
|
|
|
|
2021-04-26 12:25:57 +00:00
|
|
|
func createEventsTable(db *sql.DB) error {
|
|
|
|
_, err := db.Exec(eventsSchema)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func prepareEventsTable(db *sql.DB) (tables.Events, error) {
|
2020-07-21 09:48:49 +00:00
|
|
|
s := &eventStatements{
|
2020-08-19 14:38:27 +00:00
|
|
|
db: db,
|
2020-07-21 09:48:49 +00:00
|
|
|
}
|
2020-02-13 17:27:33 +00:00
|
|
|
|
2020-05-27 10:03:47 +00:00
|
|
|
return s, shared.StatementList{
|
2020-02-13 17:27:33 +00:00
|
|
|
{&s.insertEventStmt, insertEventSQL},
|
|
|
|
{&s.selectEventStmt, selectEventSQL},
|
|
|
|
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
|
|
|
|
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
|
|
|
|
{&s.updateEventStateStmt, updateEventStateSQL},
|
|
|
|
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
|
|
|
|
{&s.selectEventSentToOutputStmt, selectEventSentToOutputSQL},
|
|
|
|
{&s.selectEventIDStmt, selectEventIDSQL},
|
|
|
|
{&s.bulkSelectStateAtEventAndReferenceStmt, bulkSelectStateAtEventAndReferenceSQL},
|
|
|
|
{&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL},
|
|
|
|
{&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL},
|
|
|
|
{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
|
2020-12-16 10:33:28 +00:00
|
|
|
//{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL},
|
2020-05-27 10:03:47 +00:00
|
|
|
}.Prepare(db)
|
2020-02-13 17:27:33 +00:00
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) InsertEvent(
|
2020-02-13 17:27:33 +00:00
|
|
|
ctx context.Context,
|
|
|
|
txn *sql.Tx,
|
|
|
|
roomNID types.RoomNID,
|
|
|
|
eventTypeNID types.EventTypeNID,
|
|
|
|
eventStateKeyNID types.EventStateKeyNID,
|
|
|
|
eventID string,
|
|
|
|
referenceSHA256 []byte,
|
|
|
|
authEventNIDs []types.EventNID,
|
|
|
|
depth int64,
|
2020-09-16 12:00:52 +00:00
|
|
|
isRejected bool,
|
2020-05-26 15:45:28 +00:00
|
|
|
) (types.EventNID, types.StateSnapshotNID, error) {
|
2020-04-29 17:41:45 +00:00
|
|
|
// attempt to insert: the last_row_id is the event NID
|
2020-07-21 09:48:49 +00:00
|
|
|
var eventNID int64
|
2020-08-19 14:38:27 +00:00
|
|
|
insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt)
|
|
|
|
result, err := insertStmt.ExecContext(
|
|
|
|
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
|
2020-09-16 12:00:52 +00:00
|
|
|
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected,
|
2020-08-19 14:38:27 +00:00
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return 0, 0, err
|
|
|
|
}
|
|
|
|
modified, err := result.RowsAffected()
|
|
|
|
if modified == 0 && err == nil {
|
|
|
|
return 0, 0, sql.ErrNoRows
|
|
|
|
}
|
|
|
|
eventNID, err = result.LastInsertId()
|
2020-05-26 15:45:28 +00:00
|
|
|
return types.EventNID(eventNID), 0, err
|
2020-02-13 17:27:33 +00:00
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) SelectEvent(
|
2020-02-13 17:27:33 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, eventID string,
|
|
|
|
) (types.EventNID, types.StateSnapshotNID, error) {
|
|
|
|
var eventNID int64
|
|
|
|
var stateNID int64
|
2020-06-12 13:55:57 +00:00
|
|
|
selectStmt := sqlutil.TxStmt(txn, s.selectEventStmt)
|
2020-02-13 17:27:33 +00:00
|
|
|
err := selectStmt.QueryRowContext(ctx, eventID).Scan(&eventNID, &stateNID)
|
|
|
|
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
|
|
|
|
}
|
|
|
|
|
|
|
|
// bulkSelectStateEventByID lookups a list of state events by event ID.
|
|
|
|
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) BulkSelectStateEventByID(
|
|
|
|
ctx context.Context, eventIDs []string,
|
2020-02-13 17:27:33 +00:00
|
|
|
) ([]types.StateEntry, error) {
|
|
|
|
///////////////
|
|
|
|
iEventIDs := make([]interface{}, len(eventIDs))
|
|
|
|
for k, v := range eventIDs {
|
|
|
|
iEventIDs[k] = v
|
|
|
|
}
|
2020-06-12 13:55:57 +00:00
|
|
|
selectOrig := strings.Replace(bulkSelectStateEventByIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
|
2020-05-26 15:45:28 +00:00
|
|
|
selectStmt, err := s.db.Prepare(selectOrig)
|
2020-02-13 17:27:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
///////////////
|
|
|
|
|
|
|
|
rows, err := selectStmt.QueryContext(ctx, iEventIDs...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
|
2020-02-13 17:27:33 +00:00
|
|
|
// We know that we will only get as many results as event IDs
|
|
|
|
// because of the unique constraint on event IDs.
|
|
|
|
// So we can allocate an array of the correct size now.
|
|
|
|
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
|
|
|
|
results := make([]types.StateEntry, len(eventIDs))
|
|
|
|
i := 0
|
|
|
|
for ; rows.Next(); i++ {
|
|
|
|
result := &results[i]
|
|
|
|
if err = rows.Scan(
|
|
|
|
&result.EventTypeNID,
|
|
|
|
&result.EventStateKeyNID,
|
|
|
|
&result.EventNID,
|
|
|
|
); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if i != len(eventIDs) {
|
|
|
|
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
|
|
|
|
// We don't know which ones were missing because we don't return the string IDs in the query.
|
|
|
|
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
|
|
|
|
// If this turns out to be impossible and we do need the debug information here, it would be better
|
2020-05-21 13:40:13 +00:00
|
|
|
// to do it as a separate query rather than slowing down/complicating the internal case.
|
2020-02-13 17:27:33 +00:00
|
|
|
return nil, types.MissingEventError(
|
|
|
|
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
return results, err
|
|
|
|
}
|
|
|
|
|
2021-04-26 12:25:57 +00:00
|
|
|
// bulkSelectStateEventByID lookups a list of state events by event ID.
|
|
|
|
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
|
|
|
func (s *eventStatements) BulkSelectStateEventByNID(
|
|
|
|
ctx context.Context, eventNIDs []types.EventNID,
|
|
|
|
stateKeyTuples []types.StateKeyTuple,
|
|
|
|
) ([]types.StateEntry, error) {
|
|
|
|
tuples := stateKeyTupleSorter(stateKeyTuples)
|
|
|
|
sort.Sort(tuples)
|
|
|
|
eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
|
|
|
|
params := make([]interface{}, 0, len(eventNIDs)+len(eventTypeNIDArray)+len(eventStateKeyNIDArray))
|
|
|
|
selectOrig := strings.Replace(bulkSelectStateEventByNIDSQL, "($1)", sqlutil.QueryVariadic(len(eventNIDs)), 1)
|
|
|
|
for _, v := range eventNIDs {
|
|
|
|
params = append(params, v)
|
|
|
|
}
|
|
|
|
if len(eventTypeNIDArray) > 0 {
|
|
|
|
selectOrig += " AND event_type_nid IN " + sqlutil.QueryVariadicOffset(len(eventTypeNIDArray), len(params))
|
|
|
|
for _, v := range eventTypeNIDArray {
|
|
|
|
params = append(params, v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(eventStateKeyNIDArray) > 0 {
|
|
|
|
selectOrig += " AND event_state_key_nid IN " + sqlutil.QueryVariadicOffset(len(eventStateKeyNIDArray), len(params))
|
|
|
|
for _, v := range eventStateKeyNIDArray {
|
|
|
|
params = append(params, v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
selectOrig += " ORDER BY event_type_nid, event_state_key_nid ASC"
|
|
|
|
selectStmt, err := s.db.Prepare(selectOrig)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("s.db.Prepare: %w", err)
|
|
|
|
}
|
|
|
|
rows, err := selectStmt.QueryContext(ctx, params...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("selectStmt.QueryContext: %w", err)
|
|
|
|
}
|
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
|
|
|
|
// We know that we will only get as many results as event IDs
|
|
|
|
// because of the unique constraint on event IDs.
|
|
|
|
// So we can allocate an array of the correct size now.
|
|
|
|
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
|
|
|
|
results := make([]types.StateEntry, len(eventNIDs))
|
|
|
|
i := 0
|
|
|
|
for ; rows.Next(); i++ {
|
|
|
|
result := &results[i]
|
|
|
|
if err = rows.Scan(
|
|
|
|
&result.EventTypeNID,
|
|
|
|
&result.EventStateKeyNID,
|
|
|
|
&result.EventNID,
|
|
|
|
); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return results[:i], err
|
|
|
|
}
|
|
|
|
|
2020-02-13 17:27:33 +00:00
|
|
|
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
|
|
|
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
|
|
|
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) BulkSelectStateAtEventByID(
|
|
|
|
ctx context.Context, eventIDs []string,
|
2020-02-13 17:27:33 +00:00
|
|
|
) ([]types.StateAtEvent, error) {
|
|
|
|
///////////////
|
|
|
|
iEventIDs := make([]interface{}, len(eventIDs))
|
|
|
|
for k, v := range eventIDs {
|
|
|
|
iEventIDs[k] = v
|
|
|
|
}
|
2020-06-12 13:55:57 +00:00
|
|
|
selectOrig := strings.Replace(bulkSelectStateAtEventByIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
|
2020-05-26 15:45:28 +00:00
|
|
|
selectStmt, err := s.db.Prepare(selectOrig)
|
2020-02-13 17:27:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
///////////////
|
|
|
|
rows, err := selectStmt.QueryContext(ctx, iEventIDs...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateAtEventByID: rows.close() failed")
|
2020-02-13 17:27:33 +00:00
|
|
|
results := make([]types.StateAtEvent, len(eventIDs))
|
|
|
|
i := 0
|
|
|
|
for ; rows.Next(); i++ {
|
|
|
|
result := &results[i]
|
|
|
|
if err = rows.Scan(
|
|
|
|
&result.EventTypeNID,
|
|
|
|
&result.EventStateKeyNID,
|
|
|
|
&result.EventNID,
|
|
|
|
&result.BeforeStateSnapshotNID,
|
2020-09-16 12:00:52 +00:00
|
|
|
&result.IsRejected,
|
2020-02-13 17:27:33 +00:00
|
|
|
); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if result.BeforeStateSnapshotNID == 0 {
|
|
|
|
return nil, types.MissingEventError(
|
|
|
|
fmt.Sprintf("storage: missing state for event NID %d", result.EventNID),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if i != len(eventIDs) {
|
|
|
|
return nil, types.MissingEventError(
|
|
|
|
fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
return results, err
|
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) UpdateEventState(
|
2020-08-20 15:24:33 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, stateNID types.StateSnapshotNID,
|
2020-02-13 17:27:33 +00:00
|
|
|
) error {
|
2020-08-20 15:24:33 +00:00
|
|
|
stmt := sqlutil.TxStmt(txn, s.updateEventStateStmt)
|
|
|
|
_, err := stmt.ExecContext(ctx, int64(stateNID), int64(eventNID))
|
2020-08-19 14:38:27 +00:00
|
|
|
return err
|
2020-02-13 17:27:33 +00:00
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) SelectEventSentToOutput(
|
2020-02-13 17:27:33 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, eventNID types.EventNID,
|
|
|
|
) (sentToOutput bool, err error) {
|
2020-06-12 13:55:57 +00:00
|
|
|
selectStmt := sqlutil.TxStmt(txn, s.selectEventSentToOutputStmt)
|
2020-02-13 17:27:33 +00:00
|
|
|
err = selectStmt.QueryRowContext(ctx, int64(eventNID)).Scan(&sentToOutput)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) UpdateEventSentToOutput(ctx context.Context, txn *sql.Tx, eventNID types.EventNID) error {
|
2020-08-19 14:38:27 +00:00
|
|
|
updateStmt := sqlutil.TxStmt(txn, s.updateEventSentToOutputStmt)
|
|
|
|
_, err := updateStmt.ExecContext(ctx, int64(eventNID))
|
|
|
|
return err
|
2020-02-13 17:27:33 +00:00
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) SelectEventID(
|
2020-02-13 17:27:33 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, eventNID types.EventNID,
|
|
|
|
) (eventID string, err error) {
|
2020-06-12 13:55:57 +00:00
|
|
|
selectStmt := sqlutil.TxStmt(txn, s.selectEventIDStmt)
|
2020-02-13 17:27:33 +00:00
|
|
|
err = selectStmt.QueryRowContext(ctx, int64(eventNID)).Scan(&eventID)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) BulkSelectStateAtEventAndReference(
|
2020-02-13 17:27:33 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID,
|
|
|
|
) ([]types.StateAtEventAndReference, error) {
|
|
|
|
///////////////
|
|
|
|
iEventNIDs := make([]interface{}, len(eventNIDs))
|
|
|
|
for k, v := range eventNIDs {
|
|
|
|
iEventNIDs[k] = v
|
|
|
|
}
|
2020-06-12 13:55:57 +00:00
|
|
|
selectOrig := strings.Replace(bulkSelectStateAtEventAndReferenceSQL, "($1)", sqlutil.QueryVariadic(len(iEventNIDs)), 1)
|
2020-08-19 12:24:54 +00:00
|
|
|
selectPrep, err := s.db.Prepare(selectOrig)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-02-13 17:27:33 +00:00
|
|
|
//////////////
|
|
|
|
|
2020-08-19 12:24:54 +00:00
|
|
|
rows, err := sqlutil.TxStmt(txn, selectPrep).QueryContext(ctx, iEventNIDs...)
|
2020-02-13 17:27:33 +00:00
|
|
|
if err != nil {
|
2020-08-19 14:38:27 +00:00
|
|
|
return nil, fmt.Errorf("sqlutil.TxStmt.QueryContext: %w", err)
|
2020-02-13 17:27:33 +00:00
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateAtEventAndReference: rows.close() failed")
|
2020-02-13 17:27:33 +00:00
|
|
|
results := make([]types.StateAtEventAndReference, len(eventNIDs))
|
|
|
|
i := 0
|
|
|
|
for ; rows.Next(); i++ {
|
|
|
|
var (
|
|
|
|
eventTypeNID int64
|
|
|
|
eventStateKeyNID int64
|
|
|
|
eventNID int64
|
|
|
|
stateSnapshotNID int64
|
|
|
|
eventID string
|
|
|
|
eventSHA256 []byte
|
|
|
|
)
|
|
|
|
if err = rows.Scan(
|
|
|
|
&eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, &eventSHA256,
|
|
|
|
); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
result := &results[i]
|
|
|
|
result.EventTypeNID = types.EventTypeNID(eventTypeNID)
|
|
|
|
result.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
|
|
|
|
result.EventNID = types.EventNID(eventNID)
|
|
|
|
result.BeforeStateSnapshotNID = types.StateSnapshotNID(stateSnapshotNID)
|
|
|
|
result.EventID = eventID
|
|
|
|
result.EventSHA256 = eventSHA256
|
|
|
|
}
|
|
|
|
if i != len(eventNIDs) {
|
|
|
|
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
|
|
|
}
|
|
|
|
return results, nil
|
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) BulkSelectEventReference(
|
2020-02-13 17:27:33 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID,
|
|
|
|
) ([]gomatrixserverlib.EventReference, error) {
|
|
|
|
///////////////
|
|
|
|
iEventNIDs := make([]interface{}, len(eventNIDs))
|
|
|
|
for k, v := range eventNIDs {
|
|
|
|
iEventNIDs[k] = v
|
|
|
|
}
|
2020-06-12 13:55:57 +00:00
|
|
|
selectOrig := strings.Replace(bulkSelectEventReferenceSQL, "($1)", sqlutil.QueryVariadic(len(iEventNIDs)), 1)
|
2020-08-19 12:24:54 +00:00
|
|
|
selectPrep, err := s.db.Prepare(selectOrig)
|
2020-02-13 17:27:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
///////////////
|
|
|
|
|
2020-06-12 13:55:57 +00:00
|
|
|
selectStmt := sqlutil.TxStmt(txn, selectPrep)
|
2020-02-13 17:27:33 +00:00
|
|
|
rows, err := selectStmt.QueryContext(ctx, iEventNIDs...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventReference: rows.close() failed")
|
2020-02-13 17:27:33 +00:00
|
|
|
results := make([]gomatrixserverlib.EventReference, len(eventNIDs))
|
|
|
|
i := 0
|
|
|
|
for ; rows.Next(); i++ {
|
|
|
|
result := &results[i]
|
|
|
|
if err = rows.Scan(&result.EventID, &result.EventSHA256); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if i != len(eventNIDs) {
|
|
|
|
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
|
|
|
}
|
|
|
|
return results, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// bulkSelectEventID returns a map from numeric event ID to string event ID.
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) BulkSelectEventID(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error) {
|
2020-02-13 17:27:33 +00:00
|
|
|
///////////////
|
|
|
|
iEventNIDs := make([]interface{}, len(eventNIDs))
|
|
|
|
for k, v := range eventNIDs {
|
|
|
|
iEventNIDs[k] = v
|
|
|
|
}
|
2020-06-12 13:55:57 +00:00
|
|
|
selectOrig := strings.Replace(bulkSelectEventIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventNIDs)), 1)
|
2020-05-26 15:45:28 +00:00
|
|
|
selectStmt, err := s.db.Prepare(selectOrig)
|
2020-02-13 17:27:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
///////////////
|
|
|
|
|
|
|
|
rows, err := selectStmt.QueryContext(ctx, iEventNIDs...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventID: rows.close() failed")
|
2020-02-13 17:27:33 +00:00
|
|
|
results := make(map[types.EventNID]string, len(eventNIDs))
|
|
|
|
i := 0
|
|
|
|
for ; rows.Next(); i++ {
|
|
|
|
var eventNID int64
|
|
|
|
var eventID string
|
|
|
|
if err = rows.Scan(&eventNID, &eventID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
results[types.EventNID(eventNID)] = eventID
|
|
|
|
}
|
|
|
|
if i != len(eventNIDs) {
|
|
|
|
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
|
|
|
}
|
|
|
|
return results, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// bulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
|
|
|
// If an event ID is not in the database then it is omitted from the map.
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error) {
|
2020-02-13 17:27:33 +00:00
|
|
|
///////////////
|
|
|
|
iEventIDs := make([]interface{}, len(eventIDs))
|
|
|
|
for k, v := range eventIDs {
|
|
|
|
iEventIDs[k] = v
|
|
|
|
}
|
2020-06-12 13:55:57 +00:00
|
|
|
selectOrig := strings.Replace(bulkSelectEventNIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
|
2020-05-26 15:45:28 +00:00
|
|
|
selectStmt, err := s.db.Prepare(selectOrig)
|
2020-02-13 17:27:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
///////////////
|
|
|
|
rows, err := selectStmt.QueryContext(ctx, iEventIDs...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
2020-02-13 17:27:33 +00:00
|
|
|
results := make(map[string]types.EventNID, len(eventIDs))
|
|
|
|
for rows.Next() {
|
|
|
|
var eventID string
|
|
|
|
var eventNID int64
|
|
|
|
if err = rows.Scan(&eventID, &eventNID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
results[eventID] = types.EventNID(eventNID)
|
|
|
|
}
|
|
|
|
return results, nil
|
|
|
|
}
|
|
|
|
|
2020-05-26 15:45:28 +00:00
|
|
|
func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) {
|
2020-02-13 17:27:33 +00:00
|
|
|
var result int64
|
2020-03-06 14:31:12 +00:00
|
|
|
iEventIDs := make([]interface{}, len(eventNIDs))
|
|
|
|
for i, v := range eventNIDs {
|
|
|
|
iEventIDs[i] = v
|
|
|
|
}
|
2020-06-12 13:55:57 +00:00
|
|
|
sqlStr := strings.Replace(selectMaxEventDepthSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
|
2020-08-19 12:24:54 +00:00
|
|
|
sqlPrep, err := s.db.Prepare(sqlStr)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
err = sqlutil.TxStmt(txn, sqlPrep).QueryRowContext(ctx, iEventIDs...).Scan(&result)
|
2020-02-13 17:27:33 +00:00
|
|
|
if err != nil {
|
2020-08-19 14:38:27 +00:00
|
|
|
return 0, fmt.Errorf("sqlutil.TxStmt.QueryRowContext: %w", err)
|
2020-02-13 17:27:33 +00:00
|
|
|
}
|
|
|
|
return result, nil
|
|
|
|
}
|
|
|
|
|
2020-12-16 10:33:28 +00:00
|
|
|
func (s *eventStatements) SelectRoomNIDsForEventNIDs(
|
|
|
|
ctx context.Context, eventNIDs []types.EventNID,
|
|
|
|
) (map[types.EventNID]types.RoomNID, error) {
|
|
|
|
sqlStr := strings.Replace(selectRoomNIDsForEventNIDsSQL, "($1)", sqlutil.QueryVariadic(len(eventNIDs)), 1)
|
|
|
|
sqlPrep, err := s.db.Prepare(sqlStr)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
iEventNIDs := make([]interface{}, len(eventNIDs))
|
|
|
|
for i, v := range eventNIDs {
|
|
|
|
iEventNIDs[i] = v
|
|
|
|
}
|
|
|
|
rows, err := sqlPrep.QueryContext(ctx, iEventNIDs...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomNIDsForEventNIDsStmt: rows.close() failed")
|
|
|
|
result := make(map[types.EventNID]types.RoomNID)
|
|
|
|
for rows.Next() {
|
|
|
|
var eventNID types.EventNID
|
|
|
|
var roomNID types.RoomNID
|
|
|
|
if err = rows.Scan(&eventNID, &roomNID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
result[eventNID] = roomNID
|
|
|
|
}
|
|
|
|
return result, nil
|
2020-03-27 16:28:22 +00:00
|
|
|
}
|
|
|
|
|
Add peer-to-peer support into Dendrite via libp2p and fetch (#880)
* Use a fork of pq which supports userCurrent on wasm
* Use sqlite3_js driver when running in JS
* Add cmd/dendritejs to pull in sqlite3_js driver for wasm only
* Update to latest go-sqlite-js version
* Replace prometheus with a stub. sigh
* Hard-code a config and don't use opentracing
* Latest go-sqlite3-js version
* Generate a key for now
* Listen for fetch traffic rather than HTTP
* Latest hacks for js
* libp2p support
* More libp2p
* Fork gjson to allow us to enforce auth checks as before
Previously, all events would come down redacted because the hash
checks would fail. They would fail because sjson.DeleteBytes didn't
remove keys not used for hashing. This didn't work because of a build
tag which included a file which no-oped the index returned.
See https://github.com/tidwall/gjson/issues/157
When it's resolved, let's go back to mainline.
* Use gjson@1.6.0 as it fixes https://github.com/tidwall/gjson/issues/157
* Use latest gomatrixserverlib for sig checks
* Fix a bug which could cause exclude_from_sync to not be set
Caused when sending events over federation.
* Use query variadic to make lookups actually work!
* Latest gomatrixserverlib
* Add notes on getting p2p up and running
Partly so I don't forget myself!
* refactor: Move p2p specific stuff to cmd/dendritejs
This is important or else the normal build of dendrite will fail
because the p2p libraries depend on syscall/js which doesn't work
on normal builds.
Also, clean up main.go to read a bit better.
* Update ho-http-js-libp2p to return errors from RoundTrip
* Add an LRU cache around the key DB
We actually need this for P2P because otherwise we can *segfault*
with things like: "runtime: unexpected return pc for runtime.handleEvent"
where the event is a `syscall/js` event, caused by spamming sql.js
caused by "Checking event signatures for 14 events of room state" which
hammers the key DB repeatedly in quick succession.
Using a cache fixes this, though the underlying cause is probably a bug
in the version of Go I'm on (1.13.7)
* breaking: Add Tracing.Enabled to toggle whether we do opentracing
Defaults to false, which is why this is a breaking change. We need
this flag because WASM builds cannot do opentracing.
* Start adding conditional builds for wasm to handle lib/pq
The general idea here is to have the wasm build have a `NewXXXDatabase`
that doesn't import any postgres package and hence we never import
`lib/pq`, which doesn't work under WASM (undefined `userCurrent`).
* Remove lib/pq for wasm for syncapi
* Add conditional building to remaining storage APIs
* Update build script to set env vars correctly for dendritejs
* sqlite bug fixes
* Docs
* Add a no-op main for dendritejs when not building under wasm
* Use the real prometheus, even for WASM
Instead, the dendrite-sw.js must mock out `process.pid` and
`fs.stat` - which must invoke the callback with an error (e.g `EINVAL`)
in order for it to work:
```
global.process = {
pid: 1,
};
global.fs.stat = function(path, cb) {
cb({
code: "EINVAL",
});
}
```
* Linting
2020-03-06 10:23:55 +00:00
|
|
|
func eventNIDsAsArray(eventNIDs []types.EventNID) string {
|
|
|
|
b, _ := json.Marshal(eventNIDs)
|
|
|
|
return string(b)
|
2020-02-13 17:27:33 +00:00
|
|
|
}
|