Skip to content

Commit 11bef51

Browse files
committed
fluent: check if context is cancelled before performing I/O on the connection
There are a number of places in the code where we do not check if the context is cancelled before reading from or writing to the connection. This commit adds those checks. Signed-off-by: Anirudh Aithal <aithal@amazon.com> cr: https://code.amazon.com/reviews/CR-194284782 Signed-off-by: Anirudh Aithal <aithal@amazon.com>
1 parent f81b4e6 commit 11bef51

File tree

1 file changed

+32
-17
lines changed

1 file changed

+32
-17
lines changed

fluent/fluent.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type Config struct {
8181
RequestAck bool `json:"request_ack"`
8282

8383
// Flag to skip verifying insecure certs on TLS connections
84-
TlsInsecureSkipVerify bool `json: "tls_insecure_skip_verify"`
84+
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
8585
}
8686

8787
type ErrUnknownNetwork struct {
@@ -280,7 +280,7 @@ func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}
280280
var msg *msgToSend
281281
var err error
282282
if msg, err = f.EncodeData(tag, tm, message); err != nil {
283-
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
283+
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%w", message, err)
284284
}
285285
return f.postRawData(msg)
286286
}
@@ -594,31 +594,46 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
594594
return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry)
595595
}
596596

597-
func (f *Fluent) syncWriteMessage(msg *msgToSend) error {
598-
f.muconn.RLock()
599-
defer f.muconn.RUnlock()
597+
func (f *Fluent) syncWriteMessage(ctx context.Context, msg *msgToSend) error {
598+
f.muconn.Lock()
599+
defer f.muconn.Unlock()
600+
601+
// Check if context is cancelled. If it is, we can return early here.
602+
if err := ctx.Err(); err != nil {
603+
return errIsClosing
604+
}
600605

601606
if f.conn == nil {
602-
return fmt.Errorf("connection has been closed before writing to it")
607+
return fmt.Errorf("fluent#write: connection has been closed before writing to it")
603608
}
604609

605610
t := f.Config.WriteTimeout
611+
var err error
606612
if time.Duration(0) < t {
607-
f.conn.SetWriteDeadline(time.Now().Add(t))
613+
err = f.conn.SetWriteDeadline(time.Now().Add(t))
608614
} else {
609-
f.conn.SetWriteDeadline(time.Time{})
615+
err = f.conn.SetWriteDeadline(time.Time{})
610616
}
611617

612-
_, err := f.conn.Write(msg.data)
618+
if err != nil {
619+
return fmt.Errorf("fluent#write: failed to set write deadline: %w", err)
620+
}
621+
_, err = f.conn.Write(msg.data)
613622
return err
614623
}
615624

616-
func (f *Fluent) syncReadAck() (*AckResp, error) {
617-
f.muconn.RLock()
618-
defer f.muconn.RUnlock()
625+
func (f *Fluent) syncReadAck(ctx context.Context) (*AckResp, error) {
626+
f.muconn.Lock()
627+
defer f.muconn.Unlock()
619628

620629
resp := &AckResp{}
621630
var err error
631+
632+
// Check if context is cancelled. If it is, we can return early here.
633+
if err := ctx.Err(); err != nil {
634+
return resp, errIsClosing
635+
}
636+
622637
if f.Config.MarshalAsJSON {
623638
dec := json.NewDecoder(f.conn)
624639
err = dec.Decode(resp)
@@ -639,19 +654,19 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
639654
if err := f.syncConnectWithRetry(ctx); err != nil {
640655
// Here, we don't want to retry the write since connectWithRetry already
641656
// retries Config.MaxRetry times to connect.
642-
return false, fmt.Errorf("fluent#write: %v", err)
657+
return false, fmt.Errorf("fluent#write: %w", err)
643658
}
644659

645-
if err := f.syncWriteMessage(msg); err != nil {
660+
if err := f.syncWriteMessage(ctx, msg); err != nil {
646661
f.syncClose(false)
647-
return true, fmt.Errorf("fluent#write: %v", err)
662+
return true, fmt.Errorf("fluent#write: %w", err)
648663
}
649664

650665
// Acknowledgment check
651666
if msg.ack != "" {
652-
resp, err := f.syncReadAck()
667+
resp, err := f.syncReadAck(ctx)
653668
if err != nil {
654-
fmt.Fprintf(os.Stderr, "fluent#write: error reading message response ack %v", err)
669+
fmt.Fprintf(os.Stderr, "fluent#write: error reading message response ack %v. Closing connection...", err)
655670
f.syncClose(false)
656671
return true, err
657672
}

0 commit comments

Comments
 (0)