Skip to content

Commit d0912d4

Browse files
committed
quic: add pipe type
Streams (including CRYPTO streams) are an ordered byte sequence. Both outgoing and incoming streams require random access to a portion of that sequence. Outbound packets may be lost, requiring us to resend the data in the lost packet. Inbound packets may arrive out of order. Add a "pipe" type as a building block for both inbound and outbound streams. A pipe is a window into a portion of a stream, permitting random read and write access within that window (unlike bufio.Reader or bufio.Writer). Pipes are implemented as a linked list of blocks. Block sizes are uniform and allocations are pooled, avoiding non-pool allocations in the steady state. Pipe memory consumption is proportional to the current window, and goes to zero when the window has been fully consumed (unlike bytes.Buffer). For golang/go#58547 Change-Id: I0c16707552c9c46f31055daea2396590a924fc60 Reviewed-on: https://go-review.googlesource.com/c/net/+/510615 Run-TryBot: Damien Neil <dneil@google.com> Reviewed-by: Jonathan Amsterdam <jba@google.com> TryBot-Result: Gopher Robot <gobot@golang.org>
1 parent 8db2ead commit d0912d4

File tree

2 files changed

+244
-0
lines changed

2 files changed

+244
-0
lines changed

internal/quic/pipe.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.21
6+
7+
package quic
8+
9+
import (
10+
"sync"
11+
)
12+
13+
// A pipe is a byte buffer used in implementing streams.
14+
//
15+
// A pipe contains a window of stream data.
16+
// Random access reads and writes are supported within the window.
17+
// Writing past the end of the window extends it.
18+
// Data may be discarded from the start of the pipe, advancing the window.
19+
type pipe struct {
20+
start int64
21+
end int64
22+
head *pipebuf
23+
tail *pipebuf
24+
}
25+
26+
type pipebuf struct {
27+
off int64
28+
b []byte
29+
next *pipebuf
30+
}
31+
32+
func (pb *pipebuf) end() int64 {
33+
return pb.off + int64(len(pb.b))
34+
}
35+
36+
var pipebufPool = sync.Pool{
37+
New: func() any {
38+
return &pipebuf{
39+
b: make([]byte, 4096),
40+
}
41+
},
42+
}
43+
44+
func newPipebuf() *pipebuf {
45+
return pipebufPool.Get().(*pipebuf)
46+
}
47+
48+
func (b *pipebuf) recycle() {
49+
b.off = 0
50+
b.next = nil
51+
pipebufPool.Put(b)
52+
}
53+
54+
// writeAt writes len(b) bytes to the pipe at offset off.
55+
//
56+
// Writes to offsets before p.start are discarded.
57+
// Writes to offsets after p.end extend the pipe window.
58+
func (p *pipe) writeAt(b []byte, off int64) {
59+
end := off + int64(len(b))
60+
if end > p.end {
61+
p.end = end
62+
} else if end <= p.start {
63+
return
64+
}
65+
66+
if off < p.start {
67+
// Discard the portion of b which falls before p.start.
68+
trim := p.start - off
69+
b = b[trim:]
70+
off = p.start
71+
}
72+
73+
if p.head == nil {
74+
p.head = newPipebuf()
75+
p.head.off = p.start
76+
p.tail = p.head
77+
}
78+
pb := p.head
79+
if off >= p.tail.off {
80+
// Common case: Writing past the end of the pipe.
81+
pb = p.tail
82+
}
83+
for {
84+
pboff := off - pb.off
85+
if pboff < int64(len(pb.b)) {
86+
n := copy(pb.b[pboff:], b)
87+
if n == len(b) {
88+
return
89+
}
90+
off += int64(n)
91+
b = b[n:]
92+
}
93+
if pb.next == nil {
94+
pb.next = newPipebuf()
95+
pb.next.off = pb.off + int64(len(pb.b))
96+
p.tail = pb.next
97+
}
98+
pb = pb.next
99+
}
100+
}
101+
102+
// copy copies len(b) bytes into b starting from off.
103+
// The pipe must contain [off, off+len(b)).
104+
func (p *pipe) copy(off int64, b []byte) {
105+
dst := b[:0]
106+
p.read(off, len(b), func(c []byte) error {
107+
dst = append(dst, c...)
108+
return nil
109+
})
110+
}
111+
112+
// read calls f with the data in [off, off+n)
113+
// The data may be provided sequentially across multiple calls to f.
114+
func (p *pipe) read(off int64, n int, f func([]byte) error) error {
115+
if off < p.start {
116+
panic("invalid read range")
117+
}
118+
for pb := p.head; pb != nil && n > 0; pb = pb.next {
119+
if off >= pb.end() {
120+
continue
121+
}
122+
b := pb.b[off-pb.off:]
123+
if len(b) > n {
124+
b = b[:n]
125+
}
126+
off += int64(len(b))
127+
n -= len(b)
128+
if err := f(b); err != nil {
129+
return err
130+
}
131+
}
132+
if n > 0 {
133+
panic("invalid read range")
134+
}
135+
return nil
136+
}
137+
138+
// discardBefore discards all data prior to off.
139+
func (p *pipe) discardBefore(off int64) {
140+
for p.head != nil && p.head.end() < off {
141+
head := p.head
142+
p.head = p.head.next
143+
head.recycle()
144+
}
145+
if p.head == nil {
146+
p.tail = nil
147+
}
148+
p.start = off
149+
}

