diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8d8e905..779043d 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -15,19 +15,18 @@ jobs: matrix: os: [ubuntu, macos, windows] golang: ['1.13', '1.16', '1.17'] - # currently, we cannot run non-x86_64 machines on Github Actions cloud env. + # currently, we cannot run non-x86_64 machines on GitHub Actions cloud env. runs-on: ${{ matrix.os }}-latest name: CI golang ${{ matrix.golang }} on ${{ matrix.os }} steps: - - uses: actions/checkout@v2 - - uses: actions/setup-go@v2 + - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 with: go-version: ${{ matrix.golang }} - name: Change GO11MODULES run: go env -w GO111MODULE=auto - name: Install requirements run: | - go get github.com/bmizerany/assert go get github.com/philhofer/fwd go get github.com/tinylib/msgp - name: Test diff --git a/README.md b/README.md index d0b6b23..f76c8b2 100644 --- a/README.md +++ b/README.md @@ -8,15 +8,15 @@ fluent-logger-golang ## How to install -``` -go get github.com/fluent/fluent-logger-golang/fluent +```bash +go get github.com/fluent/fluent-logger-golang/fluent@latest ``` ## Usage Install the package with `go get` and use `import` to include it in your project. -``` +```go import "github.com/fluent/fluent-logger-golang/fluent" ``` @@ -26,27 +26,32 @@ import "github.com/fluent/fluent-logger-golang/fluent" package main import ( - "github.com/fluent/fluent-logger-golang/fluent" - "fmt" - //"time" + "fmt" + "time" + + "github.com/fluent/fluent-logger-golang/fluent" ) func main() { - logger, err := fluent.New(fluent.Config{}) - if err != nil { - fmt.Println(err) - } - defer logger.Close() - tag := "myapp.access" - var data = map[string]string{ - "foo": "bar", - "hoge": "hoge", - } - error := logger.Post(tag, data) - // error := logger.PostWithTime(tag, time.Now(), data) - if error != nil { - panic(error) - } + logger, err := fluent.New(fluent.Config{}) + if err != nil { + fmt.Println(err) + } + defer logger.Close() + tag := "myapp.access" + data := map[string]string{ + "foo": "bar", + "hoge": "hoge", + } + err = logger.Post(tag, data) + if err != nil { + panic(err) + } + + err = logger.PostWithTime(tag, time.Now(), data) + if err != nil { + panic(err) + } } ``` @@ -181,7 +186,7 @@ were involved. Starting v1.8.0, the logger no longer accepts `Fluent.Post()` after `Fluent.Close()`, and instead returns a "Logger already closed" error. ## Tests -``` +```bash go test ``` diff --git a/_examples/main.go b/_examples/main.go index cb3e2e2..77d8d90 100644 --- a/_examples/main.go +++ b/_examples/main.go @@ -5,7 +5,7 @@ import ( "log" "time" - "../fluent" + "github.com/fluent/fluent-logger-golang/fluent" ) func main() { @@ -15,9 +15,10 @@ func main() { } defer logger.Close() tag := "myapp.access" - var data = map[string]string{ + data := map[string]string{ "foo": "bar", - "hoge": "hoge"} + "hoge": "hoge", + } for i := 0; i < 100; i++ { e := logger.Post(tag, data) if e != nil { diff --git a/fluent/fluent.go b/fluent/fluent.go index 7216991..b00c3c6 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -1,8 +1,11 @@ package fluent import ( + "bytes" "context" "crypto/tls" + "encoding/base64" + "encoding/binary" "encoding/json" "errors" "fmt" @@ -15,10 +18,6 @@ import ( "sync" "time" - "bytes" - "encoding/base64" - "encoding/binary" - "github.com/tinylib/msgp/msgp" ) @@ -200,27 +199,26 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { // // Examples: // -// // send map[string] -// mapStringData := map[string]string{ -// "foo": "bar", -// } -// f.Post("tag_name", mapStringData) -// -// // send message with specified time -// mapStringData := map[string]string{ -// "foo": "bar", -// } -// tm := time.Now() -// f.PostWithTime("tag_name", tm, mapStringData) +// // send map[string] +// mapStringData := map[string]string{ +// "foo": "bar", +// } +// f.Post("tag_name", mapStringData) // -// // send struct -// structData := struct { -// Name string `msg:"name"` -// } { -// "john smith", -// } -// f.Post("tag_name", structData) +// // send message with specified time +// mapStringData := map[string]string{ +// "foo": "bar", +// } +// tm := time.Now() +// f.PostWithTime("tag_name", tm, mapStringData) // +// // send struct +// structData := struct { +// Name string `msg:"name"` +// } { +// "john smith", +// } +// f.Post("tag_name", structData) func (f *Fluent) Post(tag string, message interface{}) error { timeNow := time.Now() return f.PostWithTime(tag, timeNow, message) @@ -317,7 +315,7 @@ func (chunk *MessageChunk) MarshalJSON() ([]byte, error) { if err != nil { return nil, err } - return []byte(fmt.Sprintf("[\"%s\",%d,%s,%s]", chunk.message.Tag, + return []byte(fmt.Sprintf(`["%s",%d,%s,%s]`, chunk.message.Tag, chunk.message.Time, data, option)), err } diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index 00a6086..f7cde0d 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -13,7 +13,6 @@ import ( "testing" "time" - "github.com/bmizerany/assert" "github.com/tinylib/msgp/msgp" ) @@ -45,18 +44,18 @@ func newTestDialer() *testDialer { // For instance, to test an async logger that have to dial 4 times before succeeding, // the test should look like this: // -// d := newTestDialer() // Create a new stubbed dialer -// cfg := Config{ -// Async: true, -// // ... -// } -// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer -// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) +// d := newTestDialer() // Create a new stubbed dialer +// cfg := Config{ +// Async: true, +// // ... +// } +// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer +// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) // -// d.waitForNextDialing(false, false) // 1st dialing attempt fails -// d.waitForNextDialing(false, false) // 2nd attempt fails too -// d.waitForNextDialing(false, false) // 3rd attempt fails too -// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds +// d.waitForNextDialing(false, false) // 1st dialing attempt fails +// d.waitForNextDialing(false, false) // 2nd attempt fails too +// d.waitForNextDialing(false, false) // 3rd attempt fails too +// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds // // Note that in the above example, the logger operates in async mode. As such, // a call to Post, PostWithTime or EncodeAndPostData is needed *before* calling @@ -67,20 +66,20 @@ func newTestDialer() *testDialer { // case, you have to put the calls to newWithDialer() and to EncodeAndPostData() // into their own goroutine. An example: // -// d := newTestDialer() // Create a new stubbed dialer -// cfg := Config{ -// Async: false, -// // ... -// } -// go func() { -// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer -// f.Close() -// }() +// d := newTestDialer() // Create a new stubbed dialer +// cfg := Config{ +// Async: false, +// // ... +// } +// go func() { +// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer +// f.Close() +// }() // -// d.waitForNextDialing(false, false) // 1st dialing attempt fails -// d.waitForNextDialing(false, false) // 2nd attempt fails too -// d.waitForNextDialing(false, false) // 3rd attempt fails too -// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds +// d.waitForNextDialing(false, false) // 1st dialing attempt fails +// d.waitForNextDialing(false, false) // 2nd attempt fails too +// d.waitForNextDialing(false, false) // 3rd attempt fails too +// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds // // Moreover, waitForNextDialing() returns a *Conn which extends net.Conn to provide testing // facilities. For instance, you can call waitForNextWrite() on these connections, to @@ -91,24 +90,24 @@ func newTestDialer() *testDialer { // // Here's a full example: // -// d := newTestDialer() -// cfg := Config{Async: true} +// d := newTestDialer() +// cfg := Config{Async: true} // -// f := newWithDialer(cfg, d) -// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) +// f := newWithDialer(cfg, d) +// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) // -// conn := d.waitForNextDialing(true, false) // Accept the dialing -// conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message +// conn := d.waitForNextDialing(true, false) // Accept the dialing +// conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message // -// conn := d.waitForNextDialing(true, false) -// assertReceived(t, // t is *testing.T -// conn.waitForNextWrite(true, ""), -// "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]") +// conn := d.waitForNextDialing(true, false) +// assertReceived(t, // t is *testing.T +// conn.waitForNextWrite(true, ""), +// `["tag_name",1482493046,{"foo":"bar"},{}]`) // -// f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"}) -// assertReceived(t, // t is *testing.T -// conn.waitForNextWrite(true, ""), -// "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]") +// f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"}) +// assertReceived(t, // t is *testing.T +// conn.waitForNextWrite(true, ""), +// `["something_else",1482493050,{"bar":"baz"},{}]`) // // In this example, the 1st connection dialing succeeds but the 1st attempt to write the // message is discarded. As the logger discards the connection whenever a message @@ -118,7 +117,7 @@ func newTestDialer() *testDialer { // using assertReceived() to make sure the logger encodes the messages properly. // // Again, the example above is using async mode thus, calls to f and conn are running in -// the same goroutine. However in sync mode, all calls to f.EncodeAndPostData() as well +// the same goroutine. However, in sync mode, all calls to f.EncodeAndPostData() as well // as the logger initialization shall be placed in a separate goroutine or the code // allowing the dialing and writing attempts (eg. waitForNextDialing() & waitForNextWrite()) // would never be reached. @@ -162,13 +161,21 @@ func (d *testDialer) waitForNextDialing(accept bool, delayReads bool) *Conn { return conn } +// asserEqual asserts that actual and expected are equivalent, and otherwise +// marks the test as failed (t.Error). It uses reflect.DeepEqual internally. +func assertEqual(t *testing.T, actual, expected interface{}) { + t.Helper() + if !reflect.DeepEqual(actual, expected) { + t.Errorf("got: '%+v', expected: '%+v'", actual, expected) + } +} + // assertReceived is used below by test cases to assert the content written to a *Conn // matches an expected string. This is generally used in conjunction with // Conn.waitForNextWrite(). func assertReceived(t *testing.T, rcv []byte, expected string) { - if string(rcv) != expected { - t.Fatalf("got %s, expect %s", string(rcv), expected) - } + t.Helper() + assertEqual(t, string(rcv), expected) } // Conn extends net.Conn to add channels used to synchronise across goroutines, eg. @@ -272,13 +279,13 @@ func (c *Conn) Close() error { func Test_New_itShouldUseDefaultConfigValuesIfNoOtherProvided(t *testing.T) { f, _ := New(Config{}) - assert.Equal(t, f.Config.FluentPort, defaultPort) - assert.Equal(t, f.Config.FluentHost, defaultHost) - assert.Equal(t, f.Config.Timeout, defaultTimeout) - assert.Equal(t, f.Config.WriteTimeout, defaultWriteTimeout) - assert.Equal(t, f.Config.BufferLimit, defaultBufferLimit) - assert.Equal(t, f.Config.FluentNetwork, defaultNetwork) - assert.Equal(t, f.Config.FluentSocketPath, defaultSocketPath) + assertEqual(t, f.Config.FluentPort, defaultPort) + assertEqual(t, f.Config.FluentHost, defaultHost) + assertEqual(t, f.Config.Timeout, defaultTimeout) + assertEqual(t, f.Config.WriteTimeout, defaultWriteTimeout) + assertEqual(t, f.Config.BufferLimit, defaultBufferLimit) + assertEqual(t, f.Config.FluentNetwork, defaultNetwork) + assertEqual(t, f.Config.FluentSocketPath, defaultSocketPath) } func Test_New_itShouldUseUnixDomainSocketIfUnixSocketSpecified(t *testing.T) { @@ -296,20 +303,22 @@ func Test_New_itShouldUseUnixDomainSocketIfUnixSocketSpecified(t *testing.T) { f, err := New(Config{ FluentNetwork: network, - FluentSocketPath: socketFile}) + FluentSocketPath: socketFile, + }) if err != nil { t.Error(err) return } defer f.Close() - assert.Equal(t, f.Config.FluentNetwork, network) - assert.Equal(t, f.Config.FluentSocketPath, socketFile) + assertEqual(t, f.Config.FluentNetwork, network) + assertEqual(t, f.Config.FluentSocketPath, socketFile) socketFile = "/tmp/fluent-logger-golang-xxx.sock" network = "unixxxx" fUnknown, err := New(Config{ FluentNetwork: network, - FluentSocketPath: socketFile}) + FluentSocketPath: socketFile, + }) if _, ok := err.(*ErrUnknownNetwork); !ok { t.Errorf("err type: %T", err) } @@ -322,25 +331,25 @@ func Test_New_itShouldUseUnixDomainSocketIfUnixSocketSpecified(t *testing.T) { func Test_New_itShouldUseConfigValuesFromArguments(t *testing.T) { f, _ := New(Config{FluentPort: 6666, FluentHost: "foobarhost"}) - assert.Equal(t, f.Config.FluentPort, 6666) - assert.Equal(t, f.Config.FluentHost, "foobarhost") + assertEqual(t, f.Config.FluentPort, 6666) + assertEqual(t, f.Config.FluentHost, "foobarhost") } func Test_New_itShouldUseConfigValuesFromMashalAsJSONArgument(t *testing.T) { f, _ := New(Config{MarshalAsJSON: true}) - assert.Equal(t, f.Config.MarshalAsJSON, true) + assertEqual(t, f.Config.MarshalAsJSON, true) } func Test_MarshalAsMsgpack(t *testing.T) { f := &Fluent{Config: Config{}} tag := "tag" - var data = map[string]string{ + data := map[string]string{ "foo": "bar", - "hoge": "hoge"} + "hoge": "hoge", + } tm := time.Unix(1267867237, 0) result, err := f.EncodeData(tag, tm, data) - if err != nil { t.Error(err) } @@ -368,7 +377,6 @@ func Test_SubSecondPrecision(t *testing.T) { encodedData, err := fluent.EncodeData("tag", timestamp, map[string]string{ "foo": "bar", }) - // Assert no encoding errors and that the timestamp has been encoded into // the message as expected. if err != nil { @@ -387,12 +395,12 @@ func Test_SubSecondPrecision(t *testing.T) { func Test_MarshalAsJSON(t *testing.T) { f := &Fluent{Config: Config{MarshalAsJSON: true}} - var data = map[string]string{ + data := map[string]string{ "foo": "bar", - "hoge": "hoge"} + "hoge": "hoge", + } tm := time.Unix(1267867237, 0) result, err := f.EncodeData("tag", tm, data) - if err != nil { t.Error(err) } @@ -431,9 +439,7 @@ func TestJsonConfig(t *testing.T) { t.Error(err) } - if !reflect.DeepEqual(expect, got) { - t.Errorf("got %v, except %v", got, expect) - } + assertEqual(t, got, expect) } func TestPostWithTime(t *testing.T) { @@ -450,9 +456,9 @@ func TestPostWithTime(t *testing.T) { }, } - for tcname := range testcases { + for tcname, tc := range testcases { + tc := tc t.Run(tcname, func(t *testing.T) { - tc := testcases[tcname] t.Parallel() d := newTestDialer() @@ -472,20 +478,23 @@ func TestPostWithTime(t *testing.T) { _ = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) _ = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"}) _ = f.PostWithTime("tag_name", time.Unix(1634263200, 0), - struct {Welcome string `msg:"welcome"`; cannot string}{"to use", "see me"}) + struct { + Welcome string `msg:"welcome"` + cannot string + }{"to use", "see me"}) }() conn := d.waitForNextDialing(true, false) assertReceived(t, conn.waitForNextWrite(true, ""), - "[\"acme.tag_name\",1482493046,{\"foo\":\"bar\"},{}]") + `["acme.tag_name",1482493046,{"foo":"bar"},{}]`) assertReceived(t, conn.waitForNextWrite(true, ""), - "[\"acme.tag_name\",1482493050,{\"fluentd\":\"is awesome\"},{}]") + `["acme.tag_name",1482493050,{"fluentd":"is awesome"},{}]`) assertReceived(t, conn.waitForNextWrite(true, ""), - "[\"acme.tag_name\",1634263200,{\"welcome\":\"to use\"},{}]") + `["acme.tag_name",1634263200,{"welcome":"to use"},{}]`) }) } } @@ -502,9 +511,9 @@ func TestReconnectAndResendAfterTransientFailure(t *testing.T) { }, } - for tcname := range testcases { + for tcname, tc := range testcases { + tc := tc t.Run(tcname, func(t *testing.T) { - tc := testcases[tcname] t.Parallel() d := newTestDialer() @@ -529,7 +538,7 @@ func TestReconnectAndResendAfterTransientFailure(t *testing.T) { conn := d.waitForNextDialing(true, false) assertReceived(t, conn.waitForNextWrite(true, ""), - "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]") + `["tag_name",1482493046,{"foo":"bar"},{}]`) // The next write will fail and the next connection dialing will be dropped // to test if the logger is reconnecting as expected. @@ -540,7 +549,7 @@ func TestReconnectAndResendAfterTransientFailure(t *testing.T) { conn = d.waitForNextDialing(true, false) assertReceived(t, conn.waitForNextWrite(true, ""), - "[\"tag_name\",1482493050,{\"fluentd\":\"is awesome\"},{}]") + `["tag_name",1482493050,{"fluentd":"is awesome"},{}]`) }) } } @@ -584,9 +593,9 @@ func TestCloseOnFailingAsyncConnect(t *testing.T) { }, } - for tcname := range testcases { + for tcname, tc := range testcases { + tc := tc t.Run(tcname, func(t *testing.T) { - tc := testcases[tcname] t.Parallel() d := newTestDialer() @@ -632,22 +641,23 @@ func TestNoPanicOnAsyncClose(t *testing.T) { shouldError: false, }, } - for _, testcase := range testcases { - t.Run(testcase.name, func(t *testing.T) { + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { t.Parallel() d := newTestDialer() - f, err := newWithDialer(testcase.config, d) + f, err := newWithDialer(tc.config, d) if err != nil { t.Errorf("Unexpected error: %v", err) } - if testcase.shouldError { + if tc.shouldError { f.Close() } - e := f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) - if testcase.shouldError { - assert.Equal(t, fmt.Errorf("fluent#appendBuffer: Logger already closed"), e) + err = f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) + if tc.shouldError { + assertEqual(t, err, fmt.Errorf("fluent#appendBuffer: Logger already closed")) } else { - assert.Equal(t, nil, e) + assertEqual(t, err, nil) } }) } @@ -680,9 +690,9 @@ func TestCloseOnFailingAsyncReconnect(t *testing.T) { }, } - for tcname := range testcases { + for tcname, tc := range testcases { + tc := tc t.Run(tcname, func(t *testing.T) { - tc := testcases[tcname] t.Parallel() d := newTestDialer() @@ -755,10 +765,14 @@ func TestSyncWriteAfterCloseFails(t *testing.T) { err = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"foo": "buzz"}) // The event submission must fail, - assert.NotEqual(t, err, nil); + if err == nil { + t.Error("expected an error") + } // and also must keep Fluentd closed. - assert.NotEqual(t, f.closed, false); + if f.closed != true { + t.Error("expected Fluentd to be kept closed") + } }() conn := d.waitForNextDialing(true, false)