parent
597350a67f
commit
05324b6861
|
@ -84,7 +84,7 @@ func Send(
|
||||||
|
|
||||||
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
|
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
|
||||||
|
|
||||||
resp, jsonErr := t.processTransaction(httpReq.Context())
|
resp, jsonErr := t.processTransaction(context.Background())
|
||||||
if jsonErr != nil {
|
if jsonErr != nil {
|
||||||
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
|
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
|
||||||
return *jsonErr
|
return *jsonErr
|
||||||
|
@ -1005,79 +1005,82 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
|
||||||
return t.lookupMissingStateViaState(ctx, roomID, eventID, roomVersion)
|
return t.lookupMissingStateViaState(ctx, roomID, eventID, roomVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
util.GetLogger(ctx).WithFields(logrus.Fields{
|
if missingCount > 0 {
|
||||||
"missing": missingCount,
|
util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||||
"event_id": eventID,
|
"missing": missingCount,
|
||||||
"room_id": roomID,
|
"event_id": eventID,
|
||||||
"total_state": len(stateIDs.StateEventIDs),
|
"room_id": roomID,
|
||||||
"total_auth_events": len(stateIDs.AuthEventIDs),
|
"total_state": len(stateIDs.StateEventIDs),
|
||||||
"concurrent_requests": concurrentRequests,
|
"total_auth_events": len(stateIDs.AuthEventIDs),
|
||||||
}).Info("Fetching missing state at event")
|
"concurrent_requests": concurrentRequests,
|
||||||
|
}).Info("Fetching missing state at event")
|
||||||
|
|
||||||
// Get a list of servers to fetch from.
|
// Get a list of servers to fetch from.
|
||||||
servers := t.getServers(ctx, roomID)
|
servers := t.getServers(ctx, roomID)
|
||||||
if len(servers) > 5 {
|
if len(servers) > 5 {
|
||||||
servers = servers[:5]
|
servers = servers[:5]
|
||||||
}
|
|
||||||
|
|
||||||
// Create a queue containing all of the missing event IDs that we want
|
|
||||||
// to retrieve.
|
|
||||||
pending := make(chan string, missingCount)
|
|
||||||
for missingEventID := range missing {
|
|
||||||
pending <- missingEventID
|
|
||||||
}
|
|
||||||
close(pending)
|
|
||||||
|
|
||||||
// Define how many workers we should start to do this.
|
|
||||||
if missingCount < concurrentRequests {
|
|
||||||
concurrentRequests = missingCount
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the wait group.
|
|
||||||
var fetchgroup sync.WaitGroup
|
|
||||||
fetchgroup.Add(concurrentRequests)
|
|
||||||
|
|
||||||
// This is the only place where we'll write to t.haveEvents from
|
|
||||||
// multiple goroutines, and everywhere else is blocked on this
|
|
||||||
// synchronous function anyway.
|
|
||||||
var haveEventsMutex sync.Mutex
|
|
||||||
|
|
||||||
// Define what we'll do in order to fetch the missing event ID.
|
|
||||||
fetch := func(missingEventID string) {
|
|
||||||
var h *gomatrixserverlib.HeaderedEvent
|
|
||||||
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
|
|
||||||
switch err.(type) {
|
|
||||||
case verifySigError:
|
|
||||||
return
|
|
||||||
case nil:
|
|
||||||
break
|
|
||||||
default:
|
|
||||||
util.GetLogger(ctx).WithFields(logrus.Fields{
|
|
||||||
"event_id": missingEventID,
|
|
||||||
"room_id": roomID,
|
|
||||||
}).Info("Failed to fetch missing event")
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
haveEventsMutex.Lock()
|
|
||||||
t.haveEvents[h.EventID()] = h
|
|
||||||
haveEventsMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the worker.
|
// Create a queue containing all of the missing event IDs that we want
|
||||||
worker := func(ch <-chan string) {
|
// to retrieve.
|
||||||
defer fetchgroup.Done()
|
pending := make(chan string, missingCount)
|
||||||
for missingEventID := range ch {
|
for missingEventID := range missing {
|
||||||
fetch(missingEventID)
|
pending <- missingEventID
|
||||||
}
|
}
|
||||||
|
close(pending)
|
||||||
|
|
||||||
|
// Define how many workers we should start to do this.
|
||||||
|
if missingCount < concurrentRequests {
|
||||||
|
concurrentRequests = missingCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the wait group.
|
||||||
|
var fetchgroup sync.WaitGroup
|
||||||
|
fetchgroup.Add(concurrentRequests)
|
||||||
|
|
||||||
|
// This is the only place where we'll write to t.haveEvents from
|
||||||
|
// multiple goroutines, and everywhere else is blocked on this
|
||||||
|
// synchronous function anyway.
|
||||||
|
var haveEventsMutex sync.Mutex
|
||||||
|
|
||||||
|
// Define what we'll do in order to fetch the missing event ID.
|
||||||
|
fetch := func(missingEventID string) {
|
||||||
|
var h *gomatrixserverlib.HeaderedEvent
|
||||||
|
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
|
||||||
|
switch err.(type) {
|
||||||
|
case verifySigError:
|
||||||
|
return
|
||||||
|
case nil:
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||||
|
"event_id": missingEventID,
|
||||||
|
"room_id": roomID,
|
||||||
|
}).Info("Failed to fetch missing event")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
haveEventsMutex.Lock()
|
||||||
|
t.haveEvents[h.EventID()] = h
|
||||||
|
haveEventsMutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the worker.
|
||||||
|
worker := func(ch <-chan string) {
|
||||||
|
defer fetchgroup.Done()
|
||||||
|
for missingEventID := range ch {
|
||||||
|
fetch(missingEventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the workers.
|
||||||
|
for i := 0; i < concurrentRequests; i++ {
|
||||||
|
go worker(pending)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the workers to finish.
|
||||||
|
fetchgroup.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the workers.
|
|
||||||
for i := 0; i < concurrentRequests; i++ {
|
|
||||||
go worker(pending)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for the workers to finish.
|
|
||||||
fetchgroup.Wait()
|
|
||||||
resp, err := t.createRespStateFromStateIDs(stateIDs)
|
resp, err := t.createRespStateFromStateIDs(stateIDs)
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue