Fix concurrent map read/write on haveEvents (#1893)

main
Neil Alexander 2021-06-30 12:32:20 +01:00 committed by GitHub
parent b7a2d369c0
commit 2647f6e9c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 2 deletions

View File

@ -175,8 +175,9 @@ type txnReq struct {
hadEvents map[string]bool
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
work string // metrics
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
haveEventsMutex sync.Mutex
work string // metrics
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@ -817,6 +818,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
}
func (t *txnReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *gomatrixserverlib.HeaderedEvent {
t.haveEventsMutex.Lock()
defer t.haveEventsMutex.Unlock()
if cached, exists := t.haveEvents[ev.EventID()]; exists {
return cached
}
@ -847,6 +850,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
var authEvents []*gomatrixserverlib.Event
missingAuthEvents := map[string]bool{}
for _, ev := range stateEvents {
t.haveEventsMutex.Lock()
for _, ae := range ev.AuthEventIDs() {
if aev, ok := t.haveEvents[ae]; ok {
authEvents = append(authEvents, aev.Unwrap())
@ -854,6 +858,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
missingAuthEvents[ae] = true
}
}
t.haveEventsMutex.Unlock()
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
@ -1064,6 +1069,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...)
missing := make(map[string]bool)
var missingEventList []string
t.haveEventsMutex.Lock()
for _, sid := range wantIDs {
if _, ok := t.haveEvents[sid]; !ok {
if !missing[sid] {
@ -1072,6 +1078,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
}
}
}
t.haveEventsMutex.Unlock()
// fetch as many as we can from the roomserver
queryReq := api.QueryEventsByIDRequest{
@ -1185,6 +1192,9 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
*gomatrixserverlib.RespState, error) { // nolint:unparam
t.haveEventsMutex.Lock()
defer t.haveEventsMutex.Unlock()
// create a RespState response using the response to /state_ids as a guide
respState := gomatrixserverlib.RespState{}