Skip to content

Commit 10e7785

Browse files
committed
fluent: make connection closed variable thread-safe
Changed the "closed" variable to be an atomic int. There is a race condition b/w appendBuffer and Close() method wrt how this variable is accessed. This could be an atomic.Bool, but the CI/CD config uses golang version 1.17. Since atomic.Bool is supported 1.19 onwards, using that would fail tests and build. Data race trace without this fix (using `go test -race`): ================== WARNING: DATA RACE Write at 0x00c0003324e0 by goroutine 95: github.com/fluent/fluent-logger-golang/fluent.(*Fluent).close() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:426 +0x13c github.com/fluent/fluent-logger-golang/fluent.(*Fluent).Close() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:393 +0x108 github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse.func1() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:737 +0x2c github.com/fluent/fluent-logger-golang/fluent.timeout.func1() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:557 +0x38 Previous read at 0x00c0003324e0 by goroutine 94: github.com/fluent/fluent-logger-golang/fluent.(*Fluent).write() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:619 +0x1ac github.com/fluent/fluent-logger-golang/fluent.(*Fluent).writeWithRetry() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:553 +0x78 github.com/fluent/fluent-logger-golang/fluent.(*Fluent).run() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:527 +0x1d0 github.com/fluent/fluent-logger-golang/fluent.newWithDialer.gowrap1() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:184 +0x4c Goroutine 95 (running) created at: github.com/fluent/fluent-logger-golang/fluent.timeout() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:556 +0xe8 github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:736 +0x444 testing.tRunner() /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1792 +0x180 testing.(*T).Run.gowrap1() /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1851 +0x40 Goroutine 94 (running) created at: github.com/fluent/fluent-logger-golang/fluent.newWithDialer() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:184 +0x368 github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:722 +0xd8 testing.tRunner() /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1792 +0x180 testing.(*T).Run.gowrap1() /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1851 +0x40 ================== [2025-05-06T08:42:08-07:00] Discarding queued events... testing.go:1490: race detected during execution of test Signed-off-by: Anirudh Aithal <aithal@amazon.com>
1 parent f1b233b commit 10e7785

File tree

1 file changed

+37
-17
lines changed

1 file changed

+37
-17
lines changed

fluent/fluent.go

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"reflect"
1414
"strconv"
1515
"sync"
16+
"sync/atomic"
1617
"time"
1718

1819
"bytes"
@@ -107,15 +108,19 @@ type Fluent struct {
107108
// cancelDialings is used by Close() to stop any in-progress dialing.
108109
cancelDialings context.CancelFunc
109110
pending chan *msgToSend
110-
pendingMutex sync.RWMutex
111-
closed bool
112-
wg sync.WaitGroup
111+
// closed indicates if the connection is open or closed.
112+
// 0 = open (false), 1 = closed (true). Since the code is built in CI with
113+
// golang < 1.19, we're using atomic int32 here. Otherwise, atomic.Bool
114+
// could have been used.
115+
closed int32
116+
wg sync.WaitGroup
113117

114118
// time at which the most recent connection to fluentd-address was established.
115119
latestReconnectTime time.Time
116120

117-
muconn sync.RWMutex
118-
conn net.Conn
121+
muconn sync.RWMutex
122+
pendingMutex sync.RWMutex
123+
conn net.Conn
119124
}
120125

121126
type dialer interface {
@@ -176,17 +181,18 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
176181
dialer: d,
177182
cancelDialings: cancel,
178183
pending: make(chan *msgToSend, config.BufferLimit),
179-
pendingMutex: sync.RWMutex{},
180184
muconn: sync.RWMutex{},
185+
pendingMutex: sync.RWMutex{},
181186
}
182187

183188
f.wg.Add(1)
184189
go f.run(ctx)
185190
} else {
186191
f = &Fluent{
187-
Config: config,
188-
dialer: d,
189-
muconn: sync.RWMutex{},
192+
Config: config,
193+
dialer: d,
194+
muconn: sync.RWMutex{},
195+
pendingMutex: sync.RWMutex{},
190196
}
191197
err = f.connect(context.Background())
192198
}
@@ -290,7 +296,7 @@ func (f *Fluent) postRawData(msg *msgToSend) error {
290296
}
291297

292298
// Synchronous write
293-
if f.closed {
299+
if atomic.LoadInt32(&f.closed) == 1 {
294300
return fmt.Errorf("fluent#postRawData: Logger already closed")
295301
}
296302
return f.writeWithRetry(context.Background(), msg)
@@ -367,19 +373,22 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
367373
// running in async mode, the run() goroutine exits before Close() returns.
368374
func (f *Fluent) Close() (err error) {
369375
if f.Config.Async {
376+
// Use a mutex to ensure thread safety when closing the channel
370377
f.pendingMutex.Lock()
371-
if f.closed {
378+
379+
if atomic.LoadInt32(&f.closed) == 1 {
372380
f.pendingMutex.Unlock()
373381
return nil
374382
}
375-
f.closed = true
376-
f.pendingMutex.Unlock()
383+
atomic.StoreInt32(&f.closed, 1)
377384

378385
if f.Config.ForceStopAsyncSend {
379386
f.cancelDialings()
380387
}
381388

382389
close(f.pending)
390+
f.pendingMutex.Unlock()
391+
383392
// If ForceStopAsyncSend is false, all logs in the channel have to be sent
384393
// before closing the connection. At this point closed is true so no more
385394
// logs are written to the channel and f.pending has been closed, so run()
@@ -391,7 +400,7 @@ func (f *Fluent) Close() (err error) {
391400

392401
f.muconn.Lock()
393402
f.close()
394-
f.closed = true
403+
atomic.StoreInt32(&f.closed, 1)
395404
f.muconn.Unlock()
396405

397406
// If ForceStopAsyncSend is true, we shall close the connection before waiting for
@@ -406,11 +415,19 @@ func (f *Fluent) Close() (err error) {
406415

407416
// appendBuffer appends data to buffer with lock.
408417
func (f *Fluent) appendBuffer(msg *msgToSend) error {
409-
f.pendingMutex.RLock()
410-
defer f.pendingMutex.RUnlock()
411-
if f.closed {
418+
if atomic.LoadInt32(&f.closed) == 1 {
412419
return fmt.Errorf("fluent#appendBuffer: Logger already closed")
413420
}
421+
422+
// Use a mutex to ensure thread safety when writing to the channel
423+
f.pendingMutex.Lock()
424+
defer f.pendingMutex.Unlock()
425+
426+
// Check again after acquiring the lock
427+
if atomic.LoadInt32(&f.closed) == 1 {
428+
return fmt.Errorf("fluent#appendBuffer: Logger already closed")
429+
}
430+
414431
select {
415432
case f.pending <- msg:
416433
default:
@@ -610,6 +627,8 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
610627

611628
// Acknowledgment check
612629
if msg.ack != "" {
630+
f.muconn.Lock()
631+
613632
resp := &AckResp{}
614633
var err error
615634
if f.Config.MarshalAsJSON {
@@ -619,6 +638,7 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
619638
r := msgp.NewReader(f.conn)
620639
err = resp.DecodeMsg(r)
621640
}
641+
f.muconn.Unlock()
622642

623643
if err != nil || resp.Ack != msg.ack {
624644
fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack)

0 commit comments

Comments
 (0)