Skip to content

Commit 82c96e2

Browse files
committed
fluent: pending channel thread saftey test
Added a new test to ensure that pending channel is accessed in a thread safe manner. The code can be simplified by removing the pendingMutex lock. There's a risk in this codebase wrt how muconn and pendingMutex are acquired and released in different orders in different methods. This test is a precursor to the change to remove the pendingMutex to ensure that nothing is broken. Signed-off-by: Anirudh Aithal <aithal@amazon.com>
1 parent 200b3e5 commit 82c96e2

File tree

1 file changed

+93
-41
lines changed

1 file changed

+93
-41
lines changed

fluent/fluent_test.go

Lines changed: 93 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import (
1010
"net"
1111
"reflect"
1212
"runtime"
13+
"strconv"
14+
"strings"
15+
"sync"
1316
"testing"
1417
"time"
1518

@@ -45,18 +48,18 @@ func newTestDialer() *testDialer {
4548
// For instance, to test an async logger that have to dial 4 times before succeeding,
4649
// the test should look like this:
4750
//
48-
// d := newTestDialer() // Create a new stubbed dialer
49-
// cfg := Config{
50-
// Async: true,
51-
// // ...
52-
// }
53-
// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
54-
// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
51+
// d := newTestDialer() // Create a new stubbed dialer
52+
// cfg := Config{
53+
// Async: true,
54+
// // ...
55+
// }
56+
// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
57+
// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
5558
//
56-
// d.waitForNextDialing(false, false) // 1st dialing attempt fails
57-
// d.waitForNextDialing(false, false) // 2nd attempt fails too
58-
// d.waitForNextDialing(false, false) // 3rd attempt fails too
59-
// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
59+
// d.waitForNextDialing(false, false) // 1st dialing attempt fails
60+
// d.waitForNextDialing(false, false) // 2nd attempt fails too
61+
// d.waitForNextDialing(false, false) // 3rd attempt fails too
62+
// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
6063
//
6164
// Note that in the above example, the logger operates in async mode. As such,
6265
// a call to Post, PostWithTime or EncodeAndPostData is needed *before* calling
@@ -67,20 +70,20 @@ func newTestDialer() *testDialer {
6770
// case, you have to put the calls to newWithDialer() and to EncodeAndPostData()
6871
// into their own goroutine. An example:
6972
//
70-
// d := newTestDialer() // Create a new stubbed dialer
71-
// cfg := Config{
72-
// Async: false,
73-
// // ...
74-
// }
75-
// go func() {
76-
// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
77-
// f.Close()
78-
// }()
73+
// d := newTestDialer() // Create a new stubbed dialer
74+
// cfg := Config{
75+
// Async: false,
76+
// // ...
77+
// }
78+
// go func() {
79+
// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
80+
// f.Close()
81+
// }()
7982
//
80-
// d.waitForNextDialing(false, false) // 1st dialing attempt fails
81-
// d.waitForNextDialing(false, false) // 2nd attempt fails too
82-
// d.waitForNextDialing(false, false) // 3rd attempt fails too
83-
// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
83+
// d.waitForNextDialing(false, false) // 1st dialing attempt fails
84+
// d.waitForNextDialing(false, false) // 2nd attempt fails too
85+
// d.waitForNextDialing(false, false) // 3rd attempt fails too
86+
// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
8487
//
8588
// Moreover, waitForNextDialing() returns a *Conn which extends net.Conn to provide testing
8689
// facilities. For instance, you can call waitForNextWrite() on these connections, to
@@ -91,24 +94,24 @@ func newTestDialer() *testDialer {
9194
//
9295
// Here's a full example:
9396
//
94-
// d := newTestDialer()
95-
// cfg := Config{Async: true}
97+
// d := newTestDialer()
98+
// cfg := Config{Async: true}
9699
//
97-
// f := newWithDialer(cfg, d)
98-
// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
100+
// f := newWithDialer(cfg, d)
101+
// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
99102
//
100-
// conn := d.waitForNextDialing(true, false) // Accept the dialing
101-
// conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message
103+
// conn := d.waitForNextDialing(true, false) // Accept the dialing
104+
// conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message
102105
//
103-
// conn := d.waitForNextDialing(true, false)
104-
// assertReceived(t, // t is *testing.T
105-
// conn.waitForNextWrite(true, ""),
106-
// "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]")
106+
// conn := d.waitForNextDialing(true, false)
107+
// assertReceived(t, // t is *testing.T
108+
// conn.waitForNextWrite(true, ""),
109+
// "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]")
107110
//
108-
// f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"})
109-
// assertReceived(t, // t is *testing.T
110-
// conn.waitForNextWrite(true, ""),
111-
// "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]")
111+
// f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"})
112+
// assertReceived(t, // t is *testing.T
113+
// conn.waitForNextWrite(true, ""),
114+
// "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]")
112115
//
113116
// In this example, the 1st connection dialing succeeds but the 1st attempt to write the
114117
// message is discarded. As the logger discards the connection whenever a message
@@ -472,7 +475,10 @@ func TestPostWithTime(t *testing.T) {
472475
_ = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
473476
_ = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"})
474477
_ = f.PostWithTime("tag_name", time.Unix(1634263200, 0),
475-
struct {Welcome string `msg:"welcome"`; cannot string}{"to use", "see me"})
478+
struct {
479+
Welcome string `msg:"welcome"`
480+
cannot string
481+
}{"to use", "see me"})
476482
}()
477483

478484
conn := d.waitForNextDialing(true, false)
@@ -755,16 +761,62 @@ func TestSyncWriteAfterCloseFails(t *testing.T) {
755761
err = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"foo": "buzz"})
756762

757763
// The event submission must fail,
758-
assert.NotEqual(t, err, nil);
764+
assert.NotEqual(t, err, nil)
759765

760766
// and also must keep Fluentd closed.
761-
assert.NotEqual(t, f.closed, false);
767+
assert.NotEqual(t, f.closed, false)
762768
}()
763769

764770
conn := d.waitForNextDialing(true, false)
765771
conn.waitForNextWrite(true, "")
766772
}
767773

