Skip to content

Commit bf84c17

Browse files
committed
api: add ConnectorAdapter to connection_pool
ConnectorAdapter allows to use ConnectionPool as Connector. All requests to a pool will be executed in a specified mode. Part of #176
1 parent 17fb94d commit bf84c17

File tree

6 files changed

+1586
-0
lines changed

6 files changed

+1586
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1414
- ConnectionHandler interface for handling changes of connections in
1515
ConnectionPool (#178)
1616
- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
17+
- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
1718

1819
### Changed
1920

connection_pool/connection_pool.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ type ConnectionPool struct {
9999
poolsMutex sync.RWMutex
100100
}
101101

102+
var _ Pooler = (*ConnectionPool)(nil)
103+
102104
type connState struct {
103105
addr string
104106
notify chan tarantool.ConnEvent

connection_pool/connector.go

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
package connection_pool
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"time"
7+
8+
"github.com/tarantool/go-tarantool"
9+
)
10+
11+
// ConnectorAdapter allows to use Pooler as Connector.
12+
type ConnectorAdapter struct {
13+
pool Pooler
14+
mode Mode
15+
}
16+
17+
var _ tarantool.Connector = (*ConnectorAdapter)(nil)
18+
19+
// NewConnectorAdapter creates a new ConnectorAdapter object for a pool
20+
// and with a mode. All requests to the pool will be executed in the
21+
// specified mode.
22+
func NewConnectorAdapter(pool Pooler, mode Mode) *ConnectorAdapter {
23+
return &ConnectorAdapter{pool: pool, mode: mode}
24+
}
25+
26+
// ConnectedNow reports if connections is established at the moment.
27+
func (c *ConnectorAdapter) ConnectedNow() bool {
28+
ret, err := c.pool.ConnectedNow(c.mode)
29+
if err != nil {
30+
return false
31+
}
32+
return ret
33+
}
34+
35+
// ClosedNow reports if the connector is closed by user or all connections
36+
// in the specified mode closed.
37+
func (c *ConnectorAdapter) Close() error {
38+
errs := c.pool.Close()
39+
if len(errs) == 0 {
40+
return nil
41+
}
42+
43+
err := errors.New("failed to close connection pool")
44+
for _, e := range errs {
45+
err = fmt.Errorf("%s: %w", err.Error(), e)
46+
}
47+
return err
48+
}
49+
50+
// Ping sends empty request to Tarantool to check connection.
51+
func (c *ConnectorAdapter) Ping() (*tarantool.Response, error) {
52+
return c.pool.Ping(c.mode)
53+
}
54+
55+
// ConfiguredTimeout returns a timeout from connections config.
56+
func (c *ConnectorAdapter) ConfiguredTimeout() time.Duration {
57+
ret, err := c.pool.ConfiguredTimeout(c.mode)
58+
if err != nil {
59+
return 0 * time.Second
60+
}
61+
return ret
62+
}
63+
64+
// Select performs select to box space.
65+
func (c *ConnectorAdapter) Select(space, index interface{},
66+
offset, limit, iterator uint32,
67+
key interface{}) (*tarantool.Response, error) {
68+
return c.pool.Select(space, index, offset, limit, iterator, key, c.mode)
69+
}
70+
71+
// Insert performs insertion to box space.
72+
func (c *ConnectorAdapter) Insert(space interface{},
73+
tuple interface{}) (*tarantool.Response, error) {
74+
return c.pool.Insert(space, tuple, c.mode)
75+
}
76+
77+
// Replace performs "insert or replace" action to box space.
78+
func (c *ConnectorAdapter) Replace(space interface{},
79+
tuple interface{}) (*tarantool.Response, error) {
80+
return c.pool.Replace(space, tuple, c.mode)
81+
}
82+
83+
// Delete performs deletion of a tuple by key.
84+
func (c *ConnectorAdapter) Delete(space, index interface{},
85+
key interface{}) (*tarantool.Response, error) {
86+
return c.pool.Delete(space, index, key, c.mode)
87+
}
88+
89+
// Update performs update of a tuple by key.
90+
func (c *ConnectorAdapter) Update(space, index interface{},
91+
key, ops interface{}) (*tarantool.Response, error) {
92+
return c.pool.Update(space, index, key, ops, c.mode)
93+
}
94+
95+
// Upsert performs "update or insert" action of a tuple by key.
96+
func (c *ConnectorAdapter) Upsert(space interface{},
97+
tuple, ops interface{}) (*tarantool.Response, error) {
98+
return c.pool.Upsert(space, tuple, ops, c.mode)
99+
}
100+
101+
// Call calls registered Tarantool function.
102+
// It uses request code for Tarantool >= 1.7 if go-tarantool
103+
// was build with go_tarantool_call_17 tag.
104+
// Otherwise, uses request code for Tarantool 1.6.
105+
func (c *ConnectorAdapter) Call(functionName string,
106+
args interface{}) (*tarantool.Response, error) {
107+
return c.pool.Call(functionName, args, c.mode)
108+
}
109+
110+
// Call16 calls registered Tarantool function.
111+
// It uses request code for Tarantool 1.6, so result is converted to array of arrays
112+
// Deprecated since Tarantool 1.7.2.
113+
func (c *ConnectorAdapter) Call16(functionName string,
114+
args interface{}) (*tarantool.Response, error) {
115+
return c.pool.Call16(functionName, args, c.mode)
116+
}
117+
118+
// Call17 calls registered Tarantool function.
119+
// It uses request code for Tarantool >= 1.7, so result is not converted
120+
// (though, keep in mind, result is always array)
121+
func (c *ConnectorAdapter) Call17(functionName string,
122+
args interface{}) (*tarantool.Response, error) {
123+
return c.pool.Call17(functionName, args, c.mode)
124+
}
125+
126+
// Eval passes Lua expression for evaluation.
127+
func (c *ConnectorAdapter) Eval(expr string,
128+
args interface{}) (*tarantool.Response, error) {
129+
return c.pool.Eval(expr, args, c.mode)
130+
}
131+
132+
// Execute passes sql expression to Tarantool for execution.
133+
func (c *ConnectorAdapter) Execute(expr string,
134+
args interface{}) (*tarantool.Response, error) {
135+
return c.pool.Execute(expr, args, c.mode)
136+
}
137+
138+
// GetTyped performs select (with limit = 1 and offset = 0)
139+
// to box space and fills typed result.
140+
func (c *ConnectorAdapter) GetTyped(space, index interface{},
141+
key interface{}, result interface{}) error {
142+
return c.pool.GetTyped(space, index, key, result, c.mode)
143+
}
144+
145+
// SelectTyped performs select to box space and fills typed result.
146+
func (c *ConnectorAdapter) SelectTyped(space, index interface{},
147+
offset, limit, iterator uint32,
148+
key interface{}, result interface{}) error {
149+
return c.pool.SelectTyped(space, index, offset, limit, iterator, key, result, c.mode)
150+
}
151+
152+
// InsertTyped performs insertion to box space.
153+
func (c *ConnectorAdapter) InsertTyped(space interface{},
154+
tuple interface{}, result interface{}) error {
155+
return c.pool.InsertTyped(space, tuple, result, c.mode)
156+
}
157+
158+
// ReplaceTyped performs "insert or replace" action to box space.
159+
func (c *ConnectorAdapter) ReplaceTyped(space interface{},
160+
tuple interface{}, result interface{}) error {
161+
return c.pool.ReplaceTyped(space, tuple, result, c.mode)
162+
}
163+
164+
// DeleteTyped performs deletion of a tuple by key and fills result with deleted tuple.
165+
func (c *ConnectorAdapter) DeleteTyped(space, index interface{},
166+
key interface{}, result interface{}) error {
167+
return c.pool.DeleteTyped(space, index, key, result, c.mode)
168+
}
169+
170+
// UpdateTyped performs update of a tuple by key and fills result with updated tuple.
171+
func (c *ConnectorAdapter) UpdateTyped(space, index interface{},
172+
key, ops interface{}, result interface{}) error {
173+
return c.pool.UpdateTyped(space, index, key, ops, result, c.mode)
174+
}
175+
176+
// CallTyped calls registered function.
177+
// It uses request code for Tarantool >= 1.7 if go-tarantool
178+
// was build with go_tarantool_call_17 tag.
179+
// Otherwise, uses request code for Tarantool 1.6.
180+
func (c *ConnectorAdapter) CallTyped(functionName string,
181+
args interface{}, result interface{}) error {
182+
return c.pool.CallTyped(functionName, args, result, c.mode)
183+
}
184+
185+
// Call16Typed calls registered function.
186+
// It uses request code for Tarantool 1.6, so result is converted to array of arrays
187+
// Deprecated since Tarantool 1.7.2.
188+
func (c *ConnectorAdapter) Call16Typed(functionName string,
189+
args interface{}, result interface{}) error {
190+
return c.pool.Call16Typed(functionName, args, result, c.mode)
191+
}
192+
193+
// Call17Typed calls registered function.
194+
// It uses request code for Tarantool >= 1.7, so result is not converted
195+
// (though, keep in mind, result is always array)
196+
func (c *ConnectorAdapter) Call17Typed(functionName string,
197+
args interface{}, result interface{}) error {
198+
return c.pool.Call17Typed(functionName, args, result, c.mode)
199+
}
200+
201+
// EvalTyped passes Lua expression for evaluation.
202+
func (c *ConnectorAdapter) EvalTyped(expr string, args interface{},
203+
result interface{}) error {
204+
return c.pool.EvalTyped(expr, args, result, c.mode)
205+
}
206+
207+
// ExecuteTyped passes sql expression to Tarantool for execution.
208+
func (c *ConnectorAdapter) ExecuteTyped(expr string, args interface{},
209+
result interface{}) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) {
210+
return c.pool.ExecuteTyped(expr, args, result, c.mode)
211+
}
212+
213+
// SelectAsync sends select request to Tarantool and returns Future.
214+
func (c *ConnectorAdapter) SelectAsync(space, index interface{},
215+
offset, limit, iterator uint32, key interface{}) *tarantool.Future {
216+
return c.pool.SelectAsync(space, index, offset, limit, iterator, key, c.mode)
217+
}
218+
219+
// InsertAsync sends insert action to Tarantool and returns Future.
220+
func (c *ConnectorAdapter) InsertAsync(space interface{},
221+
tuple interface{}) *tarantool.Future {
222+
return c.pool.InsertAsync(space, tuple, c.mode)
223+
}
224+
225+
// ReplaceAsync sends "insert or replace" action to Tarantool and returns Future.
226+
func (c *ConnectorAdapter) ReplaceAsync(space interface{},
227+
tuple interface{}) *tarantool.Future {
228+
return c.pool.ReplaceAsync(space, tuple, c.mode)
229+
}
230+
231+
// DeleteAsync sends deletion action to Tarantool and returns Future.
232+
func (c *ConnectorAdapter) DeleteAsync(space, index interface{},
233+
key interface{}) *tarantool.Future {
234+
return c.pool.DeleteAsync(space, index, key, c.mode)
235+
}
236+
237+
// Update sends deletion of a tuple by key and returns Future.
238+
func (c *ConnectorAdapter) UpdateAsync(space, index interface{},
239+
key, ops interface{}) *tarantool.Future {
240+
return c.pool.UpdateAsync(space, index, key, ops, c.mode)
241+
}
242+
243+
// UpsertAsync sends "update or insert" action to Tarantool and returns Future.
244+
func (c *ConnectorAdapter) UpsertAsync(space interface{}, tuple interface{},
245+
ops interface{}) *tarantool.Future {
246+
return c.pool.UpsertAsync(space, tuple, ops, c.mode)
247+
}
248+
249+
// CallAsync sends a call to registered Tarantool function and returns Future.
250+
// It uses request code for Tarantool >= 1.7 if go-tarantool
251+
// was build with go_tarantool_call_17 tag.
252+
// Otherwise, uses request code for Tarantool 1.6.
253+
func (c *ConnectorAdapter) CallAsync(functionName string,
254+
args interface{}) *tarantool.Future {
255+
return c.pool.CallAsync(functionName, args, c.mode)
256+
}
257+
258+
// Call16Async sends a call to registered Tarantool function and returns Future.
259+
// It uses request code for Tarantool 1.6, so future's result is always array of arrays.
260+
// Deprecated since Tarantool 1.7.2.
261+
func (c *ConnectorAdapter) Call16Async(functionName string,
262+
args interface{}) *tarantool.Future {
263+
return c.pool.Call16Async(functionName, args, c.mode)
264+
}
265+
266+
// Call17Async sends a call to registered Tarantool function and returns Future.
267+
// It uses request code for Tarantool >= 1.7, so future's result will not be converted
268+
// (though, keep in mind, result is always array)
269+
func (c *ConnectorAdapter) Call17Async(functionName string,
270+
args interface{}) *tarantool.Future {
271+
return c.pool.Call17Async(functionName, args, c.mode)
272+
}
273+
274+
// EvalAsync sends a Lua expression for evaluation and returns Future.
275+
func (c *ConnectorAdapter) EvalAsync(expr string,
276+
args interface{}) *tarantool.Future {
277+
return c.pool.EvalAsync(expr, args, c.mode)
278+
}
279+
280+
// ExecuteAsync sends a sql expression for execution and returns Future.
281+
func (c *ConnectorAdapter) ExecuteAsync(expr string,
282+
args interface{}) *tarantool.Future {
283+
return c.pool.ExecuteAsync(expr, args, c.mode)
284+
}
285+
286+
// NewPrepared passes a sql statement to Tarantool for preparation
287+
// synchronously.
288+
func (c *ConnectorAdapter) NewPrepared(expr string) (*tarantool.Prepared, error) {
289+
return c.pool.NewPrepared(expr, c.mode)
290+
}
291+
292+
// NewStream creates new Stream object for connection.
293+
//
294+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over
295+
// them. To use interactive transactions, memtx_use_mvcc_engine box option
296+
// should be set to true.
297+
// Since 1.7.0
298+
func (c *ConnectorAdapter) NewStream() (*tarantool.Stream, error) {
299+
return c.pool.NewStream(c.mode)
300+
}
301+
302+
// Do performs a request asynchronously on the connection.
303+
func (c *ConnectorAdapter) Do(req tarantool.Request) *tarantool.Future {
304+
return c.pool.Do(req, c.mode)
305+
}

0 commit comments

Comments
 (0)