Skip to content

Commit 4cfcb5f

Browse files
committed
429 respin
1 parent a8e5eab commit 4cfcb5f

File tree

2 files changed

+54
-6
lines changed

2 files changed

+54
-6
lines changed

reader.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,25 +1487,48 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
14871487
return
14881488
}
14891489

1490-
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
1491-
r.stats.fetches.observe(1)
1492-
r.stats.offset.observe(offset)
1490+
// readBatch wraps the call to conn.ReadBatchWith to make it interruptible.
1491+
// Conn methods are written in a non-interruptible style, so the only way to
1492+
// interrupt them is to close the connection in another goroutine.
1493+
func (r *reader) readBatch(ctx context.Context, conn *Conn) (*Batch, error) {
1494+
done := make(chan struct{})
1495+
defer close(done)
14931496

1494-
t0 := time.Now()
1495-
conn.SetReadDeadline(t0.Add(r.maxWait))
1497+
go func() {
1498+
select {
1499+
case <-ctx.Done():
1500+
conn.Close()
1501+
case <-done:
1502+
return
1503+
}
1504+
}()
14961505

14971506
batch := conn.ReadBatchWith(ReadBatchConfig{
14981507
MinBytes: r.minBytes,
14991508
MaxBytes: r.maxBytes,
15001509
IsolationLevel: r.isolationLevel,
15011510
})
1511+
return batch, ctx.Err()
1512+
}
1513+
1514+
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
1515+
r.stats.fetches.observe(1)
1516+
r.stats.offset.observe(offset)
1517+
1518+
t0 := time.Now()
1519+
conn.SetReadDeadline(t0.Add(r.maxWait))
1520+
1521+
batch, err := r.readBatch(ctx, conn)
1522+
if err != nil {
1523+
return offset, err
1524+
}
1525+
15021526
highWaterMark := batch.HighWaterMark()
15031527

15041528
t1 := time.Now()
15051529
r.stats.waitTime.observeDuration(t1.Sub(t0))
15061530

15071531
var msg Message
1508-
var err error
15091532
var size int64
15101533
var bytes int64
15111534

reader_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1846,6 +1846,31 @@ func TestReaderReadCompactedMessage(t *testing.T) {
18461846
}
18471847
}
18481848

1849+
func TestReaderClose(t *testing.T) {
1850+
t.Parallel()
1851+
1852+
r := NewReader(ReaderConfig{
1853+
Brokers: []string{"localhost:9092"},
1854+
Topic: makeTopic(),
1855+
MaxWait: 2 * time.Second,
1856+
})
1857+
defer r.Close()
1858+
1859+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1860+
defer cancel()
1861+
1862+
_, err := r.FetchMessage(ctx)
1863+
if err != context.DeadlineExceeded {
1864+
t.Errorf("bad err: %v", err)
1865+
}
1866+
1867+
t0 := time.Now()
1868+
r.Close()
1869+
if time.Since(t0) > 100*time.Millisecond {
1870+
t.Errorf("r.Close took too long")
1871+
}
1872+
}
1873+
18491874
// writeMessagesForCompactionCheck writes messages with specific writer configuration.
18501875
func writeMessagesForCompactionCheck(t *testing.T, topic string, msgs []Message) {
18511876
t.Helper()

0 commit comments

Comments
 (0)