Use a single storage.Database interface (#978)

main
Kegsay 2020-04-24 10:38:58 +01:00 committed by GitHub
parent c30b12b5a1
commit a202d88fe5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 65 additions and 204 deletions

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"sort" "sort"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -26,7 +27,7 @@ import (
// Returns the numeric IDs for the auth events. // Returns the numeric IDs for the auth events.
func checkAuthEvents( func checkAuthEvents(
ctx context.Context, ctx context.Context,
db RoomEventDatabase, db storage.Database,
event gomatrixserverlib.HeaderedEvent, event gomatrixserverlib.HeaderedEvent,
authEventIDs []string, authEventIDs []string,
) ([]types.EventNID, error) { ) ([]types.EventNID, error) {
@ -127,7 +128,7 @@ func (ae *authEvents) lookupEvent(typeNID types.EventTypeNID, stateKey string) *
// loadAuthEvents loads the events needed for authentication from the supplied room state. // loadAuthEvents loads the events needed for authentication from the supplied room state.
func loadAuthEvents( func loadAuthEvents(
ctx context.Context, ctx context.Context,
db RoomEventDatabase, db storage.Database,
needed gomatrixserverlib.StateNeeded, needed gomatrixserverlib.StateNeeded,
state []types.StateEntry, state []types.StateEntry,
) (result authEvents, err error) { ) (result authEvents, err error) {

View File

@ -23,63 +23,12 @@ import (
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/state/database" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// A RoomEventDatabase has the storage APIs needed to store a room event.
type RoomEventDatabase interface {
database.RoomStateDatabase
// Stores a matrix room event in the database
StoreEvent(
ctx context.Context,
event gomatrixserverlib.Event,
txnAndSessionID *api.TransactionID,
authEventNIDs []types.EventNID,
) (types.RoomNID, types.StateAtEvent, error)
// Look up the state entries for a list of string event IDs
// Returns an error if the there is an error talking to the database
// Returns a types.MissingEventError if the event IDs aren't in the database.
StateEntriesForEventIDs(
ctx context.Context, eventIDs []string,
) ([]types.StateEntry, error)
// Set the state at an event.
SetState(
ctx context.Context,
eventNID types.EventNID,
stateNID types.StateSnapshotNID,
) error
// Look up the latest events in a room in preparation for an update.
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
// Returns the latest events in the room and the last eventID sent to the log along with an updater.
// If this returns an error then no further action is required.
GetLatestEventsForUpdate(
ctx context.Context, roomNID types.RoomNID,
) (updater types.RoomRecentEventsUpdater, err error)
// Look up the string event IDs for a list of numeric event IDs
EventIDs(
ctx context.Context, eventNIDs []types.EventNID,
) (map[types.EventNID]string, error)
// Build a membership updater for the target user in a room.
MembershipUpdater(
ctx context.Context, roomID, targerUserID string,
roomVersion gomatrixserverlib.RoomVersion,
) (types.MembershipUpdater, error)
// Look up event ID by transaction's info.
// This is used to determine if the room event is processed/processing already.
// Returns an empty string if no such event exists.
GetTransactionEventID(
ctx context.Context, transactionID string,
sessionID int64, userID string,
) (string, error)
// Look up the room version for a given room.
GetRoomVersionForRoom(
ctx context.Context, roomID string,
) (gomatrixserverlib.RoomVersion, error)
}
// OutputRoomEventWriter has the APIs needed to write an event to the output logs. // OutputRoomEventWriter has the APIs needed to write an event to the output logs.
type OutputRoomEventWriter interface { type OutputRoomEventWriter interface {
// Write a list of events for a room // Write a list of events for a room
@ -93,7 +42,7 @@ type OutputRoomEventWriter interface {
// state deltas when sending to kafka streams // state deltas when sending to kafka streams
func processRoomEvent( func processRoomEvent(
ctx context.Context, ctx context.Context,
db RoomEventDatabase, db storage.Database,
ow OutputRoomEventWriter, ow OutputRoomEventWriter,
input api.InputRoomEvent, input api.InputRoomEvent,
) (eventID string, err error) { ) (eventID string, err error) {
@ -153,7 +102,7 @@ func processRoomEvent(
func calculateAndSetState( func calculateAndSetState(
ctx context.Context, ctx context.Context,
db RoomEventDatabase, db storage.Database,
input api.InputRoomEvent, input api.InputRoomEvent,
roomNID types.RoomNID, roomNID types.RoomNID,
stateAtEvent *types.StateAtEvent, stateAtEvent *types.StateAtEvent,
@ -184,7 +133,7 @@ func calculateAndSetState(
func processInviteEvent( func processInviteEvent(
ctx context.Context, ctx context.Context,
db RoomEventDatabase, db storage.Database,
ow OutputRoomEventWriter, ow OutputRoomEventWriter,
input api.InputInviteEvent, input api.InputInviteEvent,
) (err error) { ) (err error) {

View File

@ -24,12 +24,13 @@ import (
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
// RoomserverInputAPI implements api.RoomserverInputAPI // RoomserverInputAPI implements api.RoomserverInputAPI
type RoomserverInputAPI struct { type RoomserverInputAPI struct {
DB RoomEventDatabase DB storage.Database
Producer sarama.SyncProducer Producer sarama.SyncProducer
// The kafkaesque topic to output new room events to. // The kafkaesque topic to output new room events to.
// This is the name used in kafka to identify the stream to write events to. // This is the name used in kafka to identify the stream to write events to.

View File

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -47,7 +48,7 @@ import (
// Can only be called once at a time // Can only be called once at a time
func updateLatestEvents( func updateLatestEvents(
ctx context.Context, ctx context.Context,
db RoomEventDatabase, db storage.Database,
ow OutputRoomEventWriter, ow OutputRoomEventWriter,
roomNID types.RoomNID, roomNID types.RoomNID,
stateAtEvent types.StateAtEvent, stateAtEvent types.StateAtEvent,
@ -86,7 +87,7 @@ func updateLatestEvents(
// when there are so many variables to pass around. // when there are so many variables to pass around.
type latestEventsUpdater struct { type latestEventsUpdater struct {
ctx context.Context ctx context.Context
db RoomEventDatabase db storage.Database
updater types.RoomRecentEventsUpdater updater types.RoomRecentEventsUpdater
ow OutputRoomEventWriter ow OutputRoomEventWriter
roomNID types.RoomNID roomNID types.RoomNID

View File

@ -19,6 +19,7 @@ import (
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -29,7 +30,7 @@ import (
// consumers about the invites added or retired by the change in current state. // consumers about the invites added or retired by the change in current state.
func updateMemberships( func updateMemberships(
ctx context.Context, ctx context.Context,
db RoomEventDatabase, db storage.Database,
updater types.RoomRecentEventsUpdater, updater types.RoomRecentEventsUpdater,
removed, added []types.StateEntry, removed, added []types.StateEntry,
) ([]api.OutputEvent, error) { ) ([]api.OutputEvent, error) {

View File

@ -26,79 +26,16 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/auth" "github.com/matrix-org/dendrite/roomserver/auth"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/state/database" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
// RoomserverQueryAPIEventDB has a convenience API to fetch events directly by
// EventIDs.
type RoomserverQueryAPIEventDB interface {
// Look up the Events for a list of event IDs. Does not error if event was
// not found.
// Returns an error if the retrieval went wrong.
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
}
// RoomserverQueryAPIDatabase has the storage APIs needed to implement the query API.
type RoomserverQueryAPIDatabase interface {
database.RoomStateDatabase
RoomserverQueryAPIEventDB
// Look up the numeric ID for the room.
// Returns 0 if the room doesn't exists.
// Returns an error if there was a problem talking to the database.
RoomNID(ctx context.Context, roomID string) (types.RoomNID, error)
// Look up event references for the latest events in the room and the current state snapshot.
// Returns the latest events, the current state and the maximum depth of the latest events plus 1.
// Returns an error if there was a problem talking to the database.
LatestEventIDs(
ctx context.Context, roomNID types.RoomNID,
) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error)
// Look up the numeric IDs for a list of events.
// Returns an error if there was a problem talking to the database.
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
// Lookup the event IDs for a batch of event numeric IDs.
// Returns an error if the retrieval went wrong.
EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
// Lookup the membership of a given user in a given room.
// Returns the numeric ID of the latest membership event sent from this user
// in this room, along a boolean set to true if the user is still in this room,
// false if not.
// Returns an error if there was a problem talking to the database.
GetMembership(
ctx context.Context, roomNID types.RoomNID, requestSenderUserID string,
) (membershipEventNID types.EventNID, stillInRoom bool, err error)
// Lookup the membership event numeric IDs for all user that are or have
// been members of a given room. Only lookup events of "join" membership if
// joinOnly is set to true.
// Returns an error if there was a problem talking to the database.
GetMembershipEventNIDsForRoom(
ctx context.Context, roomNID types.RoomNID, joinOnly bool,
) ([]types.EventNID, error)
// Look up the active invites targeting a user in a room and return the
// numeric state key IDs for the user IDs who sent them.
// Returns an error if there was a problem talking to the database.
GetInvitesForUser(
ctx context.Context,
roomNID types.RoomNID,
targetUserNID types.EventStateKeyNID,
) (senderUserNIDs []types.EventStateKeyNID, err error)
// Look up the string event state keys for a list of numeric event state keys
// Returns an error if there was a problem talking to the database.
EventStateKeys(
context.Context, []types.EventStateKeyNID,
) (map[types.EventStateKeyNID]string, error)
// Look up the room version for a given room.
GetRoomVersionForRoom(
ctx context.Context, roomID string,
) (gomatrixserverlib.RoomVersion, error)
}
// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI // RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI
type RoomserverQueryAPI struct { type RoomserverQueryAPI struct {
DB RoomserverQueryAPIDatabase DB storage.Database
ImmutableCache caching.ImmutableCache ImmutableCache caching.ImmutableCache
} }
@ -741,7 +678,7 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain(
} }
authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
authEvents, err := getAuthChain(ctx, r.DB, authEventIDs) authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil { if err != nil {
return err return err
} }
@ -788,12 +725,14 @@ func (r *RoomserverQueryAPI) loadStateAtEventIDs(ctx context.Context, eventIDs [
return r.loadStateEvents(ctx, stateEntries) return r.loadStateEvents(ctx, stateEntries)
} }
type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
// getAuthChain fetches the auth chain for the given auth events. An auth chain // getAuthChain fetches the auth chain for the given auth events. An auth chain
// is the list of all events that are referenced in the auth_events section, and // is the list of all events that are referenced in the auth_events section, and
// all their auth_events, recursively. The returned set of events contain the // all their auth_events, recursively. The returned set of events contain the
// given events. Will *not* error if we don't have all auth events. // given events. Will *not* error if we don't have all auth events.
func getAuthChain( func getAuthChain(
ctx context.Context, dB RoomserverQueryAPIEventDB, authEventIDs []string, ctx context.Context, fn eventsFromIDs, authEventIDs []string,
) ([]gomatrixserverlib.Event, error) { ) ([]gomatrixserverlib.Event, error) {
// List of event IDs to fetch. On each pass, these events will be requested // List of event IDs to fetch. On each pass, these events will be requested
// from the database and the `eventsToFetch` will be updated with any new // from the database and the `eventsToFetch` will be updated with any new
@ -804,7 +743,7 @@ func getAuthChain(
for len(eventsToFetch) > 0 { for len(eventsToFetch) > 0 {
// Try to retrieve the events from the database. // Try to retrieve the events from the database.
events, err := dB.EventsFromIDs(ctx, eventsToFetch) events, err := fn(ctx, eventsToFetch)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -106,7 +106,7 @@ func TestGetAuthChainSingle(t *testing.T) {
t.Fatalf("Failed to add events to db: %v", err) t.Fatalf("Failed to add events to db: %v", err)
} }
result, err := getAuthChain(context.TODO(), db, []string{"e"}) result, err := getAuthChain(context.TODO(), db.EventsFromIDs, []string{"e"})
if err != nil { if err != nil {
t.Fatalf("getAuthChain failed: %v", err) t.Fatalf("getAuthChain failed: %v", err)
} }
@ -139,7 +139,7 @@ func TestGetAuthChainMultiple(t *testing.T) {
t.Fatalf("Failed to add events to db: %v", err) t.Fatalf("Failed to add events to db: %v", err)
} }
result, err := getAuthChain(context.TODO(), db, []string{"e", "f"}) result, err := getAuthChain(context.TODO(), db.EventsFromIDs, []string{"e", "f"})
if err != nil { if err != nil {
t.Fatalf("getAuthChain failed: %v", err) t.Fatalf("getAuthChain failed: %v", err)
} }

View File

@ -1,67 +0,0 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package database
import (
"context"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
// A RoomStateDatabase has the storage APIs needed to load state from the database
type RoomStateDatabase interface {
// Store the room state at an event in the database
AddState(
ctx context.Context,
roomNID types.RoomNID,
stateBlockNIDs []types.StateBlockNID,
state []types.StateEntry,
) (types.StateSnapshotNID, error)
// Look up the state of a room at each event for a list of string event IDs.
// Returns an error if there is an error talking to the database
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
// Look up the numeric IDs for a list of string event types.
// Returns a map from string event type to numeric ID for the event type.
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
// Look up the numeric IDs for a list of string event state keys.
// Returns a map from string state key to numeric ID for the state key.
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
// Look up the numeric state data IDs for each numeric state snapshot ID
// The returned slice is sorted by numeric state snapshot ID.
StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
// Look up the state data for each numeric state data ID
// The returned slice is sorted by numeric state data ID.
StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
// Look up the state data for the state key tuples for each numeric state block ID
// This is used to fetch a subset of the room state at a snapshot.
// If a block doesn't contain any of the requested tuples then it can be discarded from the result.
// The returned slice is sorted by numeric state block ID.
StateEntriesForTuples(
ctx context.Context,
stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error)
// Look up the Events for a list of numeric event IDs.
// Returns a sorted list of events.
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
// Look up snapshot NID for an event ID string
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
// Look up a room version from the room NID.
GetRoomVersionForRoomNID(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error)
}

View File

@ -1 +0,0 @@
package shared

View File

@ -22,7 +22,7 @@ import (
"sort" "sort"
"time" "time"
"github.com/matrix-org/dendrite/roomserver/state/database" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -31,10 +31,10 @@ import (
) )
type StateResolution struct { type StateResolution struct {
db database.RoomStateDatabase db storage.Database
} }
func NewStateResolution(db database.RoomStateDatabase) StateResolution { func NewStateResolution(db storage.Database) StateResolution {
return StateResolution{ return StateResolution{
db: db, db: db,
} }

View File

@ -18,13 +18,50 @@ import (
"context" "context"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
statedb "github.com/matrix-org/dendrite/roomserver/state/database"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
type Database interface { type Database interface {
statedb.RoomStateDatabase // Store the room state at an event in the database
AddState(
ctx context.Context,
roomNID types.RoomNID,
stateBlockNIDs []types.StateBlockNID,
state []types.StateEntry,
) (types.StateSnapshotNID, error)
// Look up the state of a room at each event for a list of string event IDs.
// Returns an error if there is an error talking to the database
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
// Look up the numeric IDs for a list of string event types.
// Returns a map from string event type to numeric ID for the event type.
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
// Look up the numeric IDs for a list of string event state keys.
// Returns a map from string state key to numeric ID for the state key.
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
// Look up the numeric state data IDs for each numeric state snapshot ID
// The returned slice is sorted by numeric state snapshot ID.
StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
// Look up the state data for each numeric state data ID
// The returned slice is sorted by numeric state data ID.
StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
// Look up the state data for the state key tuples for each numeric state block ID
// This is used to fetch a subset of the room state at a snapshot.
// If a block doesn't contain any of the requested tuples then it can be discarded from the result.
// The returned slice is sorted by numeric state block ID.
StateEntriesForTuples(
ctx context.Context,
stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error)
// Look up the Events for a list of numeric event IDs.
// Returns a sorted list of events.
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
// Look up snapshot NID for an event ID string
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
// Look up a room version from the room NID.
GetRoomVersionForRoomNID(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error)
StoreEvent(ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) StoreEvent(ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error)
StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error) StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error) EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)