Skip to content

Commit 4569ac0

Browse files
committed
In sync mode output events after command acknowledge
1 parent ac9e703 commit 4569ac0

File tree

1 file changed

+18
-3
lines changed

1 file changed

+18
-3
lines changed

discovery_server.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type DiscoveryServer struct {
9494
initialized bool
9595
started bool
9696
syncStarted bool
97+
syncChannel chan interface{}
9798
}
9899

99100
// NewDiscoveryServer creates a new discovery server backed by the
@@ -228,12 +229,22 @@ func (d *DiscoveryServer) startSync() {
228229
d.outputError("start_sync", "Discovery already STARTed, cannot START_SYNC")
229230
return
230231
}
232+
d.syncChannel = make(chan interface{}, 10) // buffer up to 10 events
231233
if err := d.impl.StartSync(d.syncEvent); err != nil {
232234
d.outputError("start_sync", "Cannot START_SYNC: "+err.Error())
235+
close(d.syncChannel) // do not leak channel...
236+
d.syncChannel = nil
233237
return
234238
}
235239
d.syncStarted = true
236240
d.outputOk("start_sync")
241+
go d.consumeEvents(d.syncChannel)
242+
}
243+
244+
func (d *DiscoveryServer) consumeEvents(c <-chan interface{}) {
245+
for e := range c {
246+
d.output(e)
247+
}
237248
}
238249

239250
func (d *DiscoveryServer) stop() {
@@ -246,7 +257,11 @@ func (d *DiscoveryServer) stop() {
246257
return
247258
}
248259
d.started = false
249-
d.syncStarted = false
260+
if d.syncStarted {
261+
close(d.syncChannel)
262+
d.syncChannel = nil
263+
d.syncStarted = false
264+
}
250265
d.outputOk("stop")
251266
}
252267

@@ -255,10 +270,10 @@ func (d *DiscoveryServer) syncEvent(event string, port *Port) {
255270
EventType string `json:"eventType"`
256271
Port *Port `json:"port"`
257272
}
258-
d.output(&syncOutputJSON{
273+
d.syncChannel <- &syncOutputJSON{
259274
EventType: event,
260275
Port: port,
261-
})
276+
}
262277
}
263278

264279
type genericMessageJSON struct {

0 commit comments

Comments
 (0)