Federation sender event cache (#1614)
* Cache federation sender events * Store in the correct cache * Update federation event cache * Fix Unset * Give EDUs same caching treatment as PDUs * Make federationsender_cache_size configurable * Default caches configuration * Fix unit tests * Revert "Fix unit tests" This reverts commit 24eb5d22524f20e1024b1475debe61ae20538a5a. * Revert "Default caches configuration" This reverts commit 464ecd1e64b9d2983f6fd5430e9607519d543cb3. * Revert "Make federationsender_cache_size configurable" This reverts commit 4631f5324151e006a15d6f19008f06361b994607.main
parent
b507312d4c
commit
5d65a879a5
|
@ -46,7 +46,7 @@ func NewInternalAPI(
|
||||||
) api.FederationSenderInternalAPI {
|
) api.FederationSenderInternalAPI {
|
||||||
cfg := &base.Cfg.FederationSender
|
cfg := &base.Cfg.FederationSender
|
||||||
|
|
||||||
federationSenderDB, err := storage.NewDatabase(&cfg.Database)
|
federationSenderDB, err := storage.NewDatabase(&cfg.Database, base.Caches)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("failed to connect to federation sender db")
|
logrus.WithError(err).Panic("failed to connect to federation sender db")
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
)
|
)
|
||||||
|
@ -32,7 +33,7 @@ type Database struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) {
|
||||||
var d Database
|
var d Database
|
||||||
var err error
|
var err error
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
|
@ -65,6 +66,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
}
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
|
Cache: cache,
|
||||||
Writer: d.writer,
|
Writer: d.writer,
|
||||||
FederationSenderJoinedHosts: joinedHosts,
|
FederationSenderJoinedHosts: joinedHosts,
|
||||||
FederationSenderQueuePDUs: queuePDUs,
|
FederationSenderQueuePDUs: queuePDUs,
|
||||||
|
|
|
@ -22,12 +22,14 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/tables"
|
"github.com/matrix-org/dendrite/federationsender/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/federationsender/types"
|
"github.com/matrix-org/dendrite/federationsender/types"
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database struct {
|
type Database struct {
|
||||||
DB *sql.DB
|
DB *sql.DB
|
||||||
|
Cache caching.FederationSenderCache
|
||||||
Writer sqlutil.Writer
|
Writer sqlutil.Writer
|
||||||
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
|
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
|
||||||
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
|
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
|
||||||
|
|
|
@ -69,7 +69,16 @@ func (d *Database) GetNextTransactionEDUs(
|
||||||
nids: nids,
|
nids: nids,
|
||||||
}
|
}
|
||||||
|
|
||||||
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
|
retrieve := make([]int64, 0, len(nids))
|
||||||
|
for _, nid := range nids {
|
||||||
|
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
|
||||||
|
edus = append(edus, edu)
|
||||||
|
} else {
|
||||||
|
retrieve = append(retrieve, nid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueueJSON: %w", err)
|
return fmt.Errorf("SelectQueueJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -111,6 +120,7 @@ func (d *Database) CleanEDUs(
|
||||||
}
|
}
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
deleteNIDs = append(deleteNIDs, nid)
|
deleteNIDs = append(deleteNIDs, nid)
|
||||||
|
d.Cache.EvictFederationSenderQueuedEDU(nid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -85,17 +85,27 @@ func (d *Database) GetNextTransactionPDUs(
|
||||||
nids: nids,
|
nids: nids,
|
||||||
}
|
}
|
||||||
|
|
||||||
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
|
retrieve := make([]int64, 0, len(nids))
|
||||||
|
for _, nid := range nids {
|
||||||
|
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
|
||||||
|
events = append(events, event)
|
||||||
|
} else {
|
||||||
|
retrieve = append(retrieve, nid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueueJSON: %w", err)
|
return fmt.Errorf("SelectQueueJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, blob := range blobs {
|
for nid, blob := range blobs {
|
||||||
var event gomatrixserverlib.HeaderedEvent
|
var event gomatrixserverlib.HeaderedEvent
|
||||||
if err := json.Unmarshal(blob, &event); err != nil {
|
if err := json.Unmarshal(blob, &event); err != nil {
|
||||||
return fmt.Errorf("json.Unmarshal: %w", err)
|
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
}
|
}
|
||||||
events = append(events, &event)
|
events = append(events, &event)
|
||||||
|
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -128,6 +138,7 @@ func (d *Database) CleanPDUs(
|
||||||
}
|
}
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
deleteNIDs = append(deleteNIDs, nid)
|
deleteNIDs = append(deleteNIDs, nid)
|
||||||
|
d.Cache.EvictFederationSenderQueuedPDU(nid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
)
|
)
|
||||||
|
@ -34,7 +35,7 @@ type Database struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) {
|
||||||
var d Database
|
var d Database
|
||||||
var err error
|
var err error
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
|
@ -67,6 +68,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
}
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
|
Cache: cache,
|
||||||
Writer: d.writer,
|
Writer: d.writer,
|
||||||
FederationSenderJoinedHosts: joinedHosts,
|
FederationSenderJoinedHosts: joinedHosts,
|
||||||
FederationSenderQueuePDUs: queuePDUs,
|
FederationSenderQueuePDUs: queuePDUs,
|
||||||
|
|
|
@ -21,16 +21,17 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
|
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
|
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
|
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) {
|
||||||
switch {
|
switch {
|
||||||
case dbProperties.ConnectionString.IsSQLite():
|
case dbProperties.ConnectionString.IsSQLite():
|
||||||
return sqlite3.NewDatabase(dbProperties)
|
return sqlite3.NewDatabase(dbProperties, cache)
|
||||||
case dbProperties.ConnectionString.IsPostgres():
|
case dbProperties.ConnectionString.IsPostgres():
|
||||||
return postgres.NewDatabase(dbProperties)
|
return postgres.NewDatabase(dbProperties, cache)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unexpected database type")
|
return nil, fmt.Errorf("unexpected database type")
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,14 +18,15 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
|
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
|
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) {
|
||||||
switch {
|
switch {
|
||||||
case dbProperties.ConnectionString.IsSQLite():
|
case dbProperties.ConnectionString.IsSQLite():
|
||||||
return sqlite3.NewDatabase(dbProperties)
|
return sqlite3.NewDatabase(dbProperties, cache)
|
||||||
case dbProperties.ConnectionString.IsPostgres():
|
case dbProperties.ConnectionString.IsPostgres():
|
||||||
return nil, fmt.Errorf("can't use Postgres implementation")
|
return nil, fmt.Errorf("can't use Postgres implementation")
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
package caching
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
FederationEventCacheName = "federation_event"
|
||||||
|
FederationEventCacheMaxEntries = 256
|
||||||
|
FederationEventCacheMutable = true // to allow use of Unset only
|
||||||
|
)
|
||||||
|
|
||||||
|
// FederationSenderCache contains the subset of functions needed for
|
||||||
|
// a federation event cache.
|
||||||
|
type FederationSenderCache interface {
|
||||||
|
GetFederationSenderQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool)
|
||||||
|
StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent)
|
||||||
|
EvictFederationSenderQueuedPDU(eventNID int64)
|
||||||
|
|
||||||
|
GetFederationSenderQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool)
|
||||||
|
StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU)
|
||||||
|
EvictFederationSenderQueuedEDU(eventNID int64)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) {
|
||||||
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
|
val, found := c.FederationEvents.Get(key)
|
||||||
|
if found && val != nil {
|
||||||
|
if event, ok := val.(*gomatrixserverlib.HeaderedEvent); ok {
|
||||||
|
return event, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Caches) StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) {
|
||||||
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
|
c.FederationEvents.Set(key, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Caches) EvictFederationSenderQueuedPDU(eventNID int64) {
|
||||||
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
|
c.FederationEvents.Unset(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) {
|
||||||
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
|
val, found := c.FederationEvents.Get(key)
|
||||||
|
if found && val != nil {
|
||||||
|
if event, ok := val.(*gomatrixserverlib.EDU); ok {
|
||||||
|
return event, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Caches) StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) {
|
||||||
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
|
c.FederationEvents.Set(key, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Caches) EvictFederationSenderQueuedEDU(eventNID int64) {
|
||||||
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
|
c.FederationEvents.Unset(key)
|
||||||
|
}
|
|
@ -10,6 +10,7 @@ type Caches struct {
|
||||||
RoomServerEventTypeNIDs Cache // RoomServerNIDsCache
|
RoomServerEventTypeNIDs Cache // RoomServerNIDsCache
|
||||||
RoomServerRoomNIDs Cache // RoomServerNIDsCache
|
RoomServerRoomNIDs Cache // RoomServerNIDsCache
|
||||||
RoomServerRoomIDs Cache // RoomServerNIDsCache
|
RoomServerRoomIDs Cache // RoomServerNIDsCache
|
||||||
|
FederationEvents Cache // FederationEventsCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache is the interface that an implementation must satisfy.
|
// Cache is the interface that an implementation must satisfy.
|
||||||
|
|
|
@ -63,6 +63,15 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
federationEvents, err := NewInMemoryLRUCachePartition(
|
||||||
|
FederationEventCacheName,
|
||||||
|
FederationEventCacheMutable,
|
||||||
|
FederationEventCacheMaxEntries,
|
||||||
|
enablePrometheus,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return &Caches{
|
return &Caches{
|
||||||
RoomVersions: roomVersions,
|
RoomVersions: roomVersions,
|
||||||
ServerKeys: serverKeys,
|
ServerKeys: serverKeys,
|
||||||
|
@ -70,6 +79,7 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
|
||||||
RoomServerEventTypeNIDs: roomServerEventTypeNIDs,
|
RoomServerEventTypeNIDs: roomServerEventTypeNIDs,
|
||||||
RoomServerRoomNIDs: roomServerRoomNIDs,
|
RoomServerRoomNIDs: roomServerRoomNIDs,
|
||||||
RoomServerRoomIDs: roomServerRoomIDs,
|
RoomServerRoomIDs: roomServerRoomIDs,
|
||||||
|
FederationEvents: federationEvents,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue