Update app for Python 3.12.3 and redis 6.1.0 #2

Open · wants to merge 1 commit into master
16 changes: 8 additions & 8 deletions get_books.py
@@ -1,18 +1,15 @@
-import argparse, signal, asyncio, aioredis
+import argparse, asyncio
+import redis.asyncio as aioredis
 from services.shelving_service import ShelvingService
 from services.lending_service import LendingService, BOOKS_FOR_SHELVING_STREAM_KEY
 
-# Main event loop
-loop = asyncio.get_event_loop()
-
 # Configuration
 LENDING_REQUESTS_STREAM_KEY = "lending_requests_event_stream"
 BOOK_RETURN_REQUESTS_STREAM_KEY = "book_return_requests_event_stream"
 
 
 async def main(action, user, books, address, db, password):
-    pool = await aioredis.create_redis_pool(address, db=db, password=password,
-                                            minsize=4, maxsize=10, loop=loop, encoding='utf8')
+    client = aioredis.from_url(address, db=db, password=password, decode_responses=True)
 
     # Choose the target stream based on `action`
     stream_key = None
@@ -25,9 +22,12 @@ async def main(action, user, books, address, db, password):
         exit(1)
 
     # Send the request
-    await pool.xadd(stream_key, {'user_id': user, 'book_ids': ','.join(books)})
+    await client.xadd(stream_key, {'user_id': user, 'book_ids': ','.join(books)})
     print("OK")
 
+    # The asyncio Redis client requires an explicit disconnect, since there is no asyncio destructor magic method.
+    await client.aclose()
+
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='CLI tool to get and return books.')
     parser.add_argument('action', choices=['request', 'return'], type=str,
@@ -44,7 +44,7 @@ async def main(action, user, books, address, db, password):
                         help='redis password')
     args = parser.parse_args()
 
-    loop.run_until_complete(main(action=args.action, user=args.name, books=args.books,
+    asyncio.run(main(action=args.action, user=args.name, books=args.books,
                      address=args.address, db=args.db, password=args.password))


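Aside on the pattern this file adopts: redis-py's asyncio API hands out a client whose connections must be released explicitly. A minimal, self-contained sketch of the same lifecycle (the URL, stream name, and field values are illustrative placeholders, not values from this repo):

```python
import asyncio
import redis.asyncio as aioredis

async def send_request():
    # from_url returns a client backed by an implicit connection pool;
    # decode_responses=True yields str values instead of bytes.
    client = aioredis.from_url("redis://localhost:6379", db=0, decode_responses=True)
    try:
        # XADD appends a field/value map as a new stream entry.
        entry_id = await client.xadd("example_stream", {"user_id": "u1", "book_ids": "b1,b2"})
        print("OK", entry_id)
    finally:
        # No destructor closes the connections; release them explicitly.
        await client.aclose()

asyncio.run(send_request())
```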
34 changes: 20 additions & 14 deletions main.py
@@ -1,10 +1,8 @@
-import argparse, signal, asyncio, aioredis
+import argparse, signal, asyncio
+import redis.asyncio as aioredis
 from services.shelving_service import ShelvingService
 from services.lending_service import LendingService, BOOKS_FOR_SHELVING_STREAM_KEY
 
-# Main event loop
-loop = asyncio.get_event_loop()
-
 # Global list of running services, used by `graceful_shutdown`
 RUNNING_SERVICES = None
 SHUTTING_DOWN = False
@@ -21,23 +19,28 @@ def graceful_shutdown():
print("\nShutting down (might take up to 10s)...")

async def main(instance_name, force, address, db, password):
pool = await aioredis.create_redis_pool(address, db=db, password=password,
minsize=4, maxsize=10, loop=loop, encoding='utf8')
# This is one way to get a client backed by a connection pool.
# This has the same result as the instructions below.
# conn = aioredis.from_url(address, db=db, password=password, decode_responses=True)

pool = aioredis.ConnectionPool().from_url(address, db=db, password=password, decode_responses=True)
client = aioredis.Redis.from_pool(pool)

lock_key = f"instance_lock:{instance_name}"
if not force:
if not await pool.setnx(lock_key, 'locked'):
if not await client.setnx(lock_key, 'locked'):
print("There might be another instance with the same name running.")
print("Use the -f option to force launching anyway.")
print("For the service to work correctly, each running instance must have a unique name.")
exit(1)

# Setup a singnal handler for graceful shutdown
# Setup a signal handler for graceful shutdown
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, graceful_shutdown)

# Instantiate LendingService and ShelvingService
shelving_service = ShelvingService(pool, instance_name, BOOKS_FOR_SHELVING_STREAM_KEY)
lending_service = LendingService(pool, instance_name, shelving_service)
shelving_service = ShelvingService(client, instance_name, BOOKS_FOR_SHELVING_STREAM_KEY)
lending_service = LendingService(client, instance_name, shelving_service)

# Add services to `RUNNING_SERVICES` list to enable graceful shutdown
global RUNNING_SERVICES
@@ -47,7 +50,12 @@ async def main(instance_name, force, address, db, password):
     await asyncio.gather(lending_service.launch_service(), shelving_service.launch_service())
 
     # Release the instance name lock when shutting down
-    await pool.delete(lock_key)
+    await client.delete(lock_key)
+
+    # The asyncio Redis client requires an explicit disconnect, since there
+    # is no asyncio destructor magic method to close connections for us.
+    print("Closing client")
+    await client.aclose()


if __name__ == '__main__':
@@ -64,7 +72,5 @@ async def main(instance_name, force, address, db, password):
                         help='redis password')
     args = parser.parse_args()
 
-    loop.run_until_complete(main(instance_name=args.name, force=args.force,
+    asyncio.run(main(instance_name=args.name, force=args.force,
                      address=args.address, db=args.db, password=args.password))


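As the comment in the main.py hunk notes, the explicit ConnectionPool construction and the one-line from_url call produce equivalent clients. One reason to prefer the explicit form is that it exposes pool-level options such as max_connections, roughly the role the old aioredis minsize/maxsize arguments played. A sketch under that assumption (max_connections=10 is an illustrative value, not part of this PR):

```python
import redis.asyncio as aioredis

def make_client(address: str, db: int, password: str | None) -> aioredis.Redis:
    # Explicit pool: from_url is a classmethod, so it is called on the class.
    pool = aioredis.ConnectionPool.from_url(
        address, db=db, password=password,
        decode_responses=True, max_connections=10,
    )
    # from_pool transfers pool ownership to the client, so a later
    # client.aclose() also disconnects the underlying pool.
    return aioredis.Redis.from_pool(pool)

# Equivalent when the pool defaults are fine:
# client = aioredis.from_url(address, db=db, password=password, decode_responses=True)
```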
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1 +1 @@
-aioredis
+redis
122 changes: 63 additions & 59 deletions services/lending_service.py
@@ -1,6 +1,6 @@
 import asyncio
-import aioredis
-loop = asyncio.get_event_loop()
+import redis.asyncio as aioredis
+import redis


## -- SERVICE STATE (stored in Redis) --
@@ -46,8 +46,8 @@


 class LendingService:
-    def __init__(self, pool, instance_name, shelving_service):
-        self.pool = pool
+    def __init__(self, client, instance_name, shelving_service):
+        self.client = client
         self.instance_name = instance_name
         self.shutting_down = False
         self.shelving_service = shelving_service
@@ -56,41 +56,47 @@ def __init__(self, pool, instance_name, shelving_service):
     async def launch_service(self):
         # Ensure Redis has a consumer group defined for each relevant stream
         try:
-            await self.pool.execute("XGROUP", "CREATE", LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, "$", "MKSTREAM")
-        except aioredis.errors.ReplyError as e:
+            await self.client.xgroup_create(name=LENDING_REQUESTS_STREAM_KEY, groupname=LENDING_SERVICE_CONSUMER_GROUP, id='$', mkstream=True)
+        except redis.exceptions.ResponseError as e:
             assert e.args[0].startswith("BUSYGROUP")
         try:
-            await self.pool.execute("XGROUP", "CREATE", BOOK_RETURN_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, "$", "MKSTREAM")
-        except aioredis.errors.ReplyError as e:
+            await self.client.xgroup_create(name=BOOK_RETURN_REQUESTS_STREAM_KEY, groupname=LENDING_SERVICE_CONSUMER_GROUP, id='$', mkstream=True)
+        except redis.exceptions.ResponseError as e:
             assert e.args[0].startswith("BUSYGROUP")
 
         # Ensure Redis has the required Lua scripts
         global REFILL_AUTOMATED_STORAGE_LUA, BOOKS_TO_STREAM_LUA, APPLY_BOOK_RETURN_LUA
-        REFILL_AUTOMATED_STORAGE_LUA = await self.pool.script_load(open('lua/refill_automated_storage.lua', 'r').read())
-        BOOKS_TO_STREAM_LUA = await self.pool.script_load(open('lua/books_to_stream.lua', 'r').read())
-        APPLY_BOOK_RETURN_LUA = await self.pool.script_load(open('lua/apply_book_return.lua', 'r').read())
+        REFILL_AUTOMATED_STORAGE_LUA = await self.client.script_load(open('lua/refill_automated_storage.lua', 'r').read())
+        BOOKS_TO_STREAM_LUA = await self.client.script_load(open('lua/books_to_stream.lua', 'r').read())
+        APPLY_BOOK_RETURN_LUA = await self.client.script_load(open('lua/apply_book_return.lua', 'r').read())
 
         # First we retrieve any potential pending message
-        events = await self.pool.xread_group(LENDING_SERVICE_CONSUMER_GROUP, self.instance_name,
-            [LENDING_REQUESTS_STREAM_KEY, BOOK_RETURN_REQUESTS_STREAM_KEY], latest_ids=["0", "0"])
-        if len(events) > 0:
-            print("[WARN] Found claimed events that need processing, resuming...")
+        streams = await self.client.xreadgroup(LENDING_SERVICE_CONSUMER_GROUP, self.instance_name,
+            {LENDING_REQUESTS_STREAM_KEY: '0', BOOK_RETURN_REQUESTS_STREAM_KEY: '0'})
+
+        for stream in streams:
+            events = stream[1]
+            if len(events) > 0:
+                print("[WARN] Found claimed events that need processing, resuming...")
+                break
 
         # This is the main loop
         print("Ready to process events...")
-        with await self.pool as conn:
-            while not self.shutting_down:
+        while not self.shutting_down:
+            for stream in streams:
                 tasks = []
-                for stream_name, event_id, message in events:
+                stream_name = stream[0]
+                events = stream[1]
+                for event_id, message in events:
                     if stream_name == LENDING_REQUESTS_STREAM_KEY:
                         tasks.append(self.process_lending_request(event_id, message))
                     elif stream_name == BOOK_RETURN_REQUESTS_STREAM_KEY:
                         tasks.append(self.process_returned_books_request(event_id, message))
                 await asyncio.gather(*tasks)
 
-                # Gather new events to process (batch size = 10)
-                events = await conn.xread_group(LENDING_SERVICE_CONSUMER_GROUP, self.instance_name,
-                    [LENDING_REQUESTS_STREAM_KEY, BOOK_RETURN_REQUESTS_STREAM_KEY], timeout=10000, count=10, latest_ids=[">", ">"])
+            # Gather new events to process (batch size = 10)
+            streams = await self.client.xreadgroup(LENDING_SERVICE_CONSUMER_GROUP, self.instance_name,
+                {LENDING_REQUESTS_STREAM_KEY: '>', BOOK_RETURN_REQUESTS_STREAM_KEY: '>'}, block=10000, count=10)


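Aside: the xreadgroup migration above changes both the argument shape (a dict of stream name to last-delivered ID instead of parallel lists) and the reply shape, which is why launch_service now unpacks (stream_name, events) pairs. A sketch of the reply handling, with placeholder group, consumer, and stream names:

```python
import redis.asyncio as aioredis

async def read_batch(client: aioredis.Redis) -> None:
    # Reply shape: [(stream_name, [(entry_id, {field: value, ...}), ...]), ...]
    streams = await client.xreadgroup(
        "example_group", "consumer-1",       # placeholder group/consumer names
        {"stream_a": ">", "stream_b": ">"},  # ">" asks for never-delivered entries
        count=10, block=10000,               # block is in milliseconds
    )
    for stream_name, events in streams:
        for entry_id, fields in events:
            print(stream_name, entry_id, fields)
            # ...process the entry, then acknowledge it against its stream:
            await client.xack(stream_name, "example_group", entry_id)
```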
async def process_lending_request(self, request_id, request):
@@ -104,7 +110,7 @@ async def process_lending_request(self, request_id, request):
         # See if Redis contains already a partial set of reserved books.
         # This can happen in the case of a crash. In such case, we fetch
         # the set to resume from where we left off.
-        books_found = await self.pool.smembers(request_reserved_books_key)
+        books_found = await self.client.smembers(request_reserved_books_key)
         if len(books_found) > 0:
             print("[WARN] Found partially-processed transaction, resuming.")

@@ -114,33 +120,33 @@
                 continue
 
             # Is the book lent already?
-            if await self.pool.hexists(LENT_BOOKS_KEY, book_id):
+            if await self.client.hexists(LENT_BOOKS_KEY, book_id):
                 continue
 
             # Try to get the book from automated storage
-            if await self.pool.smove(AUTOMATED_BOOK_STORAGE_KEY, request_reserved_books_key, book_id):
-                books_found.append(book_id)
+            if await self.client.smove(AUTOMATED_BOOK_STORAGE_KEY, request_reserved_books_key, book_id):
+                books_found.add(book_id)
                 continue
 
             # Try to get the book from ShelvingService
             # get_book() is idempotent, so it's ok to call it again
             # in case of a crash.
             if await self.shelving_service.get_book(book_id, request_id):
-                await self.pool.sadd(request_reserved_books_key, book_id)
-                books_found.append(book_id)
+                await self.client.sadd(request_reserved_books_key, book_id)
+                books_found.add(book_id)
                 continue
 
         # Requests for which we can't find any book get denied.
         # This is an arbitrary choice, but it makes more sense than
         # to accept a request and then give out 0 books.
         if len(books_found) == 0:
-            await self.pool.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
+            await self.client.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
             print(f"Request: [{request_id}] by {user_id} DENIED.")
             print(f" Cause: none of the requested books is available.\n")
             return
 
         # Reserving a connection for the upcoming transaction
-        with await self.pool as conn:
+        async with self.client.pipeline(transaction=True) as conn:
 
             # The transaction can fail if another process is changing `user_book_counts_key`.
             # We retry indefinitely but know that the transaction will eventually succeed
@@ -159,34 +165,33 @@ async def process_lending_request(self, request_id, request):
                 # all reservations.
 
                 await conn.unwatch()
-                transaction = conn.multi_exec()
+                conn.multi()
 
                 # Refill local storage
-                transaction.evalsha(REFILL_AUTOMATED_STORAGE_LUA,
-                    keys=[AUTOMATED_BOOK_STORAGE_KEY, request_reserved_books_key],
-                    args=[AUTOMATED_BOOK_STORAGE_CAPACITY])
+                keys = [AUTOMATED_BOOK_STORAGE_KEY, request_reserved_books_key]
+                conn.evalsha(REFILL_AUTOMATED_STORAGE_LUA, len(keys), *keys, AUTOMATED_BOOK_STORAGE_CAPACITY)
 
                 # Return remaining books
-                transaction.evalsha(BOOKS_TO_STREAM_LUA,
-                    keys=[request_reserved_books_key, BOOKS_FOR_SHELVING_STREAM_KEY])
+                keys = [request_reserved_books_key, BOOKS_FOR_SHELVING_STREAM_KEY]
+                conn.evalsha(BOOKS_TO_STREAM_LUA, len(keys), *keys)
 
                 # Cleanup
-                transaction.unlink(request_reserved_books_key)
-                transaction.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
-                await transaction.execute()
+                conn.unlink(request_reserved_books_key)
+                conn.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
+                await conn.execute()
                 print(f"Request: [{request_id}] by {user_id} DENIED.")
                 print(f" Cause: too many books (user has {books_in_hand} books, requested {len(requested_books)} of which {len(books_found)} were found).\n")
             else:
                 # The user has enough capacity to get all the found books.
                 # All temporarily reserved books will now be committed.
-                transaction = conn.multi_exec()
-                transaction.incrby(user_book_counts_key, len(books_found))
+                conn.multi()
+                conn.incrby(user_book_counts_key, len(books_found))
                 for book_id in books_found:
-                    transaction.hset(LENT_BOOKS_KEY, book_id, user_id)
-                transaction.unlink(request_reserved_books_key)
-                transaction.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
+                    conn.hset(LENT_BOOKS_KEY, book_id, user_id)
+                conn.unlink(request_reserved_books_key)
+                conn.xack(LENDING_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, request_id)
                 try:
-                    await transaction.execute()
+                    await conn.execute()
                     print(f"Request: [{request_id}] by {user_id} ACCEPTED.")
                     print(f"Books:")
                     for i, book_id in enumerate(books_found):
@@ -205,30 +210,29 @@ async def process_returned_books_request(self, return_request_id, return_request):
         user_book_counts_key = BOOK_COUNTS_KEY_TEMPLATE.format(user_id=user_id)
         temp_set_key = f"return:{return_request_id}:temp"
 
-        with await self.pool as conn:
+        async with self.client.pipeline(transaction=True) as conn:
+            conn.multi()
             # Start the transaction and load the data
-            transaction = conn.multi_exec()
-            transaction.sadd(temp_set_key, *book_id_list)
+            conn.sadd(temp_set_key, *book_id_list)
 
             # Apply book returns (updates `lent_books` and the user's book count)
-            transaction.evalsha(APPLY_BOOK_RETURN_LUA,
-                keys=[LENT_BOOKS_KEY, temp_set_key, user_book_counts_key],
-                args=[user_id])
+            keys = [LENT_BOOKS_KEY, temp_set_key, user_book_counts_key]
+            conn.evalsha(APPLY_BOOK_RETURN_LUA, len(keys), *keys, user_id)
 
             # Refill automated storage
-            transaction.evalsha(REFILL_AUTOMATED_STORAGE_LUA,
-                keys=[AUTOMATED_BOOK_STORAGE_KEY, temp_set_key],
-                args=[AUTOMATED_BOOK_STORAGE_CAPACITY])
+            keys = [AUTOMATED_BOOK_STORAGE_KEY, temp_set_key]
+            conn.evalsha(REFILL_AUTOMATED_STORAGE_LUA, len(keys), *keys, AUTOMATED_BOOK_STORAGE_CAPACITY)
 
             # Return remaining books
-            transaction.evalsha(BOOKS_TO_STREAM_LUA,
-                keys=[temp_set_key, BOOKS_FOR_SHELVING_STREAM_KEY])
+            keys = [temp_set_key, BOOKS_FOR_SHELVING_STREAM_KEY]
+            conn.evalsha(BOOKS_TO_STREAM_LUA, len(keys), *keys)
 
             # Cleanup
-            transaction.unlink(temp_set_key)
-            transaction.xack(BOOK_RETURN_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, return_request_id)
-            await transaction.execute()
-            print(f"Book Return [{return_request_id}] PROCESSED \n")
+            conn.unlink(temp_set_key)
+            conn.xack(BOOK_RETURN_REQUESTS_STREAM_KEY, LENDING_SERVICE_CONSUMER_GROUP, return_request_id)
+            await conn.execute()
+
+        print(f"Book Return {return_request_id} PROCESSED")



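The core change in this file is replacing aioredis's multi_exec() with redis-py's pipeline(transaction=True) plus multi()/execute(). For reference, a minimal sketch of the WATCH/MULTI/EXEC retry shape that process_lending_request follows (the key name and increment are placeholders, not code from this PR):

```python
import redis
import redis.asyncio as aioredis

async def incr_with_watch(client: aioredis.Redis, counter_key: str, delta: int) -> None:
    async with client.pipeline(transaction=True) as pipe:
        while True:
            try:
                # WATCH switches the pipeline to immediate execution so the
                # current value can be read before the transaction is queued.
                await pipe.watch(counter_key)
                current = int(await pipe.get(counter_key) or 0)
                # multi() switches back to buffering: commands queue up and
                # run atomically when execute() sends MULTI ... EXEC.
                pipe.multi()
                pipe.set(counter_key, current + delta)
                await pipe.execute()
                break
            except redis.exceptions.WatchError:
                # counter_key changed between WATCH and EXEC; retry.
                continue
```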