internal/quic/pipe_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.21
6+
7+
package quic
8+
9+
import (
10+
"bytes"
11+
"math/rand"
12+
"testing"
13+
)
14+
15+
func TestPipeWrites(t *testing.T) {
16+
type writeOp struct {
17+
start, end int64
18+
}
19+
type discardBeforeOp struct {
20+
off int64
21+
}
22+
type op any
23+
src := make([]byte, 65536)
24+
rand.New(rand.NewSource(0)).Read(src)
25+
for _, test := range []struct {
26+
desc string
27+
ops []op
28+
}{{
29+
desc: "sequential writes",
30+
ops: []op{
31+
writeOp{0, 1024},
32+
writeOp{1024, 4096},
33+
writeOp{4096, 65536},
34+
},
35+
}, {
36+
desc: "disordered overlapping writes",
37+
ops: []op{
38+
writeOp{2000, 8000},
39+
writeOp{0, 3000},
40+
writeOp{7000, 12000},
41+
},
42+
}, {
43+
desc: "write to discarded region",
44+
ops: []op{
45+
writeOp{0, 65536},
46+
discardBeforeOp{32768},
47+
writeOp{0, 1000},
48+
writeOp{3000, 5000},
49+
writeOp{0, 32768},
50+
},
51+
}, {
52+
desc: "write overlaps discarded region",
53+
ops: []op{
54+
discardBeforeOp{10000},
55+
writeOp{0, 20000},
56+
},
57+
}, {
58+
desc: "discard everything",
59+
ops: []op{
60+
writeOp{0, 10000},
61+
discardBeforeOp{10000},
62+
writeOp{10000, 20000},
63+
},
64+
}} {
65+
var p pipe
66+
var wantset rangeset[int64]
67+
var wantStart, wantEnd int64
68+
for i, o := range test.ops {
69+
switch o := o.(type) {
70+
case writeOp:
71+
p.writeAt(src[o.start:o.end], o.start)
72+
wantset.add(o.start, o.end)
73+
wantset.sub(0, wantStart)
74+
if o.end > wantEnd {
75+
wantEnd = o.end
76+
}
77+
case discardBeforeOp:
78+
p.discardBefore(o.off)
79+
wantset.sub(0, o.off)
80+
wantStart = o.off
81+
}
82+
if p.start != wantStart || p.end != wantEnd {
83+
t.Errorf("%v: after %#v p contains [%v,%v), want [%v,%v)", test.desc, test.ops[:i+1], p.start, p.end, wantStart, wantEnd)
84+
}
85+
for _, r := range wantset {
86+
want := src[r.start:][:r.size()]
87+
got := make([]byte, r.size())
88+
p.copy(r.start, got)
89+
if !bytes.Equal(got, want) {
90+
t.Errorf("%v after %#v, mismatch in data in %v", test.desc, test.ops[:i+1], r)
91+
}
92+
}
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)