diff --git a/get_books.py b/get_books.py
index 48e54ed..3b7d865 100644
--- a/get_books.py
+++ b/get_books.py
@@ -1,18 +1,15 @@
-import argparse, signal, asyncio, aioredis
+import argparse, asyncio
+import redis.asyncio as aioredis
 from services.shelving_service import ShelvingService
 from services.lending_service import LendingService, BOOKS_FOR_SHELVING_STREAM_KEY
 
-# Main event loop
-loop = asyncio.get_event_loop()
-
 # Configuration
 LENDING_REQUESTS_STREAM_KEY = "lending_requests_event_stream"
 BOOK_RETURN_REQUESTS_STREAM_KEY = "book_return_requests_event_stream"
 
 async def main(action, user, books, address, db, password):
-    pool = await aioredis.create_redis_pool(address, db=db, password=password,
-                                            minsize=4, maxsize=10, loop=loop, encoding='utf8')
+    client = aioredis.from_url(address, db=db, password=password, decode_responses=True)
 
     # Choose the target stream based on `action`
     stream_key = None
@@ -25,9 +22,12 @@ async def main(action, user, books, address, db, password):
         exit(1)
 
     # Send the request
-    await pool.xadd(stream_key, {'user_id': user, 'book_ids': ','.join(books)})
+    await client.xadd(stream_key, {'user_id': user, 'book_ids': ','.join(books)})
     print("OK")
 
+    # The asyncio Redis client must be disconnected explicitly, since there is no asyncio destructor magic method.
+    await client.aclose()
+
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='CLI tool to get and return books.')
     parser.add_argument('action', choices=['request', 'return'], type=str,
@@ -44,7 +44,7 @@ async def main(action, user, books, address, db, password):
                         help='redis password')
     args = parser.parse_args()
 
-    loop.run_until_complete(main(action=args.action, user=args.name, books=args.books,
+    asyncio.run(main(action=args.action, user=args.name, books=args.books,
                 address=args.address, db=args.db, password=args.password))
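For reference, the one-shot client lifecycle that get_books.py now uses boils down to the pattern below. This is a minimal sketch rather than project code: the URL and stream name are placeholders, and aclose() assumes a redis-py version recent enough to provide it (older releases spell it close()).

    import asyncio
    import redis.asyncio as aioredis

    async def send_event():
        # from_url() returns a client backed by its own connection pool
        client = aioredis.from_url("redis://localhost", decode_responses=True)
        await client.xadd("some_stream", {"user_id": "u1", "book_ids": "b1,b2"})
        # No async destructor exists, so release the connections explicitly
        await client.aclose()

    asyncio.run(send_event())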
diff --git a/main.py b/main.py
index 5f4fc71..9e67d64 100644
--- a/main.py
+++ b/main.py
@@ -1,10 +1,8 @@
-import argparse, signal, asyncio, aioredis
+import argparse, signal, asyncio
+import redis.asyncio as aioredis
 from services.shelving_service import ShelvingService
 from services.lending_service import LendingService, BOOKS_FOR_SHELVING_STREAM_KEY
 
-# Main event loop
-loop = asyncio.get_event_loop()
-
 # Global list of running services, used by `graceful_shutdown`
 RUNNING_SERVICES = None
 SHUTTING_DOWN = False
@@ -21,23 +19,28 @@ def graceful_shutdown():
     print("\nShutting down (might take up to 10s)...")
 
 async def main(instance_name, force, address, db, password):
-    pool = await aioredis.create_redis_pool(address, db=db, password=password,
-                                            minsize=4, maxsize=10, loop=loop, encoding='utf8')
+    # This is one way to get a client backed by a connection pool. The commented
+    # one-liner below has the same result as the explicit construction that follows it:
+    # client = aioredis.from_url(address, db=db, password=password, decode_responses=True)
+
+    pool = aioredis.ConnectionPool.from_url(address, db=db, password=password, decode_responses=True)
+    client = aioredis.Redis.from_pool(pool)
 
     lock_key = f"instance_lock:{instance_name}"
     if not force:
-        if not await pool.setnx(lock_key, 'locked'):
+        if not await client.setnx(lock_key, 'locked'):
             print("There might be another instance with the same name running.")
             print("Use the -f option to force launching anyway.")
             print("For the service to work correctly, each running instance must have a unique name.")
             exit(1)
 
-    # Setup a singnal handler for graceful shutdown
+    # Set up a signal handler for graceful shutdown
+    loop = asyncio.get_running_loop()
     loop.add_signal_handler(signal.SIGINT, graceful_shutdown)
 
     # Instantiate LendingService and ShelvingService
-    shelving_service = ShelvingService(pool, instance_name, BOOKS_FOR_SHELVING_STREAM_KEY)
-    lending_service = LendingService(pool, instance_name, shelving_service)
+    shelving_service = ShelvingService(client, instance_name, BOOKS_FOR_SHELVING_STREAM_KEY)
+    lending_service = LendingService(client, instance_name, shelving_service)
 
     # Add services to `RUNNING_SERVICES` list to enable graceful shutdown
     global RUNNING_SERVICES
@@ -47,7 +50,12 @@ async def main(instance_name, force, address, db, password):
     await asyncio.gather(lending_service.launch_service(), shelving_service.launch_service())
 
     # Release the instance name lock when shutting down
-    await pool.delete(lock_key)
+    await client.delete(lock_key)
+
+    # The asyncio Redis client must be disconnected explicitly, since there is
+    # no asyncio destructor magic method.
+    print("Closing client")
+    await client.aclose()
 
 
 if __name__ == '__main__':
@@ -64,7 +72,5 @@ async def main(instance_name, force, address, db, password):
                         help='redis password')
     args = parser.parse_args()
 
-    loop.run_until_complete(main(instance_name=args.name, force=args.force,
+    asyncio.run(main(instance_name=args.name, force=args.force,
                 address=args.address, db=args.db, password=args.password))
-
-
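The two construction styles mentioned in the comment above are interchangeable. A sketch of the equivalence, with a placeholder URL (from_pool() assumes redis-py 5.x):

    import redis.asyncio as aioredis

    # Style 1: the client creates and owns its pool internally
    client = aioredis.from_url("redis://localhost", db=0, decode_responses=True)

    # Style 2: build the pool explicitly, then hand ownership to the client;
    # Redis.from_pool() makes client.aclose() also close the pool
    pool = aioredis.ConnectionPool.from_url("redis://localhost", db=0, decode_responses=True)
    client = aioredis.Redis.from_pool(pool)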
self.pool.execute("XGROUP", "CREATE", BOOK_RETURN_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, "$", "MKSTREAM") - except aioredis.errors.ReplyError as e: + await self.client.xgroup_create(name=BOOK_RETURN_REQUESTS_STREAM_KEY, groupname=LENDING_SERVICE_CONSUMER_GROUP, id='$', mkstream=True) + except redis.exceptions.ResponseError as e: assert e.args[0].startswith("BUSYGROUP") # Ensure Redis has the required Lua scripts global REFILL_AUTOMATED_STORAGE_LUA, BOOKS_TO_STREAM_LUA, APPLY_BOOK_RETURN_LUA - REFILL_AUTOMATED_STORAGE_LUA = await self.pool.script_load(open('lua/refill_automated_storage.lua', 'r').read()) - BOOKS_TO_STREAM_LUA = await self.pool.script_load(open('lua/books_to_stream.lua', 'r').read()) - APPLY_BOOK_RETURN_LUA = await self.pool.script_load(open('lua/apply_book_return.lua', 'r').read()) + REFILL_AUTOMATED_STORAGE_LUA = await self.client.script_load(open('lua/refill_automated_storage.lua', 'r').read()) + BOOKS_TO_STREAM_LUA = await self.client.script_load(open('lua/books_to_stream.lua', 'r').read()) + APPLY_BOOK_RETURN_LUA = await self.client.script_load(open('lua/apply_book_return.lua', 'r').read()) # First we retrieve any potential pending message - events = await self.pool.xread_group(LENDING_SERVICE_CONSUMER_GROUP, self.instance_name, - [LENDING_REQUESTS_STREAM_KEY, BOOK_RETURN_REQUESTS_STREAM_KEY], latest_ids=["0", "0"]) - if len(events) > 0: - print("[WARN] Found claimed events that need processing, resuming...") + streams = await self.client.xreadgroup(LENDING_SERVICE_CONSUMER_GROUP, self.instance_name, + {LENDING_REQUESTS_STREAM_KEY: '0', BOOK_RETURN_REQUESTS_STREAM_KEY: '0'}) + + for stream in streams: + events = stream[1] + if len(events) > 0: + print("[WARN] Found claimed events that need processing, resuming...") + break # This is the main loop print("Ready to process events...") - with await self.pool as conn: - while not self.shutting_down: + while not self.shutting_down: + for stream in streams: tasks = [] - for stream_name, event_id, message in events: + stream_name = stream[0] + events = stream[1] + for event_id, message in events: if stream_name == LENDING_REQUESTS_STREAM_KEY: tasks.append(self.process_lending_request(event_id, message)) elif stream_name == BOOK_RETURN_REQUESTS_STREAM_KEY: tasks.append(self.process_returned_books_request(event_id, message)) await asyncio.gather(*tasks) - # Gather new events to process (batch size = 10) - events = await conn.xread_group(LENDING_SERVICE_CONSUMER_GROUP, self.instance_name, - [LENDING_REQUESTS_STREAM_KEY, BOOK_RETURN_REQUESTS_STREAM_KEY], timeout=10000, count=10, latest_ids=[">", ">"]) + # Gather new events to process (batch size = 10) + streams = await self.client.xreadgroup(LENDING_SERVICE_CONSUMER_GROUP, self.instance_name, + {LENDING_REQUESTS_STREAM_KEY: '>', BOOK_RETURN_REQUESTS_STREAM_KEY: '>'}, block=10000, count=10) async def process_lending_request(self, request_id, request): @@ -104,7 +110,7 @@ async def process_lending_request(self, request_id, request): # See if Redis contains already a partial set of reserved books. # This can happen in the case of a crash. In such case, we fetch # the set to resume from where we left off. - books_found = await self.pool.smembers(request_reserved_books_key) + books_found = await self.client.smembers(request_reserved_books_key) if len(books_found) > 0: print("[WARN] Found partially-processed transaction, resuming.") @@ -114,33 +120,33 @@ async def process_lending_request(self, request_id, request): continue # Is the book lent already? 
@@ -104,7 +110,7 @@
         # See if Redis contains already a partial set of reserved books.
         # This can happen in the case of a crash. In such case, we fetch
         # the set to resume from where we left off.
-        books_found = await self.pool.smembers(request_reserved_books_key)
+        books_found = await self.client.smembers(request_reserved_books_key)
         if len(books_found) > 0:
             print("[WARN] Found partially-processed transaction, resuming.")
@@ -114,33 +120,33 @@
                 continue
 
             # Is the book lent already?
-            if await self.pool.hexists(LENT_BOOKS_KEY, book_id):
+            if await self.client.hexists(LENT_BOOKS_KEY, book_id):
                 continue
 
             # Try to get the book from automated storage
-            if await self.pool.smove(AUTOMATED_BOOK_STORAGE_KEY, request_reserved_books_key, book_id):
-                books_found.append(book_id)
+            if await self.client.smove(AUTOMATED_BOOK_STORAGE_KEY, request_reserved_books_key, book_id):
+                books_found.add(book_id)
                 continue
 
             # Try to get the book from ShelvingService
             # get_book() is idempotent, so it's ok to call it again
             # in case of a crash.
             if await self.shelving_service.get_book(book_id, request_id):
-                await self.pool.sadd(request_reserved_books_key, book_id)
-                books_found.append(book_id)
+                await self.client.sadd(request_reserved_books_key, book_id)
+                books_found.add(book_id)
                 continue
 
         # Requests for which we can't find any book get denied.
         # This is an arbitrary choice, but it makes more sense than
         # to accept a request and then give out 0 books.
         if len(books_found) == 0:
-            await self.pool.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
+            await self.client.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
             print(f"Request: [{request_id}] by {user_id} DENIED.")
             print(f"  Cause: none of the requested books is available.\n")
             return
 
         # Reserving a connection for the upcoming transaction
-        with await self.pool as conn:
+        async with self.client.pipeline(transaction=True) as conn:
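An aside on the idiom in the next hunk: a redis-py pipeline created with transaction=True executes commands immediately once watch() has been called, and only starts queueing again after multi(); that pair replaces aioredis's multi_exec(). A compressed sketch of the optimistic-locking retry loop, with an illustrative key name:

    import asyncio
    import redis
    import redis.asyncio as aioredis

    async def safe_increment():
        client = aioredis.from_url("redis://localhost", decode_responses=True)
        async with client.pipeline(transaction=True) as pipe:
            while True:
                try:
                    await pipe.watch("some_counter")      # immediate mode from here
                    current = int(await pipe.get("some_counter") or 0)
                    pipe.multi()                          # back to queueing mode
                    pipe.set("some_counter", current + 1)
                    await pipe.execute()                  # fails if the key changed
                    break
                except redis.WatchError:
                    continue                              # lost the race, retry
        await client.aclose()

    asyncio.run(safe_increment())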
@@ -159,34 +165,33 @@
                 # all reservations.
                 await conn.unwatch()
 
-                transaction = conn.multi_exec()
+                conn.multi()
 
                 # Refill local storage
-                transaction.evalsha(REFILL_AUTOMATED_STORAGE_LUA,
-                    keys=[AUTOMATED_BOOK_STORAGE_KEY, request_reserved_books_key],
-                    args=[AUTOMATED_BOOK_STORAGE_CAPACITY])
+                keys = [AUTOMATED_BOOK_STORAGE_KEY, request_reserved_books_key]
+                conn.evalsha(REFILL_AUTOMATED_STORAGE_LUA, len(keys), *keys, AUTOMATED_BOOK_STORAGE_CAPACITY)
 
                 # Return remaining books
-                transaction.evalsha(BOOKS_TO_STREAM_LUA,
-                    keys=[request_reserved_books_key, BOOKS_FOR_SHELVING_STREAM_KEY])
+                keys = [request_reserved_books_key, BOOKS_FOR_SHELVING_STREAM_KEY]
+                conn.evalsha(BOOKS_TO_STREAM_LUA, len(keys), *keys)
 
                 # Cleanup
-                transaction.unlink(request_reserved_books_key)
-                transaction.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
-                await transaction.execute()
+                conn.unlink(request_reserved_books_key)
+                conn.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
+                await conn.execute()
 
                 print(f"Request: [{request_id}] by {user_id} DENIED.")
                 print(f"  Cause: too many books (user has {books_in_hand} books, requested {len(requested_books)} of which {len(books_found)} were found).\n")
             else:
                 # The user has enough capacity to get all the found books.
                 # All temporarily reserved books will now be committed.
-                transaction = conn.multi_exec()
-                transaction.incrby(user_book_counts_key, len(books_found))
+                conn.multi()
+                conn.incrby(user_book_counts_key, len(books_found))
                 for book_id in books_found:
-                    transaction.hset(LENT_BOOKS_KEY, book_id, user_id)
-                transaction.unlink(request_reserved_books_key)
-                transaction.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
+                    conn.hset(LENT_BOOKS_KEY, book_id, user_id)
+                conn.unlink(request_reserved_books_key)
+                conn.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
                 try:
-                    await transaction.execute()
+                    await conn.execute()
                     print(f"Request: [{request_id}] by {user_id} ACCEPTED.")
                     print(f"Books:")
                     for i, book_id in enumerate(books_found):
@@ -205,30 +210,29 @@ async def process_returned_books_request(self, return_request_id, return_request):
         user_book_counts_key = BOOK_COUNTS_KEY_TEMPLATE.format(user_id=user_id)
         temp_set_key = f"return:{return_request_id}:temp"
 
-        with await self.pool as conn:
+        async with self.client.pipeline(transaction=True) as conn:
+            conn.multi()
 
             # Start the transaction and load the data
-            transaction = conn.multi_exec()
-            transaction.sadd(temp_set_key, *book_id_list)
+            conn.sadd(temp_set_key, *book_id_list)
 
             # Apply book returns (updates `lent_books` and the user's book count)
-            transaction.evalsha(APPLY_BOOK_RETURN_LUA,
-                keys=[LENT_BOOKS_KEY, temp_set_key, user_book_counts_key],
-                args=[user_id])
+            keys = [LENT_BOOKS_KEY, temp_set_key, user_book_counts_key]
+            conn.evalsha(APPLY_BOOK_RETURN_LUA, len(keys), *keys, user_id)
 
             # Refill automated storage
-            transaction.evalsha(REFILL_AUTOMATED_STORAGE_LUA,
-                keys=[AUTOMATED_BOOK_STORAGE_KEY, temp_set_key],
-                args=[AUTOMATED_BOOK_STORAGE_CAPACITY])
+            keys = [AUTOMATED_BOOK_STORAGE_KEY, temp_set_key]
+            conn.evalsha(REFILL_AUTOMATED_STORAGE_LUA, len(keys), *keys, AUTOMATED_BOOK_STORAGE_CAPACITY)
 
             # Return remaining books
-            transaction.evalsha(BOOKS_TO_STREAM_LUA,
-                keys=[temp_set_key, BOOKS_FOR_SHELVING_STREAM_KEY])
+            keys = [temp_set_key, BOOKS_FOR_SHELVING_STREAM_KEY]
+            conn.evalsha(BOOKS_TO_STREAM_LUA, len(keys), *keys)
 
             # Cleanup
-            transaction.unlink(temp_set_key)
-            transaction.xack(BOOK_RETURN_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, return_request_id)
-            await transaction.execute()
-            print(f"Book Return [{return_request_id}] PROCESSED \n")
+            conn.unlink(temp_set_key)
+            conn.xack(BOOK_RETURN_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, return_request_id)
+            await conn.execute()
+
+        print(f"Book Return {return_request_id} PROCESSED")
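Another difference visible in the hunks above: aioredis's evalsha accepted keys= and args= keyword lists, while redis-py follows the raw Redis calling convention, evalsha(sha, numkeys, *keys_and_args), hence the len(keys) arguments. A sketch with a trivial script and placeholder names:

    import asyncio
    import redis.asyncio as aioredis

    async def run_script():
        client = aioredis.from_url("redis://localhost", decode_responses=True)
        sha = await client.script_load("return redis.call('SET', KEYS[1], ARGV[1])")
        keys = ["some_key"]
        # numkeys tells the server where KEYS end and ARGV begins
        await client.evalsha(sha, len(keys), *keys, "some_value")
        await client.aclose()

    asyncio.run(run_script())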
diff --git a/services/shelving_service.py b/services/shelving_service.py
index d28252b..b4144f4 100644
--- a/services/shelving_service.py
+++ b/services/shelving_service.py
@@ -1,5 +1,6 @@
 import asyncio
-import aioredis
+import redis.asyncio as aioredis
+import redis
 
 SHELVING_SERVICE_STATE_KEY = "shelving_service_state"
 SHELVING_SERVICE_CONSUMER_GROUP = 'shelving_service'
@@ -12,8 +13,8 @@
-    def __init__(self, pool, instance_name, lending_service_returns_stream_key):
-        self.pool = pool
+    def __init__(self, client, instance_name, lending_service_returns_stream_key):
+        self.client = client
         self.lending_service_returns_stream_key = lending_service_returns_stream_key
         self.instance_name = instance_name
         self.shutting_down = False
@@ -23,36 +24,43 @@ def __init__(self, pool, instance_name, lending_service_returns_stream_key):
     # access this method through an HTTP / gRPC request.
     async def get_book(self, book_id, context_id):
         # Try to get the book
-        if 1 == await self.pool.hsetnx(SHELVING_SERVICE_STATE_KEY, book_id, context_id):
+        if 1 == await self.client.hsetnx(SHELVING_SERVICE_STATE_KEY, book_id, context_id):
             return True
         else:
             # Book is taken. If it was taken from this same context, return success anyway
-            return context_id == await self.pool.hget(SHELVING_SERVICE_STATE_KEY, book_id)
+            return context_id == await self.client.hget(SHELVING_SERVICE_STATE_KEY, book_id)
 
     # Internal APIs
     async def launch_service(self):
         # Ensure we have a consumer group
         try:
-            await self.pool.execute("XGROUP", "CREATE", self.lending_service_returns_stream_key, SHELVING_SERVICE_CONSUMER_GROUP, "$", "MKSTREAM")
-        except aioredis.errors.ReplyError as e:
+            await self.client.xgroup_create(name=self.lending_service_returns_stream_key, groupname=SHELVING_SERVICE_CONSUMER_GROUP, id='$', mkstream=True)
+        except redis.exceptions.ResponseError as e:
             assert e.args[0].startswith("BUSYGROUP")
 
         # Get pending returns
-        events = await self.pool.xread_group(SHELVING_SERVICE_CONSUMER_GROUP, self.instance_name,
-            [self.lending_service_returns_stream_key], latest_ids=["0"])
-
-        with await self.pool as conn:
-            while not self.shutting_down:
-                tasks = [self.process_return(event_id, message) for _, event_id, message in events]
+        streams = await self.client.xreadgroup(SHELVING_SERVICE_CONSUMER_GROUP, self.instance_name,
+            {self.lending_service_returns_stream_key: '0'})
+
+        # This is the main loop
+        while not self.shutting_down:
+            for stream in streams:
+                events = stream[1]
+                tasks = [self.process_return(event_id, message) for event_id, message in events]
                 await asyncio.gather(*tasks)
 
-                # Get more returns
-                events = await conn.xread_group(SHELVING_SERVICE_CONSUMER_GROUP, self.instance_name,
-                    [self.lending_service_returns_stream_key], timeout=10000, latest_ids=[">"])
+            # Get more returns
+            streams = await self.client.xreadgroup(SHELVING_SERVICE_CONSUMER_GROUP, self.instance_name,
+                {self.lending_service_returns_stream_key: '>'}, block=10000)
 
     async def process_return(self, event_id, message):
         book_list = message['book_ids'].split(',')
-        transaction = self.pool.multi_exec()
-        transaction.hdel(SHELVING_SERVICE_STATE_KEY, *book_list)
-        transaction.xack(self.lending_service_returns_stream_key, SHELVING_SERVICE_CONSUMER_GROUP, event_id)
-        await transaction.execute()
\ No newline at end of file
+
+        async with self.client.pipeline(transaction=True) as conn:
+            conn.hdel(SHELVING_SERVICE_STATE_KEY, *book_list)
+            conn.xack(self.lending_service_returns_stream_key, SHELVING_SERVICE_CONSUMER_GROUP, event_id)
+            await conn.execute()
+
+        print("Shelving service reshelved books:")
+        for i, book_id in enumerate(book_list):
+            print(f"    {i + 1}) {book_id}")
+        print()
\ No newline at end of file
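Both services rely on the same idempotent group-creation guard: XGROUP CREATE fails with a BUSYGROUP error when the group already exists, which redis-py surfaces as a ResponseError. The guard in isolation, as a sketch with placeholder stream and group names:

    import asyncio
    import redis
    import redis.asyncio as aioredis

    async def ensure_group():
        client = aioredis.from_url("redis://localhost", decode_responses=True)
        try:
            # mkstream=True also creates the stream if it does not exist yet
            await client.xgroup_create(name="some_stream", groupname="some_group",
                                       id="$", mkstream=True)
        except redis.exceptions.ResponseError as e:
            # An existing group is fine; anything else is a real error
            if not str(e).startswith("BUSYGROUP"):
                raise
        await client.aclose()

    asyncio.run(ensure_group())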