From 200b3e5c03dec9df65080f882bfeb7fb9d1e98f8 Mon Sep 17 00:00:00 2001 From: Anirudh Aithal Date: Mon, 5 May 2025 16:39:35 -0700 Subject: [PATCH 1/8] fluent: remove stopRunning channel. The channel and its usage is redundant since we already wire Fluent object with a cancellable context. Removed stopRunning to use the context instead. Instead of waiting for stopRunning in run(), we can just wait for context to be done as well. Signed-off-by: Anirudh Aithal --- fluent/fluent.go | 47 +++++++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index 7216991..1dce881 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -104,8 +104,6 @@ type Fluent struct { Config dialer dialer - // stopRunning is used in async mode to signal to run() it should abort. - stopRunning chan struct{} // cancelDialings is used by Close() to stop any in-progress dialing. cancelDialings context.CancelFunc pending chan *msgToSend @@ -176,7 +174,6 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { f = &Fluent{ Config: config, dialer: d, - stopRunning: make(chan struct{}), cancelDialings: cancel, pending: make(chan *msgToSend, config.BufferLimit), pendingMutex: sync.RWMutex{}, @@ -200,27 +197,26 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { // // Examples: // -// // send map[string] -// mapStringData := map[string]string{ -// "foo": "bar", -// } -// f.Post("tag_name", mapStringData) +// // send map[string] +// mapStringData := map[string]string{ +// "foo": "bar", +// } +// f.Post("tag_name", mapStringData) // -// // send message with specified time -// mapStringData := map[string]string{ -// "foo": "bar", -// } -// tm := time.Now() -// f.PostWithTime("tag_name", tm, mapStringData) -// -// // send struct -// structData := struct { -// Name string `msg:"name"` -// } { -// "john smith", -// } -// f.Post("tag_name", structData) +// // send message with specified time 
+// mapStringData := map[string]string{ +// "foo": "bar", +// } +// tm := time.Now() +// f.PostWithTime("tag_name", tm, mapStringData) // +// // send struct +// structData := struct { +// Name string `msg:"name"` +// } { +// "john smith", +// } +// f.Post("tag_name", structData) func (f *Fluent) Post(tag string, message interface{}) error { timeNow := time.Now() return f.PostWithTime(tag, timeNow, message) @@ -380,7 +376,6 @@ func (f *Fluent) Close() (err error) { f.pendingMutex.Unlock() if f.Config.ForceStopAsyncSend { - close(f.stopRunning) f.cancelDialings() } @@ -513,7 +508,7 @@ func (f *Fluent) run(ctx context.Context) { for { select { case entry, ok := <-f.pending: - // f.stopRunning is closed before f.pending only when ForceStopAsyncSend + // The context is cancelled before f.pending only when ForceStopAsyncSend // is enabled. Otherwise, f.pending is closed when Close() is called. if !ok { f.wg.Done() @@ -540,9 +535,9 @@ func (f *Fluent) run(ctx context.Context) { } f.AsyncResultCallback(data, err) } - case <-f.stopRunning: + case <-ctx.Done(): + // Context was canceled, which means ForceStopAsyncSend was enabled fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) - f.wg.Done() return } From f1b233b79e9e6fa3f648a7cc846dc726fe6ade76 Mon Sep 17 00:00:00 2001 From: Anirudh Aithal Date: Mon, 5 May 2025 17:05:12 -0700 Subject: [PATCH 2/8] fluent: pending channel thread safety test Added a new test to ensure that pending channel is accessed in a thread-safe manner. The code can be simplified by removing the pendingMutex lock. There's a risk in this codebase wrt how muconn and pendingMutex are acquired and released in different orders in different methods. This test is a precursor to the change to remove the pendingMutex to ensure that nothing is broken. 
Signed-off-by: Anirudh Aithal --- fluent/fluent_test.go | 135 +++++++++++++++++++++++++++++------------- 1 file changed, 94 insertions(+), 41 deletions(-) diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index 00a6086..b05f140 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -10,6 +10,9 @@ import ( "net" "reflect" "runtime" + "strconv" + "strings" + "sync" "testing" "time" @@ -45,18 +48,18 @@ func newTestDialer() *testDialer { // For instance, to test an async logger that have to dial 4 times before succeeding, // the test should look like this: // -// d := newTestDialer() // Create a new stubbed dialer -// cfg := Config{ -// Async: true, -// // ... -// } -// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer -// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) +// d := newTestDialer() // Create a new stubbed dialer +// cfg := Config{ +// Async: true, +// // ... +// } +// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer +// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) // -// d.waitForNextDialing(false, false) // 1st dialing attempt fails -// d.waitForNextDialing(false, false) // 2nd attempt fails too -// d.waitForNextDialing(false, false) // 3rd attempt fails too -// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds +// d.waitForNextDialing(false, false) // 1st dialing attempt fails +// d.waitForNextDialing(false, false) // 2nd attempt fails too +// d.waitForNextDialing(false, false) // 3rd attempt fails too +// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds // // Note that in the above example, the logger operates in async mode. 
As such, // a call to Post, PostWithTime or EncodeAndPostData is needed *before* calling @@ -67,20 +70,20 @@ func newTestDialer() *testDialer { // case, you have to put the calls to newWithDialer() and to EncodeAndPostData() // into their own goroutine. An example: // -// d := newTestDialer() // Create a new stubbed dialer -// cfg := Config{ -// Async: false, -// // ... -// } -// go func() { -// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer -// f.Close() -// }() +// d := newTestDialer() // Create a new stubbed dialer +// cfg := Config{ +// Async: false, +// // ... +// } +// go func() { +// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer +// f.Close() +// }() // -// d.waitForNextDialing(false, false) // 1st dialing attempt fails -// d.waitForNextDialing(false, false) // 2nd attempt fails too -// d.waitForNextDialing(false, false) // 3rd attempt fails too -// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds +// d.waitForNextDialing(false, false) // 1st dialing attempt fails +// d.waitForNextDialing(false, false) // 2nd attempt fails too +// d.waitForNextDialing(false, false) // 3rd attempt fails too +// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds // // Moreover, waitForNextDialing() returns a *Conn which extends net.Conn to provide testing // facilities. 
For instance, you can call waitForNextWrite() on these connections, to @@ -91,24 +94,24 @@ func newTestDialer() *testDialer { // // Here's a full example: // -// d := newTestDialer() -// cfg := Config{Async: true} +// d := newTestDialer() +// cfg := Config{Async: true} // -// f := newWithDialer(cfg, d) -// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) +// f := newWithDialer(cfg, d) +// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) // -// conn := d.waitForNextDialing(true, false) // Accept the dialing -// conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message +// conn := d.waitForNextDialing(true, false) // Accept the dialing +// conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message // -// conn := d.waitForNextDialing(true, false) -// assertReceived(t, // t is *testing.T -// conn.waitForNextWrite(true, ""), -// "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]") +// conn := d.waitForNextDialing(true, false) +// assertReceived(t, // t is *testing.T +// conn.waitForNextWrite(true, ""), +// "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]") // -// f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"}) -// assertReceived(t, // t is *testing.T -// conn.waitForNextWrite(true, ""), -// "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]") +// f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"}) +// assertReceived(t, // t is *testing.T +// conn.waitForNextWrite(true, ""), +// "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]") // // In this example, the 1st connection dialing succeeds but the 1st attempt to write the // message is discarded. 
As the logger discards the connection whenever a message @@ -472,7 +475,10 @@ func TestPostWithTime(t *testing.T) { _ = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) _ = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"}) _ = f.PostWithTime("tag_name", time.Unix(1634263200, 0), - struct {Welcome string `msg:"welcome"`; cannot string}{"to use", "see me"}) + struct { + Welcome string `msg:"welcome"` + cannot string + }{"to use", "see me"}) }() conn := d.waitForNextDialing(true, false) @@ -755,16 +761,63 @@ func TestSyncWriteAfterCloseFails(t *testing.T) { err = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"foo": "buzz"}) // The event submission must fail, - assert.NotEqual(t, err, nil); + assert.NotEqual(t, err, nil) // and also must keep Fluentd closed. - assert.NotEqual(t, f.closed, false); + assert.NotEqual(t, f.closed, false) }() conn := d.waitForNextDialing(true, false) conn.waitForNextWrite(true, "") } +func TestPendingChannelThreadSafety(t *testing.T) { + f, err := New(Config{ + Async: true, + ForceStopAsyncSend: true, + }) + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + + // Start multiple goroutines posting messages + const numGoroutines = 10 + const messagesPerGoroutine = 100 + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < messagesPerGoroutine; j++ { + // Post a message + err := f.Post("tag", map[string]string{ + "goroutine": strconv.Itoa(id), + "message": strconv.Itoa(j), + }) + + // If the logger is closed, we expect an error + if err != nil && !strings.Contains(err.Error(), "already closed") { + t.Errorf("Unexpected error: %v", err) + return + } + + // Add a small delay to increase the chance of race conditions + time.Sleep(time.Millisecond) + } + }(i) + } + + // Wait a bit to let some messages be posted + time.Sleep(10 * 
time.Millisecond) + + // Close the logger while goroutines are still posting + f.Close() + + // Wait for all goroutines to finish + wg.Wait() +} + func Benchmark_PostWithShortMessage(b *testing.B) { b.StopTimer() d := newTestDialer() From 10e77850f25731aafd3d10fd7cab95247266398a Mon Sep 17 00:00:00 2001 From: Anirudh Aithal Date: Tue, 6 May 2025 08:45:37 -0700 Subject: [PATCH 3/8] fluent: make connection closed variable thread-safe Changed the "closed" variable to be an atomic int. There is a race condition b/w appendBuffer and Close() method wrt how this variable is accessed. This could be an atomic.Bool, but the CI/CD config uses golang version 1.17. Since atomic.Bool is supported 1.19 onwards, using that would fail tests and build. Data race trace without this fix (using `go test -race`): ================== WARNING: DATA RACE Write at 0x00c0003324e0 by goroutine 95: github.com/fluent/fluent-logger-golang/fluent.(*Fluent).close() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:426 +0x13c github.com/fluent/fluent-logger-golang/fluent.(*Fluent).Close() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:393 +0x108 github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse.func1() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:737 +0x2c github.com/fluent/fluent-logger-golang/fluent.timeout.func1() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:557 +0x38 Previous read at 0x00c0003324e0 by goroutine 94: github.com/fluent/fluent-logger-golang/fluent.(*Fluent).write() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:619 +0x1ac github.com/fluent/fluent-logger-golang/fluent.(*Fluent).writeWithRetry() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:553 +0x78 github.com/fluent/fluent-logger-golang/fluent.(*Fluent).run() 
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:527 +0x1d0 github.com/fluent/fluent-logger-golang/fluent.newWithDialer.gowrap1() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:184 +0x4c Goroutine 95 (running) created at: github.com/fluent/fluent-logger-golang/fluent.timeout() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:556 +0xe8 github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:736 +0x444 testing.tRunner() /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1792 +0x180 testing.(*T).Run.gowrap1() /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1851 +0x40 Goroutine 94 (running) created at: github.com/fluent/fluent-logger-golang/fluent.newWithDialer() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:184 +0x368 github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse() /Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:722 +0xd8 testing.tRunner() /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1792 +0x180 testing.(*T).Run.gowrap1() /opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1851 +0x40 ================== [2025-05-06T08:42:08-07:00] Discarding queued events... 
testing.go:1490: race detected during execution of test Signed-off-by: Anirudh Aithal --- fluent/fluent.go | 54 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index 1dce881..c5eaade 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -13,6 +13,7 @@ import ( "reflect" "strconv" "sync" + "sync/atomic" "time" "bytes" @@ -107,15 +108,19 @@ type Fluent struct { // cancelDialings is used by Close() to stop any in-progress dialing. cancelDialings context.CancelFunc pending chan *msgToSend - pendingMutex sync.RWMutex - closed bool - wg sync.WaitGroup + // closed indicates if the connection is open or closed. + // 0 = open (false), 1 = closed (true). Since the code is built in CI with + // golang < 1.19, we're using atomic int32 here. Otherwise, atomic.Bool + // could have been used. + closed int32 + wg sync.WaitGroup // time at which the most recent connection to fluentd-address was established. 
latestReconnectTime time.Time - muconn sync.RWMutex - conn net.Conn + muconn sync.RWMutex + pendingMutex sync.RWMutex + conn net.Conn } type dialer interface { @@ -176,17 +181,18 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { dialer: d, cancelDialings: cancel, pending: make(chan *msgToSend, config.BufferLimit), - pendingMutex: sync.RWMutex{}, muconn: sync.RWMutex{}, + pendingMutex: sync.RWMutex{}, } f.wg.Add(1) go f.run(ctx) } else { f = &Fluent{ - Config: config, - dialer: d, - muconn: sync.RWMutex{}, + Config: config, + dialer: d, + muconn: sync.RWMutex{}, + pendingMutex: sync.RWMutex{}, } err = f.connect(context.Background()) } @@ -290,7 +296,7 @@ func (f *Fluent) postRawData(msg *msgToSend) error { } // Synchronous write - if f.closed { + if atomic.LoadInt32(&f.closed) == 1 { return fmt.Errorf("fluent#postRawData: Logger already closed") } return f.writeWithRetry(context.Background(), msg) @@ -367,19 +373,22 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg // running in async mode, the run() goroutine exits before Close() returns. func (f *Fluent) Close() (err error) { if f.Config.Async { + // Use a mutex to ensure thread safety when closing the channel f.pendingMutex.Lock() - if f.closed { + + if atomic.LoadInt32(&f.closed) == 1 { f.pendingMutex.Unlock() return nil } - f.closed = true - f.pendingMutex.Unlock() + atomic.StoreInt32(&f.closed, 1) if f.Config.ForceStopAsyncSend { f.cancelDialings() } close(f.pending) + f.pendingMutex.Unlock() + // If ForceStopAsyncSend is false, all logs in the channel have to be sent // before closing the connection. 
At this point closed is true so no more // logs are written to the channel and f.pending has been closed, so run() @@ -391,7 +400,7 @@ func (f *Fluent) Close() (err error) { f.muconn.Lock() f.close() - f.closed = true + atomic.StoreInt32(&f.closed, 1) f.muconn.Unlock() // If ForceStopAsyncSend is true, we shall close the connection before waiting for @@ -406,11 +415,19 @@ func (f *Fluent) Close() (err error) { // appendBuffer appends data to buffer with lock. func (f *Fluent) appendBuffer(msg *msgToSend) error { - f.pendingMutex.RLock() - defer f.pendingMutex.RUnlock() - if f.closed { + if atomic.LoadInt32(&f.closed) == 1 { return fmt.Errorf("fluent#appendBuffer: Logger already closed") } + + // Use a mutex to ensure thread safety when writing to the channel + f.pendingMutex.Lock() + defer f.pendingMutex.Unlock() + + // Check again after acquiring the lock + if atomic.LoadInt32(&f.closed) == 1 { + return fmt.Errorf("fluent#appendBuffer: Logger already closed") + } + select { case f.pending <- msg: default: @@ -610,6 +627,8 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { // Acknowledgment check if msg.ack != "" { + f.muconn.Lock() + resp := &AckResp{} var err error if f.Config.MarshalAsJSON { @@ -619,6 +638,7 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { r := msgp.NewReader(f.conn) err = resp.DecodeMsg(r) } + f.muconn.Unlock() if err != nil || resp.Ack != msg.ack { fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack) From f81b4e6942f387f954917e6c023e6252b3c64ae6 Mon Sep 17 00:00:00 2001 From: Anirudh Aithal Date: Tue, 6 May 2025 10:16:15 -0700 Subject: [PATCH 4/8] fluent: refactor code for muconn lock acquisition Refactored the code to follow the pattern of releasing the mutex lock in a defer block as much as possible for the muconn lock. 
This should make the code more maintainable, avoiding issues with not accidentally releasing the lock. Signed-off-by: Anirudh Aithal --- fluent/fluent.go | 124 ++++++++++++++++++++++++++--------------------- 1 file changed, 69 insertions(+), 55 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index c5eaade..e375a2f 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -398,10 +398,7 @@ func (f *Fluent) Close() (err error) { } } - f.muconn.Lock() - f.close() - atomic.StoreInt32(&f.closed, 1) - f.muconn.Unlock() + f.syncClose(true) // If ForceStopAsyncSend is true, we shall close the connection before waiting for // run() goroutine to exit to be sure we aren't waiting on ack message that might @@ -436,6 +433,17 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error { return nil } +func (f *Fluent) syncClose(setClosed bool) { + f.muconn.Lock() + defer f.muconn.Unlock() + + if setClosed { + atomic.StoreInt32(&f.closed, 1) + } + + f.close() +} + // close closes the connection. Callers should take care of locking muconn first. func (f *Fluent) close() { if f.conn != nil { @@ -476,6 +484,17 @@ func (f *Fluent) connect(ctx context.Context) (err error) { var errIsClosing = errors.New("fluent logger is closing") +func (f *Fluent) syncConnectWithRetry(ctx context.Context) error { + f.muconn.Lock() + defer f.muconn.Unlock() + + if f.conn == nil { + return f.connectWithRetry(ctx) + } + + return nil +} + // Caller should take care of locking muconn first. 
func (f *Fluent) connectWithRetry(ctx context.Context) error { // A Time channel is used instead of time.Sleep() to avoid blocking this @@ -575,75 +594,70 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error { return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry) } +func (f *Fluent) syncWriteMessage(msg *msgToSend) error { + f.muconn.RLock() + defer f.muconn.RUnlock() + + if f.conn == nil { + return fmt.Errorf("connection has been closed before writing to it") + } + + t := f.Config.WriteTimeout + if time.Duration(0) < t { + f.conn.SetWriteDeadline(time.Now().Add(t)) + } else { + f.conn.SetWriteDeadline(time.Time{}) + } + + _, err := f.conn.Write(msg.data) + return err +} + +func (f *Fluent) syncReadAck() (*AckResp, error) { + f.muconn.RLock() + defer f.muconn.RUnlock() + + resp := &AckResp{} + var err error + if f.Config.MarshalAsJSON { + dec := json.NewDecoder(f.conn) + err = dec.Decode(resp) + } else { + r := msgp.NewReader(f.conn) + err = resp.DecodeMsg(r) + } + + return resp, err +} + // write writes the provided msg to fluentd server. Its first return values is // a bool indicating whether the write should be retried. // This method relies on function literals to execute muconn.Unlock or // muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in // the case of panic recovering. func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { - closer := func() { - f.muconn.Lock() - defer f.muconn.Unlock() - - f.close() - } - - if err := func() (err error) { - f.muconn.Lock() - defer f.muconn.Unlock() - - if f.conn == nil { - err = f.connectWithRetry(ctx) - } - - return err - }(); err != nil { + if err := f.syncConnectWithRetry(ctx); err != nil { // Here, we don't want to retry the write since connectWithRetry already // retries Config.MaxRetry times to connect. 
return false, fmt.Errorf("fluent#write: %v", err) } - if err := func() (err error) { - f.muconn.RLock() - defer f.muconn.RUnlock() - - if f.conn == nil { - return fmt.Errorf("connection has been closed before writing to it") - } - - t := f.Config.WriteTimeout - if time.Duration(0) < t { - f.conn.SetWriteDeadline(time.Now().Add(t)) - } else { - f.conn.SetWriteDeadline(time.Time{}) - } - - _, err = f.conn.Write(msg.data) - return err - }(); err != nil { - closer() + if err := f.syncWriteMessage(msg); err != nil { + f.syncClose(false) return true, fmt.Errorf("fluent#write: %v", err) } // Acknowledgment check if msg.ack != "" { - f.muconn.Lock() - - resp := &AckResp{} - var err error - if f.Config.MarshalAsJSON { - dec := json.NewDecoder(f.conn) - err = dec.Decode(resp) - } else { - r := msgp.NewReader(f.conn) - err = resp.DecodeMsg(r) + resp, err := f.syncReadAck() + if err != nil { + fmt.Fprintf(os.Stderr, "fluent#write: error reading message response ack %v", err) + f.syncClose(false) + return true, err } - f.muconn.Unlock() - - if err != nil || resp.Ack != msg.ack { + if resp.Ack != msg.ack { fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack) - - closer() + f.syncClose(false) return true, err } } From 11bef51c367d33fd43d6368b254b9b961177f7f1 Mon Sep 17 00:00:00 2001 From: Anirudh Aithal Date: Tue, 6 May 2025 10:26:47 -0700 Subject: [PATCH 5/8] fluent: check if context is cancelled before performing I/O on the connection There are a number of places in the code where we do not check if the context is cancelled before reading from or writing to the connection. This commit adds those checks. 
Signed-off-by: Anirudh Aithal cr: https://code.amazon.com/reviews/CR-194284782 Signed-off-by: Anirudh Aithal --- fluent/fluent.go | 49 +++++++++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index e375a2f..2f2d614 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -81,7 +81,7 @@ type Config struct { RequestAck bool `json:"request_ack"` // Flag to skip verifying insecure certs on TLS connections - TlsInsecureSkipVerify bool `json: "tls_insecure_skip_verify"` + TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify"` } type ErrUnknownNetwork struct { @@ -280,7 +280,7 @@ func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{} var msg *msgToSend var err error if msg, err = f.EncodeData(tag, tm, message); err != nil { - return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err) + return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%w", message, err) } return f.postRawData(msg) } @@ -594,31 +594,46 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error { return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry) } -func (f *Fluent) syncWriteMessage(msg *msgToSend) error { - f.muconn.RLock() - defer f.muconn.RUnlock() +func (f *Fluent) syncWriteMessage(ctx context.Context, msg *msgToSend) error { + f.muconn.Lock() + defer f.muconn.Unlock() + + // Check if context is cancelled. If it is, we can return early here. 
+ if err := ctx.Err(); err != nil { + return errIsClosing + } if f.conn == nil { - return fmt.Errorf("connection has been closed before writing to it") + return fmt.Errorf("fluent#write: connection has been closed before writing to it") } t := f.Config.WriteTimeout + var err error if time.Duration(0) < t { - f.conn.SetWriteDeadline(time.Now().Add(t)) + err = f.conn.SetWriteDeadline(time.Now().Add(t)) } else { - f.conn.SetWriteDeadline(time.Time{}) + err = f.conn.SetWriteDeadline(time.Time{}) } - _, err := f.conn.Write(msg.data) + if err != nil { + return fmt.Errorf("fluent#write: failed to set write deadline: %w", err) + } + _, err = f.conn.Write(msg.data) return err } -func (f *Fluent) syncReadAck() (*AckResp, error) { - f.muconn.RLock() - defer f.muconn.RUnlock() +func (f *Fluent) syncReadAck(ctx context.Context) (*AckResp, error) { + f.muconn.Lock() + defer f.muconn.Unlock() resp := &AckResp{} var err error + + // Check if context is cancelled. If it is, we can return early here. + if err := ctx.Err(); err != nil { + return resp, errIsClosing + } + if f.Config.MarshalAsJSON { dec := json.NewDecoder(f.conn) err = dec.Decode(resp) @@ -639,19 +654,19 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { if err := f.syncConnectWithRetry(ctx); err != nil { // Here, we don't want to retry the write since connectWithRetry already // retries Config.MaxRetry times to connect. 
- return false, fmt.Errorf("fluent#write: %v", err) + return false, fmt.Errorf("fluent#write: %w", err) } - if err := f.syncWriteMessage(msg); err != nil { + if err := f.syncWriteMessage(ctx, msg); err != nil { f.syncClose(false) - return true, fmt.Errorf("fluent#write: %v", err) + return true, fmt.Errorf("fluent#write: %w", err) } // Acknowledgment check if msg.ack != "" { - resp, err := f.syncReadAck() + resp, err := f.syncReadAck(ctx) if err != nil { - fmt.Fprintf(os.Stderr, "fluent#write: error reading message response ack %v", err) + fmt.Fprintf(os.Stderr, "fluent#write: error reading message response ack %v. Closing connection...", err) f.syncClose(false) return true, err } From 1f759a437ae4745854bc637b75b069e53e77ba1b Mon Sep 17 00:00:00 2001 From: Anirudh Aithal Date: Thu, 8 May 2025 15:22:45 -0700 Subject: [PATCH 6/8] fluent: add read timeout config This can be used by clients to exit early when the connection is unresponsive while reading acks from the server. Signed-off-by: Anirudh Aithal --- fluent/fluent.go | 24 +++++++++++++++++++++--- fluent/fluent_test.go | 6 ++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index 2f2d614..d33012f 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -35,12 +35,10 @@ const ( defaultMaxRetryWait = 60000 defaultMaxRetry = 13 defaultReconnectWaitIncreRate = 1.5 - // Default sub-second precision value to false since it is only compatible - // with fluentd versions v0.14 and above. - defaultSubSecondPrecision = false // Default value whether to skip checking insecure certs on TLS connections. defaultTlsInsecureSkipVerify = false + defaultReadTimeout = time.Duration(0) // Read() will not time out ) // randomGenerator is used by getUniqueId to generate ack hashes. 
Its value is replaced @@ -82,6 +80,9 @@ type Config struct { // Flag to skip verifying insecure certs on TLS connections TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify"` + + // ReadTimeout specifies the timeout on reads. Currently only acks are read. + ReadTimeout time.Duration `json:"read_timeout"` } type ErrUnknownNetwork struct { @@ -153,6 +154,9 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { if config.WriteTimeout == 0 { config.WriteTimeout = defaultWriteTimeout } + if config.ReadTimeout == 0 { + config.ReadTimeout = defaultReadTimeout + } if config.BufferLimit == 0 { config.BufferLimit = defaultBufferLimit } @@ -629,11 +633,25 @@ func (f *Fluent) syncReadAck(ctx context.Context) (*AckResp, error) { resp := &AckResp{} var err error + if f.conn == nil { + return resp, fmt.Errorf("fluent#read: connection has been closed before reading from it") + } + // Check if context is cancelled. If it is, we can return early here. if err := ctx.Err(); err != nil { return resp, errIsClosing } + t := f.Config.ReadTimeout + if time.Duration(0) < t { + err = f.conn.SetReadDeadline(time.Now().Add(t)) + } else { + err = f.conn.SetReadDeadline(time.Time{}) + } + if err != nil { + return resp, fmt.Errorf("fluent#read: failed to set read deadline: %w", err) + } + if f.Config.MarshalAsJSON { dec := json.NewDecoder(f.conn) err = dec.Decode(resp) diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index b05f140..ed8d6ab 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -265,6 +265,11 @@ func (c *Conn) SetWriteDeadline(t time.Time) error { return nil } +// SetReadDeadline is a nop for our test dialer. 
+func (c *Conn) SetReadDeadline(time.Time) error { + return nil +} + func (c *Conn) Close() error { if c.delayNextReadCh != nil { close(c.delayNextReadCh) @@ -679,6 +684,7 @@ func TestCloseOnFailingAsyncReconnect(t *testing.T) { ForceStopAsyncSend: true, RequestAck: true, }, + "without RequestAck": { Async: true, ForceStopAsyncSend: true, From ff8e4225bfdc865329c56420b48e6f42406974d6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 19 May 2025 19:32:41 +0900 Subject: [PATCH 7/8] test: Follow changes of testing framework Signed-off-by: Hiroshi Hatake --- fluent/fluent_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index 0d6b03f..c8b33e7 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -772,10 +772,14 @@ func TestSyncWriteAfterCloseFails(t *testing.T) { err = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"foo": "buzz"}) // The event submission must fail, - assert.NotEqual(t, err, nil) + if err == nil { + t.Error("expected an error") + } - // and also must keep Fluentd closed. - assert.NotEqual(t, f.closed, false) + // and also must keep Fluentd closed. true equals 1. + if f.closed != int32(1) { + t.Error("expected Fluentd to be kept closed") + } }() conn := d.waitForNextDialing(true, false) From 6a76acc3c147b2421581241bf0e3b4be1a447148 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 19 May 2025 19:36:16 +0900 Subject: [PATCH 8/8] workflows: Use race detector on test Signed-off-by: Hiroshi Hatake --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index dfdd6f5..d944099 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,5 +24,5 @@ jobs: with: go-version: ${{ matrix.golang }} - name: Test - run: go test -v ./fluent + run: go test -v -race -cover -covermode=atomic ./fluent shell: bash