@@ -198,24 +198,28 @@ func (dm *DiscoveryManager) feedEvent(ev *discovery.Event) {
198
198
dm .watchersMutex .Lock ()
199
199
defer dm .watchersMutex .Unlock ()
200
200
201
+ sendToAllWatchers := func (ev * discovery.Event ) {
202
+ // Send the event to all watchers
203
+ for watcher := range dm .watchers {
204
+ select {
205
+ case watcher .feed <- ev :
206
+ // OK
207
+ case <- time .After (time .Millisecond * 500 ):
208
+ // If the watcher is not able to process event fast enough
209
+ // remove the watcher from the list of watchers
210
+ logrus .Info ("Watcher is not able to process events fast enough, removing it from the list of watchers" )
211
+ delete (dm .watchers , watcher )
212
+ }
213
+ }
214
+ }
215
+
201
216
if ev .Type == "stop" {
202
217
// Remove all the cached events for the terminating discovery
203
218
delete (dm .watchersCache , ev .DiscoveryID )
204
219
return
205
220
}
206
221
207
- // Send the event to all watchers
208
- for watcher := range dm .watchers {
209
- select {
210
- case watcher .feed <- ev :
211
- // OK
212
- case <- time .After (time .Millisecond * 500 ):
213
- // If the watcher is not able to process event fast enough
214
- // remove the watcher from the list of watchers
215
- logrus .Info ("Watcher is not able to process events fast enough, removing it from the list of watchers" )
216
- delete (dm .watchers , watcher )
217
- }
218
- }
222
+ sendToAllWatchers (ev )
219
223
220
224
// Cache the event for the discovery
221
225
cache := dm .watchersCache [ev .DiscoveryID ]
0 commit comments