774+
func TestPendingChannelThreadSafety(t *testing.T) {
775+
f, err := New(Config{
776+
Async: true,
777+
ForceStopAsyncSend: true,
778+
})
779+
if err != nil {
780+
t.Fatalf("Failed to create logger: %v", err)
781+
}
782+
783+
// Start multiple goroutines posting messages
784+
const numGoroutines = 10
785+
const messagesPerGoroutine = 100
786+
var wg sync.WaitGroup
787+
wg.Add(numGoroutines)
788+
789+
for i := 0; i < numGoroutines; i++ {
790+
go func(id int) {
791+
defer wg.Done()
792+
for j := 0; j < messagesPerGoroutine; j++ {
793+
// Post a message
794+
err := f.Post("tag", map[string]string{
795+
"goroutine": strconv.Itoa(id),
796+
"message": strconv.Itoa(j),
797+
})
798+
799+
// If the logger is closed, we expect an error
800+
if err != nil && !strings.Contains(err.Error(), "already closed") {
801+
t.Errorf("Unexpected error: %v", err)
802+
}
803+
804+
// Add a small delay to increase the chance of race conditions
805+
time.Sleep(time.Millisecond)
806+
}
807+
}(i)
808+
}
809+
810+
// Wait a bit to let some messages be posted
811+
time.Sleep(10 * time.Millisecond)
812+
813+
// Close the logger while goroutines are still posting
814+
f.Close()
815+
816+
// Wait for all goroutines to finish
817+
wg.Wait()
818+
}
819+
768820
func Benchmark_PostWithShortMessage(b *testing.B) {
769821
b.StopTimer()
770822
d := newTestDialer()

0 commit comments

Comments
 (0)