diff --git a/writer.go b/writer.go index 3817bf538..3d6d95571 100644 --- a/writer.go +++ b/writer.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "net" "sync" @@ -576,6 +577,10 @@ func (w *Writer) Close() error { w.transport.CloseIdleConnections() } + errorsCount := w.stats().errors.snapshot() + if errorsCount > 0 { + return fmt.Errorf("failed to close gracefully, %d errors occurred", errorsCount) + } return nil } diff --git a/writer_test.go b/writer_test.go index 6f894ecd3..d97117727 100644 --- a/writer_test.go +++ b/writer_test.go @@ -106,6 +106,11 @@ func TestWriter(t *testing.T) { function: testWriterClose, }, + { + scenario: "closing a writer with errors should return error", + function: testWriterCloseWithErrors, + }, + { scenario: "writing 1 message through a writer using round-robin balancing produces 1 message to the first partition", function: testWriterRoundRobin1, @@ -223,6 +228,15 @@ func testWriterClose(t *testing.T) { } } +func testWriterCloseWithErrors(t *testing.T) { + w := newTestWriter(WriterConfig{}) + w.stats().errors.observe(1) + + if err := w.Close(); err == nil { + t.Error("expected error, but got nil") + } +} + func testWriterRequiredAcksNone(t *testing.T) { topic := makeTopic() createTopic(t, topic, 1)