Skip to content

Commit 200b3e5

Browse files
committed
fluent: remove stopRunning channel.
The channel and its usage is redundant since we already wire Fluent object with a cancellable context. Removed stopRunning to use the context instead. Instead of waiting for stopRunning in run(), we can just wait for context to be done as well. Signed-off-by: Anirudh Aithal <aithal@amazon.com>
1 parent 5538e90 commit 200b3e5

File tree

1 file changed

+21
-26
lines changed

1 file changed

+21
-26
lines changed

fluent/fluent.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ type Fluent struct {
104104
Config
105105

106106
dialer dialer
107-
// stopRunning is used in async mode to signal to run() it should abort.
108-
stopRunning chan struct{}
109107
// cancelDialings is used by Close() to stop any in-progress dialing.
110108
cancelDialings context.CancelFunc
111109
pending chan *msgToSend
@@ -176,7 +174,6 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
176174
f = &Fluent{
177175
Config: config,
178176
dialer: d,
179-
stopRunning: make(chan struct{}),
180177
cancelDialings: cancel,
181178
pending: make(chan *msgToSend, config.BufferLimit),
182179
pendingMutex: sync.RWMutex{},
@@ -200,27 +197,26 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
200197
//
201198
// Examples:
202199
//
203-
// // send map[string]
204-
// mapStringData := map[string]string{
205-
// "foo": "bar",
206-
// }
207-
// f.Post("tag_name", mapStringData)
200+
// // send map[string]
201+
// mapStringData := map[string]string{
202+
// "foo": "bar",
203+
// }
204+
// f.Post("tag_name", mapStringData)
208205
//
209-
// // send message with specified time
210-
// mapStringData := map[string]string{
211-
// "foo": "bar",
212-
// }
213-
// tm := time.Now()
214-
// f.PostWithTime("tag_name", tm, mapStringData)
215-
//
216-
// // send struct
217-
// structData := struct {
218-
// Name string `msg:"name"`
219-
// } {
220-
// "john smith",
221-
// }
222-
// f.Post("tag_name", structData)
206+
// // send message with specified time
207+
// mapStringData := map[string]string{
208+
// "foo": "bar",
209+
// }
210+
// tm := time.Now()
211+
// f.PostWithTime("tag_name", tm, mapStringData)
223212
//
213+
// // send struct
214+
// structData := struct {
215+
// Name string `msg:"name"`
216+
// } {
217+
// "john smith",
218+
// }
219+
// f.Post("tag_name", structData)
224220
func (f *Fluent) Post(tag string, message interface{}) error {
225221
timeNow := time.Now()
226222
return f.PostWithTime(tag, timeNow, message)
@@ -380,7 +376,6 @@ func (f *Fluent) Close() (err error) {
380376
f.pendingMutex.Unlock()
381377

382378
if f.Config.ForceStopAsyncSend {
383-
close(f.stopRunning)
384379
f.cancelDialings()
385380
}
386381

@@ -513,7 +508,7 @@ func (f *Fluent) run(ctx context.Context) {
513508
for {
514509
select {
515510
case entry, ok := <-f.pending:
516-
// f.stopRunning is closed before f.pending only when ForceStopAsyncSend
511+
// The context is cancelled before f.pending only when ForceStopAsyncSend
517512
// is enabled. Otherwise, f.pending is closed when Close() is called.
518513
if !ok {
519514
f.wg.Done()
@@ -540,9 +535,9 @@ func (f *Fluent) run(ctx context.Context) {
540535
}
541536
f.AsyncResultCallback(data, err)
542537
}
543-
case <-f.stopRunning:
538+
case <-ctx.Done():
539+
// Context was canceled, which means ForceStopAsyncSend was enabled
544540
fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339))
545-
546541
f.wg.Done()
547542
return
548543
}

0 commit comments

Comments
 (0)