Skip to content

Commit 1f759a4

Browse files
committed
fluent: add read timeout config
This can be used by clients to exit early when the connection is unresponsive while reading acks from the server. Signed-off-by: Anirudh Aithal <aithal@amazon.com>
1 parent 11bef51 commit 1f759a4

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

fluent/fluent.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@ const (
3535
defaultMaxRetryWait = 60000
3636
defaultMaxRetry = 13
3737
defaultReconnectWaitIncreRate = 1.5
38-
// Default sub-second precision value to false since it is only compatible
39-
// with fluentd versions v0.14 and above.
40-
defaultSubSecondPrecision = false
4138

4239
// Default value whether to skip checking insecure certs on TLS connections.
4340
defaultTlsInsecureSkipVerify = false
41+
defaultReadTimeout = time.Duration(0) // Read() will not time out
4442
)
4543

4644
// randomGenerator is used by getUniqueId to generate ack hashes. Its value is replaced
@@ -82,6 +80,9 @@ type Config struct {
8280

8381
// Flag to skip verifying insecure certs on TLS connections
8482
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
83+
84+
// ReadTimeout specifies the timeout on reads. Currently only acks are read.
85+
ReadTimeout time.Duration `json:"read_timeout"`
8586
}
8687

8788
type ErrUnknownNetwork struct {
@@ -153,6 +154,9 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
153154
if config.WriteTimeout == 0 {
154155
config.WriteTimeout = defaultWriteTimeout
155156
}
157+
if config.ReadTimeout == 0 {
158+
config.ReadTimeout = defaultReadTimeout
159+
}
156160
if config.BufferLimit == 0 {
157161
config.BufferLimit = defaultBufferLimit
158162
}
@@ -629,11 +633,25 @@ func (f *Fluent) syncReadAck(ctx context.Context) (*AckResp, error) {
629633
resp := &AckResp{}
630634
var err error
631635

636+
if f.conn == nil {
637+
return resp, fmt.Errorf("fluent#read: connection has been closed before reading from it")
638+
}
639+
632640
// Check if context is cancelled. If it is, we can return early here.
633641
if err := ctx.Err(); err != nil {
634642
return resp, errIsClosing
635643
}
636644

645+
t := f.Config.ReadTimeout
646+
if time.Duration(0) < t {
647+
err = f.conn.SetReadDeadline(time.Now().Add(t))
648+
} else {
649+
err = f.conn.SetReadDeadline(time.Time{})
650+
}
651+
if err != nil {
652+
return resp, fmt.Errorf("fluent#read: failed to set read deadline: %w", err)
653+
}
654+
637655
if f.Config.MarshalAsJSON {
638656
dec := json.NewDecoder(f.conn)
639657
err = dec.Decode(resp)

fluent/fluent_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,11 @@ func (c *Conn) SetWriteDeadline(t time.Time) error {
265265
return nil
266266
}
267267

268+
// SetReadDeadline is a nop for our test dialer.
269+
func (c *Conn) SetReadDeadline(time.Time) error {
270+
return nil
271+
}
272+
268273
func (c *Conn) Close() error {
269274
if c.delayNextReadCh != nil {
270275
close(c.delayNextReadCh)
@@ -679,6 +684,7 @@ func TestCloseOnFailingAsyncReconnect(t *testing.T) {
679684
ForceStopAsyncSend: true,
680685
RequestAck: true,
681686
},
687+
682688
"without RequestAck": {
683689
Async: true,
684690
ForceStopAsyncSend: true,

0 commit comments

Comments
 (0)