2017-09-08 14:17:12 +00:00
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-10-11 17:16:53 +00:00
package routing
2017-06-07 13:32:53 +00:00
import (
2017-09-13 10:03:41 +00:00
"context"
2017-06-07 13:32:53 +00:00
"encoding/json"
"fmt"
2017-08-23 14:13:47 +00:00
"net/http"
2020-09-28 10:32:59 +00:00
"sync"
2020-09-07 11:32:40 +00:00
"time"
2017-08-23 14:13:47 +00:00
2017-06-07 13:32:53 +00:00
"github.com/matrix-org/dendrite/clientapi/jsonerror"
2020-06-10 11:17:54 +00:00
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
2020-05-21 13:40:13 +00:00
"github.com/matrix-org/dendrite/internal/config"
2020-08-05 12:41:16 +00:00
keyapi "github.com/matrix-org/dendrite/keyserver/api"
2017-06-07 13:32:53 +00:00
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
2020-05-05 14:48:37 +00:00
"github.com/sirupsen/logrus"
2017-06-07 13:32:53 +00:00
)
// Send implements /_matrix/federation/v1/send/{txnID}
func Send (
2017-09-04 12:14:01 +00:00
httpReq * http . Request ,
request * gomatrixserverlib . FederationRequest ,
2017-06-07 13:32:53 +00:00
txnID gomatrixserverlib . TransactionID ,
2020-08-10 13:18:04 +00:00
cfg * config . FederationAPI ,
2020-05-01 09:48:17 +00:00
rsAPI api . RoomserverInternalAPI ,
2020-06-10 11:17:54 +00:00
eduAPI eduserverAPI . EDUServerInputAPI ,
2020-08-05 12:41:16 +00:00
keyAPI keyapi . KeyInternalAPI ,
2020-06-15 15:57:59 +00:00
keys gomatrixserverlib . JSONVerifier ,
2017-06-07 13:32:53 +00:00
federation * gomatrixserverlib . FederationClient ,
) util . JSONResponse {
t := txnReq {
2020-06-10 11:17:54 +00:00
rsAPI : rsAPI ,
eduAPI : eduAPI ,
keys : keys ,
federation : federation ,
haveEvents : make ( map [ string ] * gomatrixserverlib . HeaderedEvent ) ,
newEvents : make ( map [ string ] bool ) ,
2020-08-05 12:41:16 +00:00
keyAPI : keyAPI ,
2017-06-07 13:32:53 +00:00
}
2020-03-27 16:28:22 +00:00
var txnEvents struct {
2020-03-30 15:40:28 +00:00
PDUs [ ] json . RawMessage ` json:"pdus" `
EDUs [ ] gomatrixserverlib . EDU ` json:"edus" `
2020-03-27 16:28:22 +00:00
}
if err := json . Unmarshal ( request . Content ( ) , & txnEvents ) ; err != nil {
2017-06-07 13:32:53 +00:00
return util . JSONResponse {
2018-03-13 15:55:45 +00:00
Code : http . StatusBadRequest ,
2017-08-23 14:13:47 +00:00
JSON : jsonerror . NotJSON ( "The request body could not be decoded into valid JSON. " + err . Error ( ) ) ,
2017-06-07 13:32:53 +00:00
}
}
2020-06-23 12:15:15 +00:00
// Transactions are limited in size; they can have at most 50 PDUs and 100 EDUs.
// https://matrix.org/docs/spec/server_server/latest#transactions
if len ( txnEvents . PDUs ) > 50 || len ( txnEvents . EDUs ) > 100 {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . BadJSON ( "max 50 pdus / 100 edus" ) ,
}
}
2017-06-07 13:32:53 +00:00
2020-03-30 15:40:28 +00:00
// TODO: Really we should have a function to convert FederationRequest to txnReq
2020-03-27 16:28:22 +00:00
t . PDUs = txnEvents . PDUs
2020-03-30 15:40:28 +00:00
t . EDUs = txnEvents . EDUs
2017-06-07 13:32:53 +00:00
t . Origin = request . Origin ( )
t . TransactionID = txnID
2017-06-19 14:21:04 +00:00
t . Destination = cfg . Matrix . ServerName
2017-06-07 13:32:53 +00:00
2020-03-27 16:28:22 +00:00
util . GetLogger ( httpReq . Context ( ) ) . Infof ( "Received transaction %q containing %d PDUs, %d EDUs" , txnID , len ( t . PDUs ) , len ( t . EDUs ) )
2020-09-07 11:32:40 +00:00
resp , jsonErr := t . processTransaction ( httpReq . Context ( ) )
2020-06-23 12:15:15 +00:00
if jsonErr != nil {
util . GetLogger ( httpReq . Context ( ) ) . WithField ( "jsonErr" , jsonErr ) . Error ( "t.processTransaction failed" )
return * jsonErr
2017-06-07 13:32:53 +00:00
}
2020-05-13 12:01:45 +00:00
// https://matrix.org/docs/spec/server_server/r0.1.3#put-matrix-federation-v1-send-txnid
// Status code 200:
// The result of processing the transaction. The server is to use this response
// even in the event of one or more PDUs failing to be processed.
2017-06-07 13:32:53 +00:00
return util . JSONResponse {
2020-05-13 12:01:45 +00:00
Code : http . StatusOK ,
JSON : resp ,
2017-06-07 13:32:53 +00:00
}
}
type txnReq struct {
gomatrixserverlib . Transaction
2020-06-10 11:17:54 +00:00
rsAPI api . RoomserverInternalAPI
eduAPI eduserverAPI . EDUServerInputAPI
2020-08-05 12:41:16 +00:00
keyAPI keyapi . KeyInternalAPI
2020-06-10 11:17:54 +00:00
keys gomatrixserverlib . JSONVerifier
federation txnFederationClient
2020-05-12 15:24:28 +00:00
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map [ string ] * gomatrixserverlib . HeaderedEvent
// new events which the roomserver does not know about
newEvents map [ string ] bool
2020-05-06 13:27:02 +00:00
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
type txnFederationClient interface {
LookupState ( ctx context . Context , s gomatrixserverlib . ServerName , roomID string , eventID string , roomVersion gomatrixserverlib . RoomVersion ) (
res gomatrixserverlib . RespState , err error ,
)
LookupStateIDs ( ctx context . Context , s gomatrixserverlib . ServerName , roomID string , eventID string ) ( res gomatrixserverlib . RespStateIDs , err error )
GetEvent ( ctx context . Context , s gomatrixserverlib . ServerName , eventID string ) ( res gomatrixserverlib . Transaction , err error )
2020-05-12 15:24:28 +00:00
LookupMissingEvents ( ctx context . Context , s gomatrixserverlib . ServerName , roomID string , missing gomatrixserverlib . MissingEvents ,
roomVersion gomatrixserverlib . RoomVersion ) ( res gomatrixserverlib . RespMissingEvents , err error )
2017-06-07 13:32:53 +00:00
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) processTransaction ( ctx context . Context ) ( * gomatrixserverlib . RespSend , * util . JSONResponse ) {
2020-04-16 16:59:55 +00:00
results := make ( map [ string ] gomatrixserverlib . PDUResult )
2020-05-27 10:16:27 +00:00
pdus := [ ] gomatrixserverlib . HeaderedEvent { }
2020-03-27 16:28:22 +00:00
for _ , pdu := range t . PDUs {
var header struct {
RoomID string ` json:"room_id" `
}
if err := json . Unmarshal ( pdu , & header ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Warn ( "Transaction: Failed to extract room ID from event" )
2020-05-13 12:01:45 +00:00
// We don't know the event ID at this point so we can't return the
// failure in the PDU results
continue
2020-03-27 16:28:22 +00:00
}
verReq := api . QueryRoomVersionForRoomRequest { RoomID : header . RoomID }
verRes := api . QueryRoomVersionForRoomResponse { }
2020-09-07 11:32:40 +00:00
if err := t . rsAPI . QueryRoomVersionForRoom ( ctx , & verReq , & verRes ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Warn ( "Transaction: Failed to query room version for room" , verReq . RoomID )
2020-05-13 12:01:45 +00:00
// We don't know the event ID at this point so we can't return the
// failure in the PDU results
continue
2020-03-27 16:28:22 +00:00
}
event , err := gomatrixserverlib . NewEventFromUntrustedJSON ( pdu , verRes . RoomVersion )
if err != nil {
2020-06-23 12:15:15 +00:00
if _ , ok := err . ( gomatrixserverlib . BadJSONError ) ; ok {
// Room version 6 states that homeservers should strictly enforce canonical JSON
// on PDUs.
//
// This enforces that the entire transaction is rejected if a single bad PDU is
// sent. It is unclear if this is the correct behaviour or not.
//
// See https://github.com/matrix-org/synapse/issues/7543
return nil , & util . JSONResponse {
Code : 400 ,
JSON : jsonerror . BadJSON ( "PDU contains bad JSON" ) ,
}
2020-05-13 12:01:45 +00:00
}
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Warnf ( "Transaction: Failed to parse event JSON of event %s" , string ( pdu ) )
2020-05-13 12:01:45 +00:00
continue
2020-03-27 16:28:22 +00:00
}
2020-09-07 11:32:40 +00:00
if api . IsServerBannedFromRoom ( ctx , t . rsAPI , event . RoomID ( ) , t . Origin ) {
2020-08-11 17:19:11 +00:00
results [ event . EventID ( ) ] = gomatrixserverlib . PDUResult {
Error : "Forbidden by server ACLs" ,
}
continue
}
2020-09-07 11:32:40 +00:00
if err = gomatrixserverlib . VerifyAllEventSignatures ( ctx , [ ] gomatrixserverlib . Event { event } , t . keys ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Warnf ( "Transaction: Couldn't validate signature of event %q" , event . EventID ( ) )
2020-05-13 12:01:45 +00:00
results [ event . EventID ( ) ] = gomatrixserverlib . PDUResult {
Error : err . Error ( ) ,
}
continue
2020-03-27 16:28:22 +00:00
}
pdus = append ( pdus , event . Headered ( verRes . RoomVersion ) )
2017-06-07 13:32:53 +00:00
}
// Process the events.
2020-03-27 16:28:22 +00:00
for _ , e := range pdus {
2020-09-07 11:32:40 +00:00
if err := t . processEvent ( ctx , e . Unwrap ( ) , true ) ; err != nil {
2017-06-07 13:32:53 +00:00
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
2020-05-13 12:01:45 +00:00
if isProcessingErrorFatal ( err ) {
2017-06-07 13:32:53 +00:00
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . Warnf ( "Processing %s failed fatally: %s" , e . EventID ( ) , err )
2020-06-23 12:15:15 +00:00
jsonErr := util . ErrorResponse ( err )
return nil , & jsonErr
2020-05-13 12:01:45 +00:00
} else {
2020-06-23 09:31:17 +00:00
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest
2020-09-29 12:40:29 +00:00
errMsg := ""
2020-06-23 09:31:17 +00:00
_ , rejected := err . ( * gomatrixserverlib . NotAllowed )
2020-09-29 12:40:29 +00:00
if ! rejected {
errMsg = err . Error ( )
2020-06-23 09:31:17 +00:00
}
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . WithField ( "event_id" , e . EventID ( ) ) . WithField ( "rejected" , rejected ) . Warn (
2020-06-23 09:31:17 +00:00
"Failed to process incoming federation event, skipping" ,
)
2020-05-13 12:01:45 +00:00
results [ e . EventID ( ) ] = gomatrixserverlib . PDUResult {
2020-06-23 09:31:17 +00:00
Error : errMsg ,
2020-05-13 12:01:45 +00:00
}
2017-06-07 13:32:53 +00:00
}
} else {
results [ e . EventID ( ) ] = gomatrixserverlib . PDUResult { }
}
}
2020-09-07 11:32:40 +00:00
t . processEDUs ( ctx )
2020-08-07 14:00:23 +00:00
if c := len ( results ) ; c > 0 {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . Infof ( "Processed %d PDUs from transaction %q" , c , t . TransactionID )
2020-08-07 14:00:23 +00:00
}
2017-06-07 13:32:53 +00:00
return & gomatrixserverlib . RespSend { PDUs : results } , nil
}
2020-05-13 12:01:45 +00:00
// isProcessingErrorFatal reports whether err should abort processing of the
// whole transaction. Errors that only concern a specific event (unknown room,
// event not allowed, missing prev events) and context cancellation/deadline
// errors are not fatal; everything else is.
func isProcessingErrorFatal(err error) bool {
	switch err.(type) {
	case roomNotFoundError, *gomatrixserverlib.NotAllowed, missingPrevEventsError:
		return false
	}
	return err != context.Canceled && err != context.DeadlineExceeded
}
2020-04-16 16:59:55 +00:00
// Error types produced while processing events from a transaction.
type (
	// roomNotFoundError names a room that is not known.
	roomNotFoundError struct {
		roomID string
	}

	// unmarshalError wraps a failure to parse an event.
	unmarshalError struct {
		err error
	}

	// verifySigError wraps a signature verification failure for an event.
	verifySigError struct {
		eventID string
		err     error
	}

	// missingPrevEventsError wraps a failure to obtain the prev_events of an event.
	missingPrevEventsError struct {
		eventID string
		err     error
	}
)

// Error implements the error interface.
func (rnf roomNotFoundError) Error() string {
	return fmt.Sprintf("room %q not found", rnf.roomID)
}

// Error implements the error interface.
func (ue unmarshalError) Error() string {
	return fmt.Sprintf("unable to parse event: %s", ue.err)
}

// Error implements the error interface.
func (vse verifySigError) Error() string {
	const format = "unable to verify signature of event %q: %s"
	return fmt.Sprintf(format, vse.eventID, vse.err)
}

// Error implements the error interface.
func (mpe missingPrevEventsError) Error() string {
	const format = "unable to get prev_events for event %q: %s"
	return fmt.Sprintf(format, mpe.eventID, mpe.err)
}
// haveEventIDs returns the set of event IDs present in the local haveEvents
// cache, leaving out those flagged in newEvents.
func (t *txnReq) haveEventIDs() map[string]bool {
	known := make(map[string]bool, len(t.haveEvents))
	for id := range t.haveEvents {
		if !t.newEvents[id] {
			known[id] = true
		}
	}
	return known
}
2017-06-07 13:32:53 +00:00
2020-09-07 11:32:40 +00:00
// nolint:gocyclo
func ( t * txnReq ) processEDUs ( ctx context . Context ) {
for _ , e := range t . EDUs {
2020-03-30 15:40:28 +00:00
switch e . Type {
case gomatrixserverlib . MTyping :
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
var typingPayload struct {
RoomID string ` json:"room_id" `
UserID string ` json:"user_id" `
Typing bool ` json:"typing" `
}
if err := json . Unmarshal ( e . Content , & typingPayload ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to unmarshal typing event" )
2020-03-30 15:40:28 +00:00
continue
}
2020-09-07 11:32:40 +00:00
if err := eduserverAPI . SendTyping ( ctx , t . eduAPI , typingPayload . UserID , typingPayload . RoomID , typingPayload . Typing , 30 * 1000 ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to send typing event to edu server" )
2020-03-30 15:40:28 +00:00
}
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
case gomatrixserverlib . MDirectToDevice :
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
var directPayload gomatrixserverlib . ToDeviceMessage
if err := json . Unmarshal ( e . Content , & directPayload ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to unmarshal send-to-device events" )
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
continue
}
for userID , byUser := range directPayload . Messages {
for deviceID , message := range byUser {
// TODO: check that the user and the device actually exist here
2020-09-07 11:32:40 +00:00
if err := eduserverAPI . SendToDevice ( ctx , t . eduAPI , directPayload . Sender , userID , deviceID , directPayload . Type , message ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . WithFields ( logrus . Fields {
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
"sender" : directPayload . Sender ,
"user_id" : userID ,
"device_id" : deviceID ,
} ) . Error ( "Failed to send send-to-device event to edu server" )
}
}
}
2020-08-05 12:41:16 +00:00
case gomatrixserverlib . MDeviceListUpdate :
2020-09-07 11:32:40 +00:00
t . processDeviceListUpdate ( ctx , e )
2020-03-30 15:40:28 +00:00
default :
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithField ( "type" , e . Type ) . Debug ( "Unhandled EDU" )
2020-03-30 15:40:28 +00:00
}
}
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) processDeviceListUpdate ( ctx context . Context , e gomatrixserverlib . EDU ) {
2020-08-05 12:41:16 +00:00
var payload gomatrixserverlib . DeviceListUpdateEvent
if err := json . Unmarshal ( e . Content , & payload ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to unmarshal device list update event" )
2020-08-05 12:41:16 +00:00
return
}
var inputRes keyapi . InputDeviceListUpdateResponse
t . keyAPI . InputDeviceListUpdate ( context . Background ( ) , & keyapi . InputDeviceListUpdateRequest {
Event : payload ,
} , & inputRes )
if inputRes . Error != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( inputRes . Error ) . WithField ( "user_id" , payload . UserID ) . Error ( "failed to InputDeviceListUpdate" )
2020-08-05 12:41:16 +00:00
}
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) processEvent ( ctx context . Context , e gomatrixserverlib . Event , isInboundTxn bool ) error {
2020-09-29 12:40:29 +00:00
logger := util . GetLogger ( ctx ) . WithField ( "event_id" , e . EventID ( ) ) . WithField ( "room_id" , e . RoomID ( ) )
2017-06-07 13:32:53 +00:00
2020-09-29 12:40:29 +00:00
// Work out if the roomserver knows everything it needs to know to auth
// the event.
stateReq := api . QueryMissingAuthPrevEventsRequest {
2017-06-07 13:32:53 +00:00
RoomID : e . RoomID ( ) ,
2020-09-29 12:40:29 +00:00
AuthEventIDs : e . AuthEventIDs ( ) ,
PrevEventIDs : e . PrevEventIDs ( ) ,
2017-06-07 13:32:53 +00:00
}
2020-09-29 12:40:29 +00:00
var stateResp api . QueryMissingAuthPrevEventsResponse
if err := t . rsAPI . QueryMissingAuthPrevEvents ( ctx , & stateReq , & stateResp ) ; err != nil {
2017-06-07 13:32:53 +00:00
return err
}
if ! stateResp . RoomExists {
// TODO: When synapse receives a message for a room it is not in it
// asks the remote server for the state of the room so that it can
// check if the remote server knows of a join "m.room.member" event
// that this server is unaware of.
// However generally speaking we should reject events for rooms we
// aren't a member of.
2020-04-16 16:59:55 +00:00
return roomNotFoundError { e . RoomID ( ) }
2017-06-07 13:32:53 +00:00
}
2020-09-29 12:40:29 +00:00
if len ( stateResp . MissingAuthEventIDs ) > 0 {
logger . Infof ( "Event refers to %d unknown auth_events" , len ( stateResp . MissingAuthEventIDs ) )
servers := [ ] gomatrixserverlib . ServerName { t . Origin }
serverReq := & api . QueryServerJoinedToRoomRequest {
RoomID : e . RoomID ( ) ,
}
serverRes := & api . QueryServerJoinedToRoomResponse { }
if err := t . rsAPI . QueryServerJoinedToRoom ( ctx , serverReq , serverRes ) ; err == nil {
servers = append ( servers , serverRes . ServerNames ... )
logger . Infof ( "Found %d server(s) to query for missing events" , len ( servers ) )
}
getAuthEvent :
for _ , missingAuthEventID := range stateResp . MissingAuthEventIDs {
for _ , server := range servers {
logger . Infof ( "Retrieving missing auth event %q from %q" , missingAuthEventID , server )
tx , err := t . federation . GetEvent ( ctx , server , missingAuthEventID )
if err != nil {
continue // try the next server
}
ev , err := gomatrixserverlib . NewEventFromUntrustedJSON ( tx . PDUs [ 0 ] , stateResp . RoomVersion )
if err != nil {
logger . WithError ( err ) . Errorf ( "Failed to unmarshal auth event %q" , missingAuthEventID )
continue // try the next server
}
if err = api . SendInputRoomEvents (
context . Background ( ) ,
t . rsAPI ,
[ ] api . InputRoomEvent {
{
Kind : api . KindOutlier ,
Event : ev . Headered ( stateResp . RoomVersion ) ,
AuthEventIDs : ev . AuthEventIDs ( ) ,
SendAsServer : api . DoNotSendToOtherServers ,
} ,
} ,
) ; err != nil {
logger . WithError ( err ) . Errorf ( "Failed to send auth event %q to roomserver" , missingAuthEventID )
continue getAuthEvent // move onto the next event
}
}
}
}
if len ( stateResp . MissingPrevEventIDs ) > 0 {
logger . Infof ( "Event refers to %d unknown prev_events" , len ( stateResp . MissingPrevEventIDs ) )
2020-09-07 11:32:40 +00:00
return t . processEventWithMissingState ( ctx , e , stateResp . RoomVersion , isInboundTxn )
2017-06-07 13:32:53 +00:00
}
2020-09-16 12:00:52 +00:00
// pass the event to the roomserver which will do auth checks
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
// discarded by the caller of this function
2020-09-03 14:22:16 +00:00
return api . SendEvents (
2020-09-07 11:32:40 +00:00
context . Background ( ) ,
t . rsAPI ,
2020-03-27 16:28:22 +00:00
[ ] gomatrixserverlib . HeaderedEvent {
e . Headered ( stateResp . RoomVersion ) ,
} ,
api . DoNotSendToOtherServers ,
nil ,
)
2017-06-07 13:32:53 +00:00
}
func checkAllowedByState ( e gomatrixserverlib . Event , stateEvents [ ] gomatrixserverlib . Event ) error {
authUsingState := gomatrixserverlib . NewAuthEvents ( nil )
for i := range stateEvents {
2017-09-20 09:59:19 +00:00
err := authUsingState . AddEvent ( & stateEvents [ i ] )
if err != nil {
return err
}
2017-06-07 13:32:53 +00:00
}
return gomatrixserverlib . Allowed ( e , & authUsingState )
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) processEventWithMissingState ( ctx context . Context , e gomatrixserverlib . Event , roomVersion gomatrixserverlib . RoomVersion , isInboundTxn bool ) error {
// Do this with a fresh context, so that we keep working even if the
// original request times out. With any luck, by the time the remote
// side retries, we'll have fetched the missing state.
gmectx , cancel := context . WithTimeout ( context . Background ( ) , time . Minute * 5 )
defer cancel ( )
2017-06-07 13:32:53 +00:00
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
// 1) We can fill in the gap using /get_missing_events
// 2) We can leave the gap and request the state of the room at
// this event from the remote server using either /state_ids
// or /state.
// Synapse will attempt to do 1 and if that fails or if the gap is
// too large then it will attempt 2.
2017-06-12 17:30:47 +00:00
// Synapse will use /state_ids if possible since usually the state
2017-06-07 13:32:53 +00:00
// is largely unchanged and it is more efficient to fetch a list of
// event ids and then use /event to fetch the individual events.
// However not all version of synapse support /state_ids so you may
// need to fallback to /state.
2020-05-05 14:48:37 +00:00
2020-05-12 15:24:28 +00:00
// Attempt to fill in the gap using /get_missing_events
// This will either:
// - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
2020-09-07 11:32:40 +00:00
backwardsExtremity , err := t . getMissingEvents ( gmectx , e , roomVersion , isInboundTxn )
2017-06-07 13:32:53 +00:00
if err != nil {
2020-05-12 15:24:28 +00:00
return err
}
if backwardsExtremity == nil {
// we filled in the gap!
return nil
}
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
// security: we have to do state resolution on the new backwards extremity (TODO: WHY)
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
2020-06-29 13:39:21 +00:00
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
2020-05-12 15:24:28 +00:00
var states [ ] * gomatrixserverlib . RespState
needed := gomatrixserverlib . StateNeededForAuth ( [ ] gomatrixserverlib . Event { * backwardsExtremity } ) . Tuples ( )
for _ , prevEventID := range backwardsExtremity . PrevEventIDs ( ) {
var prevState * gomatrixserverlib . RespState
2020-09-07 11:32:40 +00:00
prevState , err = t . lookupStateAfterEvent ( gmectx , roomVersion , backwardsExtremity . RoomID ( ) , prevEventID , needed )
2020-05-05 14:48:37 +00:00
if err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Errorf ( "Failed to lookup state after prev_event: %s" , prevEventID )
2020-05-05 14:48:37 +00:00
return err
}
2020-05-12 15:24:28 +00:00
states = append ( states , prevState )
}
2020-09-07 11:32:40 +00:00
resolvedState , err := t . resolveStatesAndCheck ( gmectx , roomVersion , states , backwardsExtremity )
2020-05-12 15:24:28 +00:00
if err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Errorf ( "Failed to resolve state conflicts for event %s" , backwardsExtremity . EventID ( ) )
2020-05-12 15:24:28 +00:00
return err
}
// pass the event along with the state to the roomserver using a background context so we don't
// needlessly expire
2020-06-10 11:17:54 +00:00
return api . SendEventWithState ( context . Background ( ) , t . rsAPI , resolvedState , e . Headered ( roomVersion ) , t . haveEventIDs ( ) )
2020-05-12 15:24:28 +00:00
}
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix.
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) lookupStateAfterEvent ( ctx context . Context , roomVersion gomatrixserverlib . RoomVersion , roomID , eventID string , needed [ ] gomatrixserverlib . StateKeyTuple ) ( * gomatrixserverlib . RespState , error ) {
2020-05-12 15:24:28 +00:00
// try doing all this locally before we resort to querying federation
2020-09-07 11:32:40 +00:00
respState := t . lookupStateAfterEventLocally ( ctx , roomID , eventID , needed )
2020-05-12 15:24:28 +00:00
if respState != nil {
return respState , nil
}
2020-09-07 11:32:40 +00:00
respState , err := t . lookupStateBeforeEvent ( ctx , roomVersion , roomID , eventID )
2020-05-12 15:24:28 +00:00
if err != nil {
return nil , err
}
// fetch the event we're missing and add it to the pile
2020-09-07 11:32:40 +00:00
h , err := t . lookupEvent ( ctx , roomVersion , eventID , false )
2020-09-08 09:28:13 +00:00
switch err . ( type ) {
case verifySigError :
return respState , nil
case nil :
// do nothing
default :
2020-05-12 15:24:28 +00:00
return nil , err
}
t . haveEvents [ h . EventID ( ) ] = h
if h . StateKey ( ) != nil {
addedToState := false
for i := range respState . StateEvents {
se := respState . StateEvents [ i ]
if se . Type ( ) == h . Type ( ) && se . StateKeyEquals ( * h . StateKey ( ) ) {
respState . StateEvents [ i ] = h . Unwrap ( )
addedToState = true
break
}
}
if ! addedToState {
respState . StateEvents = append ( respState . StateEvents , h . Unwrap ( ) )
}
}
return respState , nil
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) lookupStateAfterEventLocally ( ctx context . Context , roomID , eventID string , needed [ ] gomatrixserverlib . StateKeyTuple ) * gomatrixserverlib . RespState {
2020-05-12 15:24:28 +00:00
var res api . QueryStateAfterEventsResponse
2020-09-07 11:32:40 +00:00
err := t . rsAPI . QueryStateAfterEvents ( ctx , & api . QueryStateAfterEventsRequest {
2020-05-12 15:24:28 +00:00
RoomID : roomID ,
PrevEventIDs : [ ] string { eventID } ,
StateToFetch : needed ,
} , & res )
if err != nil || ! res . PrevEventsExist {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Warnf ( "failed to query state after %s locally" , eventID )
2020-05-12 15:24:28 +00:00
return nil
}
for i , ev := range res . StateEvents {
t . haveEvents [ ev . EventID ( ) ] = & res . StateEvents [ i ]
}
var authEvents [ ] gomatrixserverlib . Event
missingAuthEvents := make ( map [ string ] bool )
for _ , ev := range res . StateEvents {
for _ , ae := range ev . AuthEventIDs ( ) {
aev , ok := t . haveEvents [ ae ]
if ok {
authEvents = append ( authEvents , aev . Unwrap ( ) )
} else {
missingAuthEvents [ ae ] = true
}
}
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
var missingEventList [ ] string
for evID := range missingAuthEvents {
missingEventList = append ( missingEventList , evID )
}
queryReq := api . QueryEventsByIDRequest {
EventIDs : missingEventList ,
}
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . Infof ( "Fetching missing auth events: %v" , missingEventList )
2020-05-12 15:24:28 +00:00
var queryRes api . QueryEventsByIDResponse
2020-09-07 11:32:40 +00:00
if err = t . rsAPI . QueryEventsByID ( ctx , & queryReq , & queryRes ) ; err != nil {
2020-05-12 15:24:28 +00:00
return nil
}
for i := range queryRes . Events {
evID := queryRes . Events [ i ] . EventID ( )
t . haveEvents [ evID ] = & queryRes . Events [ i ]
authEvents = append ( authEvents , queryRes . Events [ i ] . Unwrap ( ) )
}
evs := gomatrixserverlib . UnwrapEventHeaders ( res . StateEvents )
return & gomatrixserverlib . RespState {
StateEvents : evs ,
AuthEvents : authEvents ,
}
}
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
// the server supports.
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) lookupStateBeforeEvent ( ctx context . Context , roomVersion gomatrixserverlib . RoomVersion , roomID , eventID string ) (
2020-09-28 09:03:18 +00:00
* gomatrixserverlib . RespState , error ) {
2020-05-12 15:24:28 +00:00
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . Infof ( "lookupStateBeforeEvent %s" , eventID )
2020-05-12 15:24:28 +00:00
// Attempt to fetch the missing state using /state_ids and /events
2020-09-28 09:03:18 +00:00
return t . lookupMissingStateViaStateIDs ( ctx , roomID , eventID , roomVersion )
2020-05-12 15:24:28 +00:00
}
2020-05-05 14:48:37 +00:00
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) resolveStatesAndCheck ( ctx context . Context , roomVersion gomatrixserverlib . RoomVersion , states [ ] * gomatrixserverlib . RespState , backwardsExtremity * gomatrixserverlib . Event ) ( * gomatrixserverlib . RespState , error ) {
2020-05-12 15:24:28 +00:00
var authEventList [ ] gomatrixserverlib . Event
var stateEventList [ ] gomatrixserverlib . Event
for _ , state := range states {
authEventList = append ( authEventList , state . AuthEvents ... )
stateEventList = append ( stateEventList , state . StateEvents ... )
}
resolvedStateEvents , err := gomatrixserverlib . ResolveConflicts ( roomVersion , stateEventList , authEventList )
if err != nil {
return nil , err
}
// apply the current event
2020-03-06 16:58:10 +00:00
retryAllowedState :
2020-05-12 15:24:28 +00:00
if err = checkAllowedByState ( * backwardsExtremity , resolvedStateEvents ) ; err != nil {
2020-03-06 16:58:10 +00:00
switch missing := err . ( type ) {
case gomatrixserverlib . MissingAuthEventError :
2020-09-07 11:32:40 +00:00
h , err2 := t . lookupEvent ( ctx , roomVersion , missing . AuthEventID , true )
2020-09-08 09:28:13 +00:00
switch err2 . ( type ) {
case verifySigError :
return & gomatrixserverlib . RespState {
AuthEvents : authEventList ,
StateEvents : resolvedStateEvents ,
} , nil
case nil :
// do nothing
default :
2020-05-12 15:24:28 +00:00
return nil , fmt . Errorf ( "missing auth event %s and failed to look it up: %w" , missing . AuthEventID , err2 )
2020-03-06 16:58:10 +00:00
}
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . Infof ( "fetched event %s" , missing . AuthEventID )
2020-05-12 15:24:28 +00:00
resolvedStateEvents = append ( resolvedStateEvents , h . Unwrap ( ) )
goto retryAllowedState
2020-03-06 16:58:10 +00:00
default :
}
2020-05-12 15:24:28 +00:00
return nil , err
2017-06-07 13:32:53 +00:00
}
2020-05-12 15:24:28 +00:00
return & gomatrixserverlib . RespState {
AuthEvents : authEventList ,
StateEvents : resolvedStateEvents ,
} , nil
}
2020-03-27 16:28:22 +00:00
2020-05-12 15:24:28 +00:00
// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should
// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
2020-09-29 13:07:59 +00:00
// This means that we may recursively call this function, as we spider back up prev_events.
2020-09-29 12:40:29 +00:00
// nolint:gocyclo
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) getMissingEvents ( ctx context . Context , e gomatrixserverlib . Event , roomVersion gomatrixserverlib . RoomVersion , isInboundTxn bool ) ( backwardsExtremity * gomatrixserverlib . Event , err error ) {
2020-05-12 15:24:28 +00:00
if ! isInboundTxn {
// we've recursed here, so just take a state snapshot please!
return & e , nil
}
2020-09-07 11:32:40 +00:00
logger := util . GetLogger ( ctx ) . WithField ( "event_id" , e . EventID ( ) ) . WithField ( "room_id" , e . RoomID ( ) )
2020-05-12 15:24:28 +00:00
needed := gomatrixserverlib . StateNeededForAuth ( [ ] gomatrixserverlib . Event { e } )
// query latest events (our trusted forward extremities)
req := api . QueryLatestEventsAndStateRequest {
RoomID : e . RoomID ( ) ,
StateToFetch : needed . Tuples ( ) ,
}
var res api . QueryLatestEventsAndStateResponse
2020-09-07 11:32:40 +00:00
if err = t . rsAPI . QueryLatestEventsAndState ( ctx , & req , & res ) ; err != nil {
2020-05-12 15:24:28 +00:00
logger . WithError ( err ) . Warn ( "Failed to query latest events" )
return & e , nil
}
latestEvents := make ( [ ] string , len ( res . LatestEvents ) )
for i := range res . LatestEvents {
latestEvents [ i ] = res . LatestEvents [ i ] . EventID
}
2020-09-29 12:40:29 +00:00
servers := [ ] gomatrixserverlib . ServerName { t . Origin }
serverReq := & api . QueryServerJoinedToRoomRequest {
RoomID : e . RoomID ( ) ,
}
serverRes := & api . QueryServerJoinedToRoomResponse { }
if err = t . rsAPI . QueryServerJoinedToRoom ( ctx , serverReq , serverRes ) ; err == nil {
servers = append ( servers , serverRes . ServerNames ... )
logger . Infof ( "Found %d server(s) to query for missing events" , len ( servers ) )
}
var missingResp * gomatrixserverlib . RespMissingEvents
for _ , server := range servers {
var m gomatrixserverlib . RespMissingEvents
if m , err = t . federation . LookupMissingEvents ( ctx , server , e . RoomID ( ) , gomatrixserverlib . MissingEvents {
Limit : 20 ,
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
EarliestEvents : latestEvents ,
// The event IDs to retrieve the previous events for.
LatestEvents : [ ] string { e . EventID ( ) } ,
} , roomVersion ) ; err == nil {
missingResp = & m
break
} else {
logger . WithError ( err ) . Errorf ( "%s pushed us an event but %q did not respond to /get_missing_events" , t . Origin , server )
}
}
if missingResp == nil {
logger . WithError ( err ) . Errorf (
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can" ,
t . Origin , len ( servers ) ,
)
return nil , missingPrevEventsError {
eventID : e . EventID ( ) ,
err : err ,
}
}
2020-05-12 15:24:28 +00:00
// security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
// There's 2 scenarios to consider:
// - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true)
// - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false)
// In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room,
// as it was called in response to an inbound txn which had it as a prev_event.
// In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad
// because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked.
// https://github.com/matrix-org/synapse/pull/3456
// https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335
// For now, we do not allow Case B, so reject the event.
logger . Infof ( "get_missing_events returned %d events" , len ( missingResp . Events ) )
// topologically sort and sanity check that we are making forward progress
newEvents := gomatrixserverlib . ReverseTopologicalOrdering ( missingResp . Events , gomatrixserverlib . TopologicalOrderByPrevEvents )
shouldHaveSomeEventIDs := e . PrevEventIDs ( )
hasPrevEvent := false
Event :
for _ , pe := range shouldHaveSomeEventIDs {
for _ , ev := range newEvents {
if ev . EventID ( ) == pe {
hasPrevEvent = true
break Event
}
}
}
if ! hasPrevEvent {
err = fmt . Errorf ( "called /get_missing_events but server %s didn't return any prev_events with IDs %v" , t . Origin , shouldHaveSomeEventIDs )
logger . WithError ( err ) . Errorf (
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can" ,
t . Origin ,
)
return nil , missingPrevEventsError {
eventID : e . EventID ( ) ,
err : err ,
}
}
// process the missing events then the event which started this whole thing
for _ , ev := range append ( newEvents , e ) {
2020-09-07 11:32:40 +00:00
err := t . processEvent ( ctx , ev , false )
2020-05-12 15:24:28 +00:00
if err != nil {
return nil , err
}
}
// we processed everything!
return nil , nil
2020-05-05 14:48:37 +00:00
}
2020-09-28 10:32:59 +00:00
// lookupMissingStateViaState asks the transaction's origin server for the complete room state
// at eventID via LookupState and validates the response with its Check method before handing
// it back. Any lookup or validation failure is returned as the error with a nil state.
func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
	respState *gomatrixserverlib.RespState, err error) {
	fetched, lookupErr := t.federation.LookupState(ctx, t.Origin, roomID, eventID, roomVersion)
	if lookupErr != nil {
		return nil, lookupErr
	}

	// Refuse to use state that does not pass validation.
	checkErr := fetched.Check(ctx, t.keys, nil)
	if checkErr != nil {
		return nil, checkErr
	}

	return &fetched, nil
}
// nolint:gocyclo
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) lookupMissingStateViaStateIDs ( ctx context . Context , roomID , eventID string , roomVersion gomatrixserverlib . RoomVersion ) (
2020-05-12 15:24:28 +00:00
* gomatrixserverlib . RespState , error ) {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . Infof ( "lookupMissingStateViaStateIDs %s" , eventID )
2020-05-05 14:48:37 +00:00
// fetch the state event IDs at the time of the event
2020-09-07 11:32:40 +00:00
stateIDs , err := t . federation . LookupStateIDs ( ctx , t . Origin , roomID , eventID )
2020-05-05 14:48:37 +00:00
if err != nil {
2020-05-12 15:24:28 +00:00
return nil , err
2020-05-05 15:46:22 +00:00
}
2020-05-05 14:48:37 +00:00
// work out which auth/state IDs are missing
wantIDs := append ( stateIDs . StateEventIDs , stateIDs . AuthEventIDs ... )
missing := make ( map [ string ] bool )
2020-05-12 15:24:28 +00:00
var missingEventList [ ] string
2020-05-05 14:48:37 +00:00
for _ , sid := range wantIDs {
2020-05-12 15:24:28 +00:00
if _ , ok := t . haveEvents [ sid ] ; ! ok {
if ! missing [ sid ] {
missing [ sid ] = true
missingEventList = append ( missingEventList , sid )
}
}
}
// fetch as many as we can from the roomserver
queryReq := api . QueryEventsByIDRequest {
EventIDs : missingEventList ,
}
var queryRes api . QueryEventsByIDResponse
2020-09-07 11:32:40 +00:00
if err = t . rsAPI . QueryEventsByID ( ctx , & queryReq , & queryRes ) ; err != nil {
2020-05-12 15:24:28 +00:00
return nil , err
}
for i := range queryRes . Events {
evID := queryRes . Events [ i ] . EventID ( )
t . haveEvents [ evID ] = & queryRes . Events [ i ]
if missing [ evID ] {
delete ( missing , evID )
2020-05-05 14:48:37 +00:00
}
}
2020-05-12 15:24:28 +00:00
2020-09-28 10:32:59 +00:00
concurrentRequests := 8
missingCount := len ( missing )
// If over 50% of the auth/state events from /state_ids are missing
// then we'll just call /state instead, otherwise we'll just end up
// hammering the remote side with /event requests unnecessarily.
if missingCount > concurrentRequests && missingCount > len ( wantIDs ) / 2 {
util . GetLogger ( ctx ) . WithFields ( logrus . Fields {
"missing" : missingCount ,
"event_id" : eventID ,
"room_id" : roomID ,
"total_state" : len ( stateIDs . StateEventIDs ) ,
"total_auth_events" : len ( stateIDs . AuthEventIDs ) ,
} ) . Info ( "Fetching all state at event" )
return t . lookupMissingStateViaState ( ctx , roomID , eventID , roomVersion )
}
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithFields ( logrus . Fields {
2020-09-28 10:32:59 +00:00
"missing" : missingCount ,
"event_id" : eventID ,
"room_id" : roomID ,
"total_state" : len ( stateIDs . StateEventIDs ) ,
"total_auth_events" : len ( stateIDs . AuthEventIDs ) ,
"concurrent_requests" : concurrentRequests ,
2020-05-05 14:48:37 +00:00
} ) . Info ( "Fetching missing state at event" )
2020-09-28 10:32:59 +00:00
// Create a queue containing all of the missing event IDs that we want
// to retrieve.
pending := make ( chan string , missingCount )
2020-05-05 14:48:37 +00:00
for missingEventID := range missing {
2020-09-28 10:32:59 +00:00
pending <- missingEventID
}
close ( pending )
// Define how many workers we should start to do this.
if missingCount < concurrentRequests {
concurrentRequests = missingCount
}
// Create the wait group.
var fetchgroup sync . WaitGroup
fetchgroup . Add ( concurrentRequests )
// This is the only place where we'll write to t.haveEvents from
// multiple goroutines, and everywhere else is blocked on this
// synchronous function anyway.
var haveEventsMutex sync . Mutex
// Define what we'll do in order to fetch the missing event ID.
fetch := func ( missingEventID string ) {
2020-05-12 15:24:28 +00:00
var h * gomatrixserverlib . HeaderedEvent
2020-09-07 11:32:40 +00:00
h , err = t . lookupEvent ( ctx , roomVersion , missingEventID , false )
2020-09-08 09:28:13 +00:00
switch err . ( type ) {
case verifySigError :
2020-09-28 10:32:59 +00:00
break
2020-09-08 09:28:13 +00:00
case nil :
2020-09-28 10:32:59 +00:00
break
2020-09-08 09:28:13 +00:00
default :
2020-09-28 10:32:59 +00:00
util . GetLogger ( ctx ) . WithFields ( logrus . Fields {
"event_id" : missingEventID ,
"room_id" : roomID ,
} ) . Info ( "Failed to fetch missing event" )
return
2020-05-05 14:48:37 +00:00
}
2020-09-28 10:32:59 +00:00
haveEventsMutex . Lock ( )
2020-05-12 15:24:28 +00:00
t . haveEvents [ h . EventID ( ) ] = h
2020-09-28 10:32:59 +00:00
haveEventsMutex . Unlock ( )
2020-05-05 14:48:37 +00:00
}
2020-09-28 10:32:59 +00:00
// Create the worker.
worker := func ( ch <- chan string ) {
defer fetchgroup . Done ( )
for missingEventID := range ch {
fetch ( missingEventID )
}
}
// Start the workers.
for i := 0 ; i < concurrentRequests ; i ++ {
go worker ( pending )
}
// Wait for the workers to finish.
fetchgroup . Wait ( )
2020-05-12 15:24:28 +00:00
resp , err := t . createRespStateFromStateIDs ( stateIDs )
return resp , err
2020-05-05 14:48:37 +00:00
}
2020-05-12 15:24:28 +00:00
func ( t * txnReq ) createRespStateFromStateIDs ( stateIDs gomatrixserverlib . RespStateIDs ) (
2020-05-05 14:48:37 +00:00
* gomatrixserverlib . RespState , error ) {
// create a RespState response using the response to /state_ids as a guide
respState := gomatrixserverlib . RespState {
AuthEvents : make ( [ ] gomatrixserverlib . Event , len ( stateIDs . AuthEventIDs ) ) ,
StateEvents : make ( [ ] gomatrixserverlib . Event , len ( stateIDs . StateEventIDs ) ) ,
}
for i := range stateIDs . StateEventIDs {
2020-05-12 15:24:28 +00:00
ev , ok := t . haveEvents [ stateIDs . StateEventIDs [ i ] ]
2020-05-05 14:48:37 +00:00
if ! ok {
return nil , fmt . Errorf ( "missing state event %s" , stateIDs . StateEventIDs [ i ] )
}
respState . StateEvents [ i ] = ev . Unwrap ( )
}
for i := range stateIDs . AuthEventIDs {
2020-05-12 15:24:28 +00:00
ev , ok := t . haveEvents [ stateIDs . AuthEventIDs [ i ] ]
2020-05-05 14:48:37 +00:00
if ! ok {
return nil , fmt . Errorf ( "missing auth event %s" , stateIDs . AuthEventIDs [ i ] )
}
respState . AuthEvents [ i ] = ev . Unwrap ( )
}
2020-05-12 15:24:28 +00:00
// We purposefully do not do auth checks on the returned events, as they will still
// be processed in the exact same way, just as a 'rejected' event
// TODO: Add a field to HeaderedEvent to indicate if the event is rejected.
return & respState , nil
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) lookupEvent ( ctx context . Context , roomVersion gomatrixserverlib . RoomVersion , missingEventID string , localFirst bool ) ( * gomatrixserverlib . HeaderedEvent , error ) {
2020-05-12 15:24:28 +00:00
if localFirst {
// fetch from the roomserver
queryReq := api . QueryEventsByIDRequest {
EventIDs : [ ] string { missingEventID } ,
}
var queryRes api . QueryEventsByIDResponse
2020-09-07 11:32:40 +00:00
if err := t . rsAPI . QueryEventsByID ( ctx , & queryReq , & queryRes ) ; err != nil {
util . GetLogger ( ctx ) . Warnf ( "Failed to query roomserver for missing event %s: %s - falling back to remote" , missingEventID , err )
2020-05-12 15:24:28 +00:00
} else if len ( queryRes . Events ) == 1 {
return & queryRes . Events [ 0 ] , nil
}
}
2020-09-07 11:32:40 +00:00
txn , err := t . federation . GetEvent ( ctx , t . Origin , missingEventID )
2020-05-12 15:24:28 +00:00
if err != nil || len ( txn . PDUs ) == 0 {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . WithField ( "event_id" , missingEventID ) . Warn ( "failed to get missing /event for event ID" )
2020-05-05 14:48:37 +00:00
return nil , err
}
2020-05-12 15:24:28 +00:00
pdu := txn . PDUs [ 0 ]
var event gomatrixserverlib . Event
event , err = gomatrixserverlib . NewEventFromUntrustedJSON ( pdu , roomVersion )
if err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Warnf ( "Transaction: Failed to parse event JSON of event %q" , event . EventID ( ) )
2020-05-12 15:24:28 +00:00
return nil , unmarshalError { err }
}
2020-09-07 11:32:40 +00:00
if err = gomatrixserverlib . VerifyAllEventSignatures ( ctx , [ ] gomatrixserverlib . Event { event } , t . keys ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Warnf ( "Transaction: Couldn't validate signature of event %q" , event . EventID ( ) )
2020-05-12 15:24:28 +00:00
return nil , verifySigError { event . EventID ( ) , err }
}
h := event . Headered ( roomVersion )
t . newEvents [ h . EventID ( ) ] = true
return & h , nil
2017-06-07 13:32:53 +00:00
}