Skip to content

Commit 1b3a847

Browse files
committed
Expose some of Connection methods directly on the Pool object.
Specifically Pool now implements the following methods: * Pool.execute() * Pool.executemany() * Pool.fetch() * Pool.fetchrow() * Pool.fetchval() Fixes: #39.
1 parent 424760d commit 1b3a847

File tree

2 files changed

+141
-1
lines changed

2 files changed

+141
-1
lines changed

asyncpg/pool.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,67 @@ async def _async__init__(self):
306306
self._initialized = True
307307
return self
308308

309+
async def execute(self, query: str, *args, timeout: float=None) -> str:
310+
"""Execute an SQL command (or commands).
311+
312+
Pool performs this operation using one of its connections. Other than
313+
that, it behaves identically to
314+
:meth:`Connection.execute() <connection.Connection.execute>`.
315+
316+
.. versionadded:: 0.10.0
317+
"""
318+
async with self.acquire() as con:
319+
return await con.execute(query, *args, timeout=timeout)
320+
321+
async def executemany(self, command: str, args, *, timeout: float=None):
322+
"""Execute an SQL *command* for each sequence of arguments in *args*.
323+
324+
Pool performs this operation using one of its connections. Other than
325+
that, it behaves identically to
326+
:meth:`Connection.executemany() <connection.Connection.executemany>`.
327+
328+
.. versionadded:: 0.10.0
329+
"""
330+
async with self.acquire() as con:
331+
return await con.executemany(command, args, timeout=timeout)
332+
333+
async def fetch(self, query, *args, timeout=None) -> list:
334+
"""Run a query and return the results as a list of :class:`Record`.
335+
336+
Pool performs this operation using one of its connections. Other than
337+
that, it behaves identically to
338+
:meth:`Connection.fetch() <connection.Connection.fetch>`.
339+
340+
.. versionadded:: 0.10.0
341+
"""
342+
async with self.acquire() as con:
343+
return await con.fetch(query, *args, timeout=timeout)
344+
345+
async def fetchval(self, query, *args, column=0, timeout=None):
346+
"""Run a query and return a value in the first row.
347+
348+
Pool performs this operation using one of its connections. Other than
349+
that, it behaves identically to
350+
:meth:`Connection.fetchval() <connection.Connection.fetchval>`.
351+
352+
.. versionadded:: 0.10.0
353+
"""
354+
async with self.acquire() as con:
355+
return await con.fetchval(
356+
query, *args, column=column, timeout=timeout)
357+
358+
async def fetchrow(self, query, *args, timeout=None):
359+
"""Run a query and return the first row.
360+
361+
Pool performs this operation using one of its connections. Other than
362+
that, it behaves identically to
363+
:meth:`Connection.fetchrow() <connection.Connection.fetchrow>`.
364+
365+
.. versionadded:: 0.10.0
366+
"""
367+
async with self.acquire() as con:
368+
return await con.fetchrow(query, *args, timeout=timeout)
369+
309370
def acquire(self, *, timeout=None):
310371
"""Acquire a database connection from the pool.
311372

tests/test_pool.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
import asyncio
99
import asyncpg
1010
import inspect
11-
import platform
1211
import os
12+
import platform
13+
import random
1314
import unittest
1415

1516
from asyncpg import _testbase as tb
@@ -378,6 +379,84 @@ async def get_xact_id(con):
378379
id3 = await get_xact_id(con)
379380
self.assertNotEqual(id2, id3)
380381

382+
async def test_pool_connection_methods(self):
383+
async def test_fetch(pool):
384+
i = random.randint(0, 20)
385+
await asyncio.sleep(random.random() / 100)
386+
r = await pool.fetch('SELECT {}::int'.format(i))
387+
self.assertEqual(r, [(i,)])
388+
return 1
389+
390+
async def test_fetchrow(pool):
391+
i = random.randint(0, 20)
392+
await asyncio.sleep(random.random() / 100)
393+
r = await pool.fetchrow('SELECT {}::int'.format(i))
394+
self.assertEqual(r, (i,))
395+
return 1
396+
397+
async def test_fetchval(pool):
398+
i = random.randint(0, 20)
399+
await asyncio.sleep(random.random() / 100)
400+
r = await pool.fetchval('SELECT {}::int'.format(i))
401+
self.assertEqual(r, i)
402+
return 1
403+
404+
async def test_execute(pool):
405+
await asyncio.sleep(random.random() / 100)
406+
r = await pool.execute('SELECT generate_series(0, 10)')
407+
self.assertEqual(r, 'SELECT {}'.format(11))
408+
return 1
409+
410+
async def test_execute_with_arg(pool):
411+
i = random.randint(0, 20)
412+
await asyncio.sleep(random.random() / 100)
413+
r = await pool.execute('SELECT generate_series(0, $1)', i)
414+
self.assertEqual(r, 'SELECT {}'.format(i + 1))
415+
return 1
416+
417+
async def run(N, meth):
418+
async with self.create_pool(database='postgres',
419+
min_size=5, max_size=10) as pool:
420+
421+
coros = [meth(pool) for _ in range(N)]
422+
res = await asyncio.gather(*coros, loop=self.loop)
423+
self.assertEqual(res, [1] * N)
424+
425+
methods = [test_fetch, test_fetchrow, test_fetchval,
426+
test_execute, test_execute_with_arg]
427+
428+
for method in methods:
429+
with self.subTest(method=method.__name__):
430+
await run(200, method)
431+
432+
async def test_pool_connection_execute_many(self):
433+
async def worker(pool):
434+
await asyncio.sleep(random.random() / 100)
435+
await pool.executemany('''
436+
INSERT INTO exmany VALUES($1, $2)
437+
''', [
438+
('a', 1), ('b', 2), ('c', 3), ('d', 4)
439+
])
440+
return 1
441+
442+
N = 200
443+
444+
async with self.create_pool(database='postgres',
445+
min_size=5, max_size=10) as pool:
446+
447+
await pool.execute('CREATE TABLE exmany (a text, b int)')
448+
try:
449+
450+
coros = [worker(pool) for _ in range(N)]
451+
res = await asyncio.gather(*coros, loop=self.loop)
452+
self.assertEqual(res, [1] * N)
453+
454+
n_rows = await pool.fetchval('SELECT count(*) FROM exmany')
455+
self.assertEqual(n_rows, N * 4)
456+
457+
finally:
458+
await pool.execute('DROP TABLE exmany')
459+
381460

382461
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
383462
class TestHostStandby(tb.ConnectedTestCase):

0 commit comments

Comments
 (0)