Hold the event source when there are no listeners (#15725)
* Hold the event source when there are no listeners The event source does not need to run when there are no listeners. Therefore pause it when there are none. * add some more logging Signed-off-by: Andrew Thornton <art27@cantab.net>
This commit is contained in:
		
							parent
							
								
									f582ec4e53
								
							
						
					
					
						commit
						8e32eeb5de
					
				
					 2 changed files with 35 additions and 0 deletions
				
			
		|  | @ -13,6 +13,7 @@ type Manager struct { | ||||||
| 	mutex sync.Mutex | 	mutex sync.Mutex | ||||||
| 
 | 
 | ||||||
| 	messengers map[int64]*Messenger | 	messengers map[int64]*Messenger | ||||||
|  | 	connection chan struct{} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| var manager *Manager | var manager *Manager | ||||||
|  | @ -20,6 +21,7 @@ var manager *Manager | ||||||
| func init() { | func init() { | ||||||
| 	manager = &Manager{ | 	manager = &Manager{ | ||||||
| 		messengers: make(map[int64]*Messenger), | 		messengers: make(map[int64]*Messenger), | ||||||
|  | 		connection: make(chan struct{}, 1), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -36,6 +38,10 @@ func (m *Manager) Register(uid int64) <-chan *Event { | ||||||
| 		messenger = NewMessenger(uid) | 		messenger = NewMessenger(uid) | ||||||
| 		m.messengers[uid] = messenger | 		m.messengers[uid] = messenger | ||||||
| 	} | 	} | ||||||
|  | 	select { | ||||||
|  | 	case m.connection <- struct{}{}: | ||||||
|  | 	default: | ||||||
|  | 	} | ||||||
| 	m.mutex.Unlock() | 	m.mutex.Unlock() | ||||||
| 	return messenger.Register() | 	return messenger.Register() | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -34,6 +34,35 @@ loop: | ||||||
| 			timer.Stop() | 			timer.Stop() | ||||||
| 			break loop | 			break loop | ||||||
| 		case <-timer.C: | 		case <-timer.C: | ||||||
|  | 			m.mutex.Lock() | ||||||
|  | 			connectionCount := len(m.messengers) | ||||||
|  | 			if connectionCount == 0 { | ||||||
|  | 				log.Trace("Event source has no listeners") | ||||||
|  | 				// empty the connection channel
 | ||||||
|  | 				select { | ||||||
|  | 				case <-m.connection: | ||||||
|  | 				default: | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			m.mutex.Unlock() | ||||||
|  | 			if connectionCount == 0 { | ||||||
|  | 				// No listeners so the source can be paused
 | ||||||
|  | 				log.Trace("Pausing the eventsource") | ||||||
|  | 				select { | ||||||
|  | 				case <-ctx.Done(): | ||||||
|  | 					break loop | ||||||
|  | 				case <-m.connection: | ||||||
|  | 					log.Trace("Connection detected - restarting the eventsource") | ||||||
|  | 					// OK we're back so lets reset the timer and start again
 | ||||||
|  | 					// We won't change the "then" time because there could be concurrency issues
 | ||||||
|  | 					select { | ||||||
|  | 					case <-timer.C: | ||||||
|  | 					default: | ||||||
|  | 					} | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
| 			now := timeutil.TimeStampNow().Add(-2) | 			now := timeutil.TimeStampNow().Add(-2) | ||||||
| 
 | 
 | ||||||
| 			uidCounts, err := models.GetUIDsAndNotificationCounts(then, now) | 			uidCounts, err := models.GetUIDsAndNotificationCounts(then, now) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue