Skip to content

Commit 20037b0

Browse files
committed
io.Reader in CreateResponse
1 parent 0a92aa3 commit 20037b0

File tree

12 files changed

+223
-90
lines changed

12 files changed

+223
-90
lines changed

connection.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -832,15 +832,11 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
832832
continue
833833
} else if header.Code == PushCode {
834834
if fut = conn.peekFuture(header.RequestId); fut != nil {
835-
resp := &PushResponse{BaseResponse{header: header}}
836-
resp.SetBuf(buf)
837-
fut.AppendPush(resp)
835+
fut.AppendPush(header, &buf)
838836
}
839837
} else {
840838
if fut = conn.fetchFuture(header.RequestId); fut != nil {
841-
resp := fut.req.CreateResponse(header)
842-
resp.SetBuf(buf)
843-
fut.SetResponse(resp)
839+
fut.SetResponse(header, &buf)
844840
conn.markDone(fut)
845841
}
846842
}
@@ -1058,11 +1054,11 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10581054

10591055
if req.Async() {
10601056
if fut = conn.fetchFuture(reqid); fut != nil {
1061-
resp := req.CreateResponse(Header{
1057+
header := Header{
10621058
RequestId: reqid,
10631059
Code: OkCode,
1064-
})
1065-
fut.SetResponse(resp)
1060+
}
1061+
fut.SetResponse(header, nil)
10661062
conn.markDone(fut)
10671063
}
10681064
}

crud/common.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ package crud
5555

5656
import (
5757
"context"
58+
"io"
5859

5960
"github.com/tarantool/go-iproto"
6061

@@ -85,8 +86,9 @@ func (req baseRequest) Async() bool {
8586
}
8687

8788
// CreateResponse creates a response for the baseRequest.
88-
func (req baseRequest) CreateResponse(header tarantool.Header) tarantool.Response {
89-
return req.impl.CreateResponse(header)
89+
func (req baseRequest) CreateResponse(header tarantool.Header,
90+
body io.Reader) (tarantool.Response, error) {
91+
return req.impl.CreateResponse(header, body)
9092
}
9193

9294
type spaceRequest struct {

crud/select.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package crud
22

33
import (
44
"context"
5+
"io"
56

67
"github.com/vmihailenco/msgpack/v5"
78

@@ -135,6 +136,7 @@ func (req SelectRequest) Context(ctx context.Context) SelectRequest {
135136
}
136137

137138
// CreateResponse creates a response for the SelectRequest.
138-
func (req SelectRequest) CreateResponse(header tarantool.Header) tarantool.Response {
139-
return req.impl.CreateResponse(header)
139+
func (req SelectRequest) CreateResponse(header tarantool.Header,
140+
body io.Reader) (tarantool.Response, error) {
141+
return req.impl.CreateResponse(header, body)
140142
}

future.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tarantool
22

33
import (
4+
"io"
45
"sync"
56
"time"
67
)
@@ -139,16 +140,21 @@ func NewFuture() (fut *Future) {
139140
//
140141
// Deprecated: the method will be removed in the next major version,
141142
// use Connector.NewWatcher() instead of box.session.push().
142-
func (fut *Future) AppendPush(resp Response) {
143+
func (fut *Future) AppendPush(header Header, body io.Reader) error {
143144
fut.mutex.Lock()
144145
defer fut.mutex.Unlock()
145146

146147
if fut.isDone() {
147-
return
148+
return nil
149+
}
150+
resp, err := createPushResponse(header, body)
151+
if err != nil {
152+
return err
148153
}
149154
fut.pushes = append(fut.pushes, resp)
150155

151156
fut.ready <- struct{}{}
157+
return nil
152158
}
153159

154160
// SetRequest sets a request, for which the future was created.
@@ -157,17 +163,23 @@ func (fut *Future) SetRequest(req Request) {
157163
}
158164

159165
// SetResponse sets a response for the future and finishes the future.
160-
func (fut *Future) SetResponse(resp Response) {
166+
func (fut *Future) SetResponse(header Header, body io.Reader) error {
161167
fut.mutex.Lock()
162168
defer fut.mutex.Unlock()
163169

164170
if fut.isDone() {
165-
return
171+
return nil
172+
}
173+
174+
resp, err := fut.req.CreateResponse(header, body)
175+
if err != nil {
176+
return err
166177
}
167178
fut.resp = resp
168179

169180
close(fut.ready)
170181
close(fut.done)
182+
return nil
171183
}
172184

173185
// SetError sets an error for the future and finishes the future.

future_test.go

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/stretchr/testify/assert"
910
. "github.com/tarantool/go-tarantool/v2"
1011
)
1112

@@ -20,9 +21,7 @@ func assertResponseIteratorValue(t testing.TB, it ResponseIterator, resp Respons
2021
t.Errorf("An unexpected nil value")
2122
}
2223

23-
if it.Value() != resp {
24-
t.Errorf("An unexpected response %v, expected %v", it.Value(), resp)
25-
}
24+
assert.Equalf(t, it.Value(), resp, "An unexpected response %v, expected %v", it.Value(), resp)
2625
}
2726

2827
func assertResponseIteratorFinished(t testing.TB, it ResponseIterator) {
@@ -48,9 +47,12 @@ func TestFutureGetIteratorNoItems(t *testing.T) {
4847
}
4948

5049
func TestFutureGetIteratorNoResponse(t *testing.T) {
51-
push := &BaseResponse{}
50+
pushHeader := Header{}
51+
push := &PushResponse{}
5252
fut := NewFuture()
53-
fut.AppendPush(push)
53+
fut.AppendPush(pushHeader, nil)
54+
55+
push.Decode()
5456

5557
if it := fut.GetIterator(); it.Next() {
5658
assertResponseIteratorValue(t, it, push)
@@ -64,9 +66,12 @@ func TestFutureGetIteratorNoResponse(t *testing.T) {
6466
}
6567

6668
func TestFutureGetIteratorNoResponseTimeout(t *testing.T) {
67-
push := &BaseResponse{}
69+
pushHeader := Header{}
70+
push := &PushResponse{}
6871
fut := NewFuture()
69-
fut.AppendPush(push)
72+
fut.AppendPush(pushHeader, nil)
73+
74+
push.Decode()
7075

7176
if it := fut.GetIterator().WithTimeout(1 * time.Nanosecond); it.Next() {
7277
assertResponseIteratorValue(t, it, push)
@@ -80,10 +85,15 @@ func TestFutureGetIteratorNoResponseTimeout(t *testing.T) {
8085
}
8186

8287
func TestFutureGetIteratorResponseOnTimeout(t *testing.T) {
83-
push := &BaseResponse{}
88+
pushHeader := Header{}
89+
respHeader := Header{}
90+
push := &PushResponse{}
8491
resp := &BaseResponse{}
8592
fut := NewFuture()
86-
fut.AppendPush(push)
93+
fut.AppendPush(pushHeader, nil)
94+
95+
push.Decode()
96+
resp.Decode()
8797

8898
var done sync.WaitGroup
8999
var wait sync.WaitGroup
@@ -96,7 +106,8 @@ func TestFutureGetIteratorResponseOnTimeout(t *testing.T) {
96106
var it ResponseIterator
97107
var cnt = 0
98108
for it = fut.GetIterator().WithTimeout(5 * time.Second); it.Next(); {
99-
r := push
109+
var r Response
110+
r = push
100111
if cnt == 1 {
101112
r = resp
102113
}
@@ -114,19 +125,23 @@ func TestFutureGetIteratorResponseOnTimeout(t *testing.T) {
114125
}()
115126

116127
wait.Wait()
117-
fut.SetResponse(resp)
128+
129+
fut.SetRequest(&InsertRequest{})
130+
fut.SetResponse(respHeader, nil)
118131
done.Wait()
119132
}
120133

121134
func TestFutureGetIteratorFirstResponse(t *testing.T) {
122-
resp1 := &BaseResponse{}
123-
resp2 := &BaseResponse{}
135+
resp := &BaseResponse{}
124136
fut := NewFuture()
125-
fut.SetResponse(resp1)
126-
fut.SetResponse(resp2)
137+
fut.SetRequest(&InsertRequest{})
138+
fut.SetResponse(Header{}, nil)
139+
fut.SetResponse(Header{}, nil)
140+
141+
resp.Decode()
127142

128143
if it := fut.GetIterator(); it.Next() {
129-
assertResponseIteratorValue(t, it, resp1)
144+
assertResponseIteratorValue(t, it, resp)
130145
if it.Next() == true {
131146
t.Errorf("An unexpected next value.")
132147
}
@@ -155,17 +170,20 @@ func TestFutureGetIteratorFirstError(t *testing.T) {
155170
}
156171

157172
func TestFutureGetIteratorResponse(t *testing.T) {
158-
responses := []*BaseResponse{
159-
{},
160-
{},
161-
{},
173+
responses := []Response{
174+
&PushResponse{},
175+
&PushResponse{},
176+
&BaseResponse{},
162177
}
178+
header := Header{}
163179
fut := NewFuture()
180+
fut.SetRequest(&InsertRequest{})
164181
for i, resp := range responses {
182+
resp.Decode()
165183
if i == len(responses)-1 {
166-
fut.SetResponse(resp)
184+
fut.SetResponse(header, nil)
167185
} else {
168-
fut.AppendPush(resp)
186+
fut.AppendPush(header, nil)
169187
}
170188
}
171189

@@ -189,14 +207,15 @@ func TestFutureGetIteratorResponse(t *testing.T) {
189207

190208
func TestFutureGetIteratorError(t *testing.T) {
191209
const errMsg = "error message"
192-
responses := []*BaseResponse{
210+
responses := []*PushResponse{
193211
{},
194212
{},
195213
}
196214
err := errors.New(errMsg)
197215
fut := NewFuture()
198216
for _, resp := range responses {
199-
fut.AppendPush(resp)
217+
fut.AppendPush(Header{}, nil)
218+
resp.Decode()
200219
}
201220
fut.SetError(err)
202221

@@ -226,19 +245,18 @@ func TestFutureGetIteratorError(t *testing.T) {
226245

227246
func TestFutureSetStateRaceCondition(t *testing.T) {
228247
err := errors.New("any error")
229-
resp := &BaseResponse{}
230248

231249
for i := 0; i < 1000; i++ {
232250
fut := NewFuture()
251+
fut.SetRequest(&InsertRequest{})
233252
for j := 0; j < 9; j++ {
234253
go func(opt int) {
235254
if opt%3 == 0 {
236-
respAppend := &BaseResponse{}
237-
fut.AppendPush(respAppend)
255+
fut.AppendPush(Header{}, nil)
238256
} else if opt%3 == 1 {
239257
fut.SetError(err)
240258
} else {
241-
fut.SetResponse(resp)
259+
fut.SetResponse(Header{}, nil)
242260
}
243261
}(j)
244262
}

prepared.go

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"context"
55
"fmt"
6+
"io"
67

78
"github.com/tarantool/go-iproto"
89
"github.com/vmihailenco/msgpack/v5"
@@ -96,10 +97,23 @@ func (req *PrepareRequest) Context(ctx context.Context) *PrepareRequest {
9697
}
9798

9899
// CreateResponse creates a response for the PrepareRequest.
99-
func (req *PrepareRequest) CreateResponse(header Header) Response {
100-
resp := PrepareResponse{}
101-
resp.SetHeader(header)
102-
return &resp
100+
func (req *PrepareRequest) CreateResponse(header Header, body io.Reader) (Response, error) {
101+
if body == nil {
102+
baseResp := BaseResponse{header: header}
103+
return &PrepareResponse{BaseResponse: baseResp}, nil
104+
}
105+
if buf, ok := body.(*smallBuf); ok {
106+
baseResp := BaseResponse{header: header, buf: *buf}
107+
return &PrepareResponse{BaseResponse: baseResp}, nil
108+
}
109+
data, err := io.ReadAll(body)
110+
if err != nil {
111+
return nil, err
112+
}
113+
baseResp := BaseResponse{
114+
header: header, buf: smallBuf{b: data},
115+
}
116+
return &PrepareResponse{BaseResponse: baseResp}, nil
103117
}
104118

105119
// UnprepareRequest helps you to create an unprepare request object for
@@ -184,8 +198,21 @@ func (req *ExecutePreparedRequest) Context(ctx context.Context) *ExecutePrepared
184198
}
185199

186200
// CreateResponse creates a response for the ExecutePreparedRequest.
187-
func (req *ExecutePreparedRequest) CreateResponse(header Header) Response {
188-
resp := PrepareResponse{}
189-
resp.SetHeader(header)
190-
return &resp
201+
func (req *ExecutePreparedRequest) CreateResponse(header Header, body io.Reader) (Response, error) {
202+
if body == nil {
203+
baseResp := BaseResponse{header: header}
204+
return &PrepareResponse{BaseResponse: baseResp}, nil
205+
}
206+
if buf, ok := body.(*smallBuf); ok {
207+
baseResp := BaseResponse{header: header, buf: *buf}
208+
return &PrepareResponse{BaseResponse: baseResp}, nil
209+
}
210+
data, err := io.ReadAll(body)
211+
if err != nil {
212+
return nil, err
213+
}
214+
baseResp := BaseResponse{
215+
header: header, buf: smallBuf{b: data},
216+
}
217+
return &PrepareResponse{BaseResponse: baseResp}, nil
191218
}

0 commit comments

Comments
 (0)