Don't start more goroutines than needed on RS input, increase input worker buffer size (#1638)
parent
d7824ed5b1
commit
f5869daaab
|
@ -54,10 +54,8 @@ type inputWorker struct {
|
||||||
input chan *inputTask
|
input chan *inputTask
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Guarded by a CAS on w.running
|
||||||
func (w *inputWorker) start() {
|
func (w *inputWorker) start() {
|
||||||
if !w.running.CAS(false, true) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer w.running.Store(false)
|
defer w.running.Store(false)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -142,7 +140,7 @@ func (r *Inputer) InputRoomEvents(
|
||||||
// room - the channel will be quite small as it's just pointer types.
|
// room - the channel will be quite small as it's just pointer types.
|
||||||
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
|
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
|
||||||
r: r,
|
r: r,
|
||||||
input: make(chan *inputTask, 10),
|
input: make(chan *inputTask, 32),
|
||||||
})
|
})
|
||||||
worker := w.(*inputWorker)
|
worker := w.(*inputWorker)
|
||||||
|
|
||||||
|
@ -156,7 +154,9 @@ func (r *Inputer) InputRoomEvents(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the task to the worker.
|
// Send the task to the worker.
|
||||||
|
if worker.running.CAS(false, true) {
|
||||||
go worker.start()
|
go worker.start()
|
||||||
|
}
|
||||||
worker.input <- tasks[i]
|
worker.input <- tasks[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue