From a4bc0298101367bad0de76637387bba0007677d0 Mon Sep 17 00:00:00 2001 From: Jonathan Edey <145066863+jonathanedey@users.noreply.github.com> Date: Thu, 10 Apr 2025 16:24:07 -0400 Subject: [PATCH 1/5] Added minimal support for sending FCM messages in async using HTTP/2 (#870) * httpx async_send_each prototype * Clean up code and lint * fix: Add extra dependancy for http2 * fix: reset message batch limit to 500 * fix: Add new import to `setup.py` --- firebase_admin/_utils.py | 86 +++++++++++++ firebase_admin/messaging.py | 201 +++++++++++++++++++++++++++--- integration/conftest.py | 25 ++-- integration/test_messaging.py | 82 +++++++++++++ requirements.txt | 4 +- setup.py | 1 + tests/test_messaging.py | 225 ++++++++++++++++++++++++++++++++++ 7 files changed, 599 insertions(+), 25 deletions(-) diff --git a/firebase_admin/_utils.py b/firebase_admin/_utils.py index b6e292546..765d11587 100644 --- a/firebase_admin/_utils.py +++ b/firebase_admin/_utils.py @@ -16,9 +16,11 @@ import json from platform import python_version +from typing import Callable, Optional import google.auth import requests +import httpx import firebase_admin from firebase_admin import exceptions @@ -128,6 +130,36 @@ def handle_platform_error_from_requests(error, handle_func=None): return exc if exc else _handle_func_requests(error, message, error_dict) +def handle_platform_error_from_httpx( + error: httpx.HTTPError, + handle_func: Optional[Callable[..., Optional[exceptions.FirebaseError]]] = None +) -> exceptions.FirebaseError: + """Constructs a ``FirebaseError`` from the given httpx error. + + This can be used to handle errors returned by Google Cloud Platform (GCP) APIs. + + Args: + error: An error raised by the httpx module while making an HTTP call to a GCP API. + handle_func: A function that can be used to handle platform errors in a custom way. When + specified, this function will be called with three arguments. It has the same + signature as ```_handle_func_httpx``, but may return ``None``. + + Returns: + FirebaseError: A ``FirebaseError`` that can be raised to the user code. + """ + + if isinstance(error, httpx.HTTPStatusError): + response = error.response + content = response.content.decode() + status_code = response.status_code + error_dict, message = _parse_platform_error(content, status_code) + exc = None + if handle_func: + exc = handle_func(error, message, error_dict) + + return exc if exc else _handle_func_httpx(error, message, error_dict) + return handle_httpx_error(error) + def handle_operation_error(error): """Constructs a ``FirebaseError`` from the given operation error. @@ -204,6 +236,60 @@ def handle_requests_error(error, message=None, code=None): err_type = _error_code_to_exception_type(code) return err_type(message=message, cause=error, http_response=error.response) +def _handle_func_httpx(error: httpx.HTTPError, message, error_dict) -> exceptions.FirebaseError: + """Constructs a ``FirebaseError`` from the given GCP error. + + Args: + error: An error raised by the httpx module while making an HTTP call. + message: A message to be included in the resulting ``FirebaseError``. + error_dict: Parsed GCP error response. + + Returns: + FirebaseError: A ``FirebaseError`` that can be raised to the user code or None. + """ + code = error_dict.get('status') + return handle_httpx_error(error, message, code) + + +def handle_httpx_error(error: httpx.HTTPError, message=None, code=None) -> exceptions.FirebaseError: + """Constructs a ``FirebaseError`` from the given httpx error. + + This method is agnostic of the remote service that produced the error, whether it is a GCP + service or otherwise. Therefore, this method does not attempt to parse the error response in + any way. + + Args: + error: An error raised by the httpx module while making an HTTP call. + message: A message to be included in the resulting ``FirebaseError`` (optional). If not + specified the string representation of the ``error`` argument is used as the message. + code: A GCP error code that will be used to determine the resulting error type (optional). + If not specified the HTTP status code on the error response is used to determine a + suitable error code. + + Returns: + FirebaseError: A ``FirebaseError`` that can be raised to the user code. + """ + if isinstance(error, httpx.TimeoutException): + return exceptions.DeadlineExceededError( + message='Timed out while making an API call: {0}'.format(error), + cause=error) + if isinstance(error, httpx.ConnectError): + return exceptions.UnavailableError( + message='Failed to establish a connection: {0}'.format(error), + cause=error) + if isinstance(error, httpx.HTTPStatusError): + print("printing status error", error) + if not code: + code = _http_status_to_error_code(error.response.status_code) + if not message: + message = str(error) + + err_type = _error_code_to_exception_type(code) + return err_type(message=message, cause=error, http_response=error.response) + + return exceptions.UnknownError( + message='Unknown error while making a remote service call: {0}'.format(error), + cause=error) def _http_status_to_error_code(status): """Maps an HTTP status to a platform error code.""" diff --git a/firebase_admin/messaging.py b/firebase_admin/messaging.py index d2ad04a04..abac5ae54 100644 --- a/firebase_admin/messaging.py +++ b/firebase_admin/messaging.py @@ -14,21 +14,30 @@ """Firebase Cloud Messaging module.""" +from __future__ import annotations +from typing import Callable, List, Optional import concurrent.futures import json import warnings +import asyncio import requests +import httpx +from google.auth import credentials +from google.auth.transport import requests as auth_requests from googleapiclient import http from googleapiclient import _auth import firebase_admin -from firebase_admin import _http_client -from firebase_admin import _messaging_encoder -from firebase_admin import _messaging_utils -from firebase_admin import _gapic_utils -from firebase_admin import _utils -from firebase_admin import exceptions +from firebase_admin import ( + _http_client, + _messaging_encoder, + _messaging_utils, + _gapic_utils, + _utils, + exceptions, + App +) _MESSAGING_ATTRIBUTE = '_messaging' @@ -66,6 +75,7 @@ 'send_all', 'send_multicast', 'send_each', + 'send_each_async', 'send_each_for_multicast', 'subscribe_to_topic', 'unsubscribe_from_topic', @@ -97,10 +107,10 @@ UnregisteredError = _messaging_utils.UnregisteredError -def _get_messaging_service(app): +def _get_messaging_service(app: Optional[App]) -> _MessagingService: return _utils.get_app_service(app, _MESSAGING_ATTRIBUTE, _MessagingService) -def send(message, dry_run=False, app=None): +def send(message, dry_run=False, app: Optional[App] = None): """Sends the given message via Firebase Cloud Messaging (FCM). If the ``dry_run`` mode is enabled, the message will not be actually delivered to the @@ -140,6 +150,9 @@ def send_each(messages, dry_run=False, app=None): """ return _get_messaging_service(app).send_each(messages, dry_run) +async def send_each_async(messages, dry_run=True, app: Optional[App] = None) -> BatchResponse: + return await _get_messaging_service(app).send_each_async(messages, dry_run) + def send_each_for_multicast(multicast_message, dry_run=False, app=None): """Sends the given mutlicast message to each token via Firebase Cloud Messaging (FCM). @@ -321,21 +334,21 @@ def errors(self): class BatchResponse: """The response received from a batch request to the FCM API.""" - def __init__(self, responses): + def __init__(self, responses: List[SendResponse]) -> None: self._responses = responses self._success_count = len([resp for resp in responses if resp.success]) @property - def responses(self): + def responses(self) -> List[SendResponse]: """A list of ``messaging.SendResponse`` objects (possibly empty).""" return self._responses @property - def success_count(self): + def success_count(self) -> int: return self._success_count @property - def failure_count(self): + def failure_count(self) -> int: return len(self.responses) - self.success_count @@ -363,6 +376,56 @@ def exception(self): """A ``FirebaseError`` if an error occurs while sending the message to the FCM service.""" return self._exception +# Auth Flow +# TODO: Remove comments +# The aim here is to be able to get auth credentials right before the request is sent. +# This is similar to what is done in transport.requests.AuthorizedSession(). +# We can then pass this in at the client level. + +# Notes: +# - This implementations does not cover timeouts on requests sent to refresh credentials. +# - Uses HTTP/1 and a blocking credential for refreshing. +class GoogleAuthCredentialFlow(httpx.Auth): + """Google Auth Credential Auth Flow""" + def __init__(self, credential: credentials.Credentials): + self._credential = credential + self._max_refresh_attempts = 2 + self._refresh_status_codes = (401,) + + def apply_auth_headers(self, request: httpx.Request): + # Build request used to refresh credentials if needed + auth_request = auth_requests.Request() + # This refreshes the credentials if needed and mutates the request headers to + # contain access token and any other google auth headers + self._credential.before_request(auth_request, request.method, request.url, request.headers) + + + def auth_flow(self, request: httpx.Request): + # Keep original headers since `credentials.before_request` mutates the passed headers and we + # want to keep the original in cause we need an auth retry. + _original_headers = request.headers.copy() + + _credential_refresh_attempt = 0 + while _credential_refresh_attempt <= self._max_refresh_attempts: + # copy original headers + request.headers = _original_headers.copy() + # mutates request headers + self.apply_auth_headers(request) + + # Continue to perform the request + # yield here dispatches the request and returns with the response + response: httpx.Response = yield request + + # We can check the result of the response and determine in we need to retry + # on refreshable status codes. Current transport.requests.AuthorizedSession() + # only does this on 401 errors. We should do the same. + if response.status_code in self._refresh_status_codes: + _credential_refresh_attempt += 1 + else: + break + # Last yielded response is auto returned. + + class _MessagingService: """Service class that implements Firebase Cloud Messaging (FCM) functionality.""" @@ -381,7 +444,7 @@ class _MessagingService: 'UNREGISTERED': UnregisteredError, } - def __init__(self, app): + def __init__(self, app) -> None: project_id = app.project_id if not project_id: raise ValueError( @@ -396,6 +459,12 @@ def __init__(self, app): timeout = app.options.get('httpTimeout', _http_client.DEFAULT_TIMEOUT_SECONDS) self._credential = app.credential.get_credential() self._client = _http_client.JsonHttpClient(credential=self._credential, timeout=timeout) + self._async_client = httpx.AsyncClient( + http2=True, + auth=GoogleAuthCredentialFlow(self._credential), + timeout=timeout, + transport=HttpxRetryTransport() + ) self._build_transport = _auth.authorized_http @classmethod @@ -448,6 +517,40 @@ def send_data(data): message='Unknown error while making remote service calls: {0}'.format(error), cause=error) + async def send_each_async(self, messages: List[Message], dry_run: bool = True) -> BatchResponse: + """Sends the given messages to FCM via the FCM v1 API.""" + if not isinstance(messages, list): + raise ValueError('messages must be a list of messaging.Message instances.') + if len(messages) > 500: + raise ValueError('messages must not contain more than 500 elements.') + + async def send_data(data): + try: + resp = await self._async_client.request( + 'post', + url=self._fcm_url, + headers=self._fcm_headers, + json=data) + # HTTP/2 check + if resp.http_version != 'HTTP/2': + raise Exception('This messages was not sent with HTTP/2') + resp.raise_for_status() + # except httpx.HTTPStatusError as exception: + except httpx.HTTPError as exception: + return SendResponse(resp=None, exception=self._handle_fcm_httpx_error(exception)) + else: + return SendResponse(resp.json(), exception=None) + + message_data = [self._message_data(message, dry_run) for message in messages] + try: + responses = await asyncio.gather(*[send_data(message) for message in message_data]) + return BatchResponse(responses) + except Exception as error: + raise exceptions.UnknownError( + message='Unknown error while making remote service calls: {0}'.format(error), + cause=error) + + def send_all(self, messages, dry_run=False): """Sends the given messages to FCM via the batch API.""" if not isinstance(messages, list): @@ -533,6 +636,11 @@ def _handle_fcm_error(self, error): return _utils.handle_platform_error_from_requests( error, _MessagingService._build_fcm_error_requests) + def _handle_fcm_httpx_error(self, error: httpx.HTTPError) -> exceptions.FirebaseError: + """Handles errors received from the FCM API.""" + return _utils.handle_platform_error_from_httpx( + error, _MessagingService._build_fcm_error_httpx) + def _handle_iid_error(self, error): """Handles errors received from the Instance ID API.""" if error.response is None: @@ -562,6 +670,14 @@ def _handle_batch_error(self, error): return _gapic_utils.handle_platform_error_from_googleapiclient( error, _MessagingService._build_fcm_error_googleapiclient) + # TODO: Remove comments + # We should be careful to clean up the httpx clients. + # Since we are using an async client we must also close in async. However we can sync wrap this. + # The close method is called by the app on shutdown/clean-up of each service. We don't seem to + # make use of this much elsewhere. + def close(self) -> None: + asyncio.run(self._async_client.aclose()) + @classmethod def _build_fcm_error_requests(cls, error, message, error_dict): """Parses an error response from the FCM API and creates a FCM-specific exception if @@ -569,6 +685,19 @@ def _build_fcm_error_requests(cls, error, message, error_dict): exc_type = cls._build_fcm_error(error_dict) return exc_type(message, cause=error, http_response=error.response) if exc_type else None + @classmethod + def _build_fcm_error_httpx( + cls, error: httpx.HTTPError, message, error_dict + ) -> Optional[exceptions.FirebaseError]: + """Parses a httpx error response from the FCM API and creates a FCM-specific exception if + appropriate.""" + exc_type = cls._build_fcm_error(error_dict) + if isinstance(error, httpx.HTTPStatusError): + return exc_type( + message, cause=error, http_response=error.response) if exc_type else None + return exc_type(message, cause=error) if exc_type else None + + @classmethod def _build_fcm_error_googleapiclient(cls, error, message, error_dict, http_response): """Parses an error response from the FCM API and creates a FCM-specific exception if @@ -577,7 +706,7 @@ def _build_fcm_error_googleapiclient(cls, error, message, error_dict, http_respo return exc_type(message, cause=error, http_response=http_response) if exc_type else None @classmethod - def _build_fcm_error(cls, error_dict): + def _build_fcm_error(cls, error_dict) -> Optional[Callable[..., exceptions.FirebaseError]]: if not error_dict: return None fcm_code = None @@ -585,4 +714,46 @@ def _build_fcm_error(cls, error_dict): if detail.get('@type') == 'type.googleapis.com/google.firebase.fcm.v1.FcmError': fcm_code = detail.get('errorCode') break - return _MessagingService.FCM_ERROR_TYPES.get(fcm_code) + return _MessagingService.FCM_ERROR_TYPES.get(fcm_code) if fcm_code else None + + +# TODO: Remove comments +# Notes: +# This implementation currently only covers basic retires for pre-defined status errors +class HttpxRetryTransport(httpx.AsyncBaseTransport): + """HTTPX transport with retry logic.""" + # We could also support passing kwargs here + def __init__(self, **kwargs) -> None: + # Hardcoded settings for now + self._retryable_status_codes = (500, 503,) + self._max_retry_count = 4 + + # - We use a full AsyncHTTPTransport under the hood to make use of it's + # fully implemented `handle_async_request()`. + # - We could consider making the `HttpxRetryTransport`` class extend a + # `AsyncHTTPTransport` instead and use the parent class's methods to handle + # requests. + # - We should also ensure that that transport's internal retry is + # not enabled. + transport_kwargs = kwargs.copy() + transport_kwargs.update({'retries': 0, 'http2': True}) + self._wrapped_transport = httpx.AsyncHTTPTransport(**transport_kwargs) + + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + _retry_count = 0 + + while True: + # Dispatch request + # Let exceptions pass through for now + response = await self._wrapped_transport.handle_async_request(request) + + # Check if request is retryable + if response.status_code in self._retryable_status_codes: + _retry_count += 1 + + # Return if retries exhausted + if _retry_count > self._max_retry_count: + return response + else: + return response diff --git a/integration/conftest.py b/integration/conftest.py index 71f53f612..bdecca40e 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -15,8 +15,9 @@ """pytest configuration and global fixtures for integration tests.""" import json -import asyncio +# import asyncio import pytest +from pytest_asyncio import is_async_test import firebase_admin from firebase_admin import credentials @@ -72,11 +73,17 @@ def api_key(request): with open(path) as keyfile: return keyfile.read().strip() -@pytest.fixture(scope="session") -def event_loop(): - """Create an instance of the default event loop for test session. - This avoids early eventloop closure. - """ - loop = asyncio.get_event_loop_policy().new_event_loop() - yield loop - loop.close() +# @pytest.fixture(scope="session") +# def event_loop(): +# """Create an instance of the default event loop for test session. +# This avoids early eventloop closure. +# """ +# loop = asyncio.get_event_loop_policy().new_event_loop() +# yield loop +# loop.close() + +def pytest_collection_modifyitems(items): + pytest_asyncio_tests = (item for item in items if is_async_test(item)) + session_scope_marker = pytest.mark.asyncio(loop_scope="session") + for async_test in pytest_asyncio_tests: + async_test.add_marker(session_scope_marker, append=False) diff --git a/integration/test_messaging.py b/integration/test_messaging.py index 4c1d7d0dc..af35ce01b 100644 --- a/integration/test_messaging.py +++ b/integration/test_messaging.py @@ -221,3 +221,85 @@ def test_subscribe(): def test_unsubscribe(): resp = messaging.unsubscribe_from_topic(_REGISTRATION_TOKEN, 'mock-topic') assert resp.success_count + resp.failure_count == 1 + +@pytest.mark.asyncio +async def test_send_each_async(): + messages = [ + messaging.Message( + topic='foo-bar', notification=messaging.Notification('Title', 'Body')), + messaging.Message( + topic='foo-bar', notification=messaging.Notification('Title', 'Body')), + messaging.Message( + token='not-a-token', notification=messaging.Notification('Title', 'Body')), + ] + + batch_response = await messaging.send_each_async(messages, dry_run=True) + + assert batch_response.success_count == 2 + assert batch_response.failure_count == 1 + assert len(batch_response.responses) == 3 + + response = batch_response.responses[0] + assert response.success is True + assert response.exception is None + assert re.match('^projects/.*/messages/.*$', response.message_id) + + response = batch_response.responses[1] + assert response.success is True + assert response.exception is None + assert re.match('^projects/.*/messages/.*$', response.message_id) + + response = batch_response.responses[2] + assert response.success is False + assert isinstance(response.exception, exceptions.InvalidArgumentError) + assert response.message_id is None + + +# @pytest.mark.asyncio +# async def test_send_each_async_error(): +# messages = [ +# messaging.Message( +# topic='foo-bar', notification=messaging.Notification('Title', 'Body')), +# messaging.Message( +# topic='foo-bar', notification=messaging.Notification('Title', 'Body')), +# messaging.Message( +# token='not-a-token', notification=messaging.Notification('Title', 'Body')), +# ] + +# batch_response = await messaging.send_each_async(messages, dry_run=True) + +# assert batch_response.success_count == 2 +# assert batch_response.failure_count == 1 +# assert len(batch_response.responses) == 3 + +# response = batch_response.responses[0] +# assert response.success is True +# assert response.exception is None +# assert re.match('^projects/.*/messages/.*$', response.message_id) + +# response = batch_response.responses[1] +# assert response.success is True +# assert response.exception is None +# assert re.match('^projects/.*/messages/.*$', response.message_id) + +# response = batch_response.responses[2] +# assert response.success is False +# assert isinstance(response.exception, exceptions.InvalidArgumentError) +# assert response.message_id is None + +@pytest.mark.asyncio +async def test_send_each_async_500(): + messages = [] + for msg_number in range(500): + topic = 'foo-bar-{0}'.format(msg_number % 10) + messages.append(messaging.Message(topic=topic)) + + batch_response = await messaging.send_each_async(messages, dry_run=True) + + assert batch_response.success_count == 500 + assert batch_response.failure_count == 0 + assert len(batch_response.responses) == 500 + for response in batch_response.responses: + assert response.success is True + assert response.exception is None + assert re.match('^projects/.*/messages/.*$', response.message_id) diff --git a/requirements.txt b/requirements.txt index fd5b0b39c..ba6f2f947 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,10 +5,12 @@ pytest-cov >= 2.4.0 pytest-localserver >= 0.4.1 pytest-asyncio >= 0.16.0 pytest-mock >= 3.6.1 +respx == 0.22.0 cachecontrol >= 0.12.14 google-api-core[grpc] >= 1.22.1, < 3.0.0dev; platform.python_implementation != 'PyPy' google-api-python-client >= 1.7.8 google-cloud-firestore >= 2.19.0; platform.python_implementation != 'PyPy' google-cloud-storage >= 1.37.1 -pyjwt[crypto] >= 2.5.0 \ No newline at end of file +pyjwt[crypto] >= 2.5.0 +httpx[http2] == 0.28.1 \ No newline at end of file diff --git a/setup.py b/setup.py index 23be6d481..e92d207aa 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,7 @@ 'google-cloud-firestore>=2.19.0; platform.python_implementation != "PyPy"', 'google-cloud-storage>=1.37.1', 'pyjwt[crypto] >= 2.5.0', + 'httpx[http2] == 0.28.1', ] setup( diff --git a/tests/test_messaging.py b/tests/test_messaging.py index b7b5c69ba..aee3ae4e6 100644 --- a/tests/test_messaging.py +++ b/tests/test_messaging.py @@ -14,8 +14,10 @@ """Test cases for the firebase_admin.messaging module.""" import datetime +from itertools import chain, repeat import json import numbers +import respx from googleapiclient import http from googleapiclient import _helpers @@ -1924,6 +1926,229 @@ def test_send_each(self): assert all([r.success for r in batch_response.responses]) assert not any([r.exception for r in batch_response.responses]) + @respx.mock + @pytest.mark.asyncio + async def test_send_each_async(self): + responses = [ + respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id1'}), + respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id2'}), + respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id3'}), + ] + msg1 = messaging.Message(topic='foo1') + msg2 = messaging.Message(topic='foo2') + msg3 = messaging.Message(topic='foo3') + route = respx.request( + 'POST', + 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send' + ).mock(side_effect=responses) + + batch_response = await messaging.send_each_async([msg1, msg2, msg3], dry_run=True) + + # try: + # batch_response = await messaging.send_each_async([msg1, msg2], dry_run=True) + # except Exception as error: + # if isinstance(error.cause.__cause__, StopIteration): + # raise Exception('Received more requests than mocks') + + assert batch_response.success_count == 3 + assert batch_response.failure_count == 0 + assert len(batch_response.responses) == 3 + assert [r.message_id for r in batch_response.responses] \ + == ['message-id1', 'message-id2', 'message-id3'] + assert all([r.success for r in batch_response.responses]) + assert not any([r.exception for r in batch_response.responses]) + + assert route.call_count == 3 + + @respx.mock + @pytest.mark.asyncio + async def test_send_each_async_error_401_fail_auth_retry(self): + payload = json.dumps({ + 'error': { + 'status': 'UNAUTHENTICATED', + 'message': 'test unauthenticated error', + 'details': [ + { + '@type': 'type.googleapis.com/google.firebase.fcm.v1.FcmError', + 'errorCode': 'SOME_UNKNOWN_CODE', + }, + ], + } + }) + + responses = repeat(respx.MockResponse(401, http_version='HTTP/2', content=payload)) + + msg1 = messaging.Message(topic='foo1') + route = respx.request( + 'POST', + 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send' + ).mock(side_effect=responses) + batch_response = await messaging.send_each_async([msg1], dry_run=True) + + assert route.call_count == 3 + assert batch_response.success_count == 0 + assert batch_response.failure_count == 1 + assert len(batch_response.responses) == 1 + exception = batch_response.responses[0].exception + assert isinstance(exception, exceptions.UnauthenticatedError) + + @respx.mock + @pytest.mark.asyncio + async def test_send_each_async_error_401_pass_on_auth_retry(self): + payload = json.dumps({ + 'error': { + 'status': 'UNAUTHENTICATED', + 'message': 'test unauthenticated error', + 'details': [ + { + '@type': 'type.googleapis.com/google.firebase.fcm.v1.FcmError', + 'errorCode': 'SOME_UNKNOWN_CODE', + }, + ], + } + }) + responses = [ + respx.MockResponse(401, http_version='HTTP/2', content=payload), + respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id1'}), + ] + + msg1 = messaging.Message(topic='foo1') + route = respx.request( + 'POST', + 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send' + ).mock(side_effect=responses) + batch_response = await messaging.send_each_async([msg1], dry_run=True) + + assert route.call_count == 2 + assert batch_response.success_count == 1 + assert batch_response.failure_count == 0 + assert len(batch_response.responses) == 1 + assert [r.message_id for r in batch_response.responses] == ['message-id1'] + assert all([r.success for r in batch_response.responses]) + assert not any([r.exception for r in batch_response.responses]) + + @respx.mock + @pytest.mark.asyncio + async def test_send_each_async_error_500_fail_retry_config(self): + payload = json.dumps({ + 'error': { + 'status': 'INTERNAL', + 'message': 'test INTERNAL error', + 'details': [ + { + '@type': 'type.googleapis.com/google.firebase.fcm.v1.FcmError', + 'errorCode': 'SOME_UNKNOWN_CODE', + }, + ], + } + }) + + responses = repeat(respx.MockResponse(500, http_version='HTTP/2', content=payload)) + + msg1 = messaging.Message(topic='foo1') + route = respx.request( + 'POST', + 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send' + ).mock(side_effect=responses) + batch_response = await messaging.send_each_async([msg1], dry_run=True) + + assert route.call_count == 5 + assert batch_response.success_count == 0 + assert batch_response.failure_count == 1 + assert len(batch_response.responses) == 1 + exception = batch_response.responses[0].exception + assert isinstance(exception, exceptions.InternalError) + + + @respx.mock + @pytest.mark.asyncio + async def test_send_each_async_error_500_pass_on_retry_config(self): + payload = json.dumps({ + 'error': { + 'status': 'INTERNAL', + 'message': 'test INTERNAL error', + 'details': [ + { + '@type': 'type.googleapis.com/google.firebase.fcm.v1.FcmError', + 'errorCode': 'SOME_UNKNOWN_CODE', + }, + ], + } + }) + responses = chain( + [ + respx.MockResponse(500, http_version='HTTP/2', content=payload), + respx.MockResponse(500, http_version='HTTP/2', content=payload), + respx.MockResponse(500, http_version='HTTP/2', content=payload), + respx.MockResponse(500, http_version='HTTP/2', content=payload), + respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id1'}), + ], + ) + + msg1 = messaging.Message(topic='foo1') + route = respx.request( + 'POST', + 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send' + ).mock(side_effect=responses) + batch_response = await messaging.send_each_async([msg1], dry_run=True) + + assert route.call_count == 5 + assert batch_response.success_count == 1 + assert batch_response.failure_count == 0 + assert len(batch_response.responses) == 1 + assert [r.message_id for r in batch_response.responses] == ['message-id1'] + assert all([r.success for r in batch_response.responses]) + assert not any([r.exception for r in batch_response.responses]) + + # @respx.mock + # @pytest.mark.asyncio + # async def test_send_each_async_error_request(self): + # payload = json.dumps({ + # 'error': { + # 'status': 'INTERNAL', + # 'message': 'test INTERNAL error', + # 'details': [ + # { + # '@type': 'type.googleapis.com/google.firebase.fcm.v1.FcmError', + # 'errorCode': 'SOME_UNKNOWN_CODE', + # }, + # ], + # } + # }) + # responses = chain( + # [ + # httpx.ConnectError("Test request error", request=httpx.Request('POST', 'URL')) + # # respx.MockResponse(500, http_version='HTTP/2', content=payload), + # ], + # # repeat( + # respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id1'})), + # # respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id3'}), + # ) + + # # responses = repeat(respx.MockResponse(500, http_version='HTTP/2', content=payload)) + + # msg1 = messaging.Message(topic='foo1') + # route = respx.request( + # 'POST', + # 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send' + # ).mock(side_effect=responses) + # batch_response = await messaging.send_each_async([msg1], dry_run=True) + + # assert route.call_count == 1 + # assert batch_response.success_count == 0 + # assert batch_response.failure_count == 1 + # assert len(batch_response.responses) == 1 + # exception = batch_response.responses[0].exception + # assert isinstance(exception, exceptions.UnavailableError) + + # # assert route.call_count == 4 + # # assert batch_response.success_count == 1 + # # assert batch_response.failure_count == 0 + # # assert len(batch_response.responses) == 1 + # # assert [r.message_id for r in batch_response.responses] == ['message-id1'] + # # assert all([r.success for r in batch_response.responses]) + # # assert not any([r.exception for r in batch_response.responses]) + @pytest.mark.parametrize('status', HTTP_ERROR_CODES) def test_send_each_detailed_error(self, status): success_payload = json.dumps({'name': 'message-id'}) From acf955a75088c15625fab69e6437b84a97adc986 Mon Sep 17 00:00:00 2001 From: Jonathan Edey <145066863+jonathanedey@users.noreply.github.com> Date: Wed, 21 May 2025 14:58:16 -0400 Subject: [PATCH 2/5] Refactored retry config into `_retry.py` and added support for exponential backoff and `Retry-After` header (#871) * Refactored retry config to `_retry.py` and added support for backoff and Retry-After * Added unit tests for `_retry.py` * Updated unit tests for HTTPX request errors * Address review comments --- firebase_admin/_retry.py | 224 ++++++++++++++++++ firebase_admin/messaging.py | 65 +----- tests/test_messaging.py | 69 ++---- tests/test_retry.py | 454 ++++++++++++++++++++++++++++++++++++ 4 files changed, 708 insertions(+), 104 deletions(-) create mode 100644 firebase_admin/_retry.py create mode 100644 tests/test_retry.py diff --git a/firebase_admin/_retry.py b/firebase_admin/_retry.py new file mode 100644 index 000000000..ef330cbd9 --- /dev/null +++ b/firebase_admin/_retry.py @@ -0,0 +1,224 @@ +# Copyright 2025 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Internal retry logic module + +This module provides utilities for adding retry logic to HTTPX requests +""" + +from __future__ import annotations +import copy +import email.utils +import random +import re +import time +from types import CoroutineType +from typing import Any, Callable, List, Optional, Tuple +import logging +import asyncio +import httpx + +logger = logging.getLogger(__name__) + + +class HttpxRetry: + """HTTPX based retry config""" + # Status codes to be used for respecting `Retry-After` header + RETRY_AFTER_STATUS_CODES = frozenset([413, 429, 503]) + + # Default maximum backoff time. + DEFAULT_BACKOFF_MAX = 120 + + def __init__( + self, + max_retries: int = 10, + status_forcelist: Optional[List[int]] = None, + backoff_factor: float = 0, + backoff_max: float = DEFAULT_BACKOFF_MAX, + backoff_jitter: float = 0, + history: Optional[List[Tuple[ + httpx.Request, + Optional[httpx.Response], + Optional[Exception] + ]]] = None, + respect_retry_after_header: bool = False, + ) -> None: + self.retries_left = max_retries + self.status_forcelist = status_forcelist + self.backoff_factor = backoff_factor + self.backoff_max = backoff_max + self.backoff_jitter = backoff_jitter + if history: + self.history = history + else: + self.history = [] + self.respect_retry_after_header = respect_retry_after_header + + def copy(self) -> HttpxRetry: + """Creates a deep copy of this instance.""" + return copy.deepcopy(self) + + def is_retryable_response(self, response: httpx.Response) -> bool: + """Determine if a response implies that the request should be retried if possible.""" + if self.status_forcelist and response.status_code in self.status_forcelist: + return True + + has_retry_after = bool(response.headers.get("Retry-After")) + if ( + self.respect_retry_after_header + and has_retry_after + and response.status_code in self.RETRY_AFTER_STATUS_CODES + ): + return True + + return False + + def is_exhausted(self) -> bool: + """Determine if there are anymore more retires.""" + # retries_left is negative + return self.retries_left < 0 + + # Identical implementation of `urllib3.Retry.parse_retry_after()` + def _parse_retry_after(self, retry_after_header: str) -> float | None: + """Parses Retry-After string into a float with unit seconds.""" + seconds: float + # Whitespace: https://tools.ietf.org/html/rfc7230#section-3.2.4 + if re.match(r"^\s*[0-9]+\s*$", retry_after_header): + seconds = int(retry_after_header) + else: + retry_date_tuple = email.utils.parsedate_tz(retry_after_header) + if retry_date_tuple is None: + raise httpx.RemoteProtocolError(f"Invalid Retry-After header: {retry_after_header}") + + retry_date = email.utils.mktime_tz(retry_date_tuple) + seconds = retry_date - time.time() + + seconds = max(seconds, 0) + + return seconds + + def get_retry_after(self, response: httpx.Response) -> float | None: + """Determine the Retry-After time needed before sending the next request.""" + retry_after_header = response.headers.get('Retry-After', None) + if retry_after_header: + # Convert retry header to a float in seconds + return self._parse_retry_after(retry_after_header) + return None + + def get_backoff_time(self): + """Determine the backoff time needed before sending the next request.""" + # attempt_count is the number of previous request attempts + attempt_count = len(self.history) + # Backoff should be set to 0 until after first retry. + if attempt_count <= 1: + return 0 + backoff = self.backoff_factor * (2 ** (attempt_count-1)) + if self.backoff_jitter: + backoff += random.random() * self.backoff_jitter + return float(max(0, min(self.backoff_max, backoff))) + + async def sleep_for_backoff(self) -> None: + """Determine and wait the backoff time needed before sending the next request.""" + backoff = self.get_backoff_time() + logger.debug('Sleeping for backoff of %f seconds following failed request', backoff) + await asyncio.sleep(backoff) + + async def sleep(self, response: httpx.Response) -> None: + """Determine and wait the time needed before sending the next request.""" + if self.respect_retry_after_header: + retry_after = self.get_retry_after(response) + if retry_after: + logger.debug( + 'Sleeping for Retry-After header of %f seconds following failed request', + retry_after + ) + await asyncio.sleep(retry_after) + return + await self.sleep_for_backoff() + + def increment( + self, + request: httpx.Request, + response: Optional[httpx.Response] = None, + error: Optional[Exception] = None + ) -> None: + """Update the retry state based on request attempt.""" + self.retries_left -= 1 + self.history.append((request, response, error)) + + +class HttpxRetryTransport(httpx.AsyncBaseTransport): + """HTTPX transport with retry logic.""" + + DEFAULT_RETRY = HttpxRetry(max_retries=4, status_forcelist=[500, 503], backoff_factor=0.5) + + def __init__(self, retry: HttpxRetry = DEFAULT_RETRY, **kwargs) -> None: + self._retry = retry + + transport_kwargs = kwargs.copy() + transport_kwargs.update({'retries': 0, 'http2': True}) + # We use a full AsyncHTTPTransport under the hood that is already + # set up to handle requests. We also insure that that transport's internal + # retries are not allowed. + self._wrapped_transport = httpx.AsyncHTTPTransport(**transport_kwargs) + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + return await self._dispatch_with_retry( + request, self._wrapped_transport.handle_async_request) + + async def _dispatch_with_retry( + self, + request: httpx.Request, + dispatch_method: Callable[[httpx.Request], CoroutineType[Any, Any, httpx.Response]] + ) -> httpx.Response: + """Sends a request with retry logic using a provided dispatch method.""" + # This request config is used across all requests that use this transport and therefore + # needs to be copied to be used for just this request and it's retries. + retry = self._retry.copy() + # First request + response, error = None, None + + while not retry.is_exhausted(): + + # First retry + if response: + await retry.sleep(response) + + # Need to reset here so only last attempt's error or response is saved. + response, error = None, None + + try: + logger.debug('Sending request in _dispatch_with_retry(): %r', request) + response = await dispatch_method(request) + logger.debug('Received response: %r', response) + except httpx.HTTPError as err: + logger.debug('Received error: %r', err) + error = err + + if response and not retry.is_retryable_response(response): + return response + + if error: + raise error + + retry.increment(request, response, error) + + if response: + return response + if error: + raise error + raise AssertionError('_dispatch_with_retry() ended with no response or exception') + + async def aclose(self) -> None: + await self._wrapped_transport.aclose() diff --git a/firebase_admin/messaging.py b/firebase_admin/messaging.py index abac5ae54..7f747db18 100644 --- a/firebase_admin/messaging.py +++ b/firebase_admin/messaging.py @@ -20,6 +20,7 @@ import json import warnings import asyncio +import logging import requests import httpx @@ -38,7 +39,9 @@ exceptions, App ) +from firebase_admin._retry import HttpxRetryTransport +logger = logging.getLogger(__name__) _MESSAGING_ATTRIBUTE = '_messaging' @@ -376,15 +379,6 @@ def exception(self): """A ``FirebaseError`` if an error occurs while sending the message to the FCM service.""" return self._exception -# Auth Flow -# TODO: Remove comments -# The aim here is to be able to get auth credentials right before the request is sent. -# This is similar to what is done in transport.requests.AuthorizedSession(). -# We can then pass this in at the client level. - -# Notes: -# - This implementations does not cover timeouts on requests sent to refresh credentials. -# - Uses HTTP/1 and a blocking credential for refreshing. class GoogleAuthCredentialFlow(httpx.Auth): """Google Auth Credential Auth Flow""" def __init__(self, credential: credentials.Credentials): @@ -410,6 +404,9 @@ def auth_flow(self, request: httpx.Request): # copy original headers request.headers = _original_headers.copy() # mutates request headers + logger.debug( + 'Refreshing credentials for request attempt %d', + _credential_refresh_attempt + 1) self.apply_auth_headers(request) # Continue to perform the request @@ -420,6 +417,9 @@ def auth_flow(self, request: httpx.Request): # on refreshable status codes. Current transport.requests.AuthorizedSession() # only does this on 401 errors. We should do the same. if response.status_code in self._refresh_status_codes: + logger.debug( + 'Request attempt %d failed due to unauthorized credentials', + _credential_refresh_attempt + 1) _credential_refresh_attempt += 1 else: break @@ -670,11 +670,6 @@ def _handle_batch_error(self, error): return _gapic_utils.handle_platform_error_from_googleapiclient( error, _MessagingService._build_fcm_error_googleapiclient) - # TODO: Remove comments - # We should be careful to clean up the httpx clients. - # Since we are using an async client we must also close in async. However we can sync wrap this. - # The close method is called by the app on shutdown/clean-up of each service. We don't seem to - # make use of this much elsewhere. def close(self) -> None: asyncio.run(self._async_client.aclose()) @@ -715,45 +710,3 @@ def _build_fcm_error(cls, error_dict) -> Optional[Callable[..., exceptions.Fireb fcm_code = detail.get('errorCode') break return _MessagingService.FCM_ERROR_TYPES.get(fcm_code) if fcm_code else None - - -# TODO: Remove comments -# Notes: -# This implementation currently only covers basic retires for pre-defined status errors -class HttpxRetryTransport(httpx.AsyncBaseTransport): - """HTTPX transport with retry logic.""" - # We could also support passing kwargs here - def __init__(self, **kwargs) -> None: - # Hardcoded settings for now - self._retryable_status_codes = (500, 503,) - self._max_retry_count = 4 - - # - We use a full AsyncHTTPTransport under the hood to make use of it's - # fully implemented `handle_async_request()`. - # - We could consider making the `HttpxRetryTransport`` class extend a - # `AsyncHTTPTransport` instead and use the parent class's methods to handle - # requests. - # - We should also ensure that that transport's internal retry is - # not enabled. - transport_kwargs = kwargs.copy() - transport_kwargs.update({'retries': 0, 'http2': True}) - self._wrapped_transport = httpx.AsyncHTTPTransport(**transport_kwargs) - - - async def handle_async_request(self, request: httpx.Request) -> httpx.Response: - _retry_count = 0 - - while True: - # Dispatch request - # Let exceptions pass through for now - response = await self._wrapped_transport.handle_async_request(request) - - # Check if request is retryable - if response.status_code in self._retryable_status_codes: - _retry_count += 1 - - # Return if retries exhausted - if _retry_count > self._max_retry_count: - return response - else: - return response diff --git a/tests/test_messaging.py b/tests/test_messaging.py index aee3ae4e6..0200c11ea 100644 --- a/tests/test_messaging.py +++ b/tests/test_messaging.py @@ -17,6 +17,7 @@ from itertools import chain, repeat import json import numbers +import httpx import respx from googleapiclient import http @@ -2100,54 +2101,26 @@ async def test_send_each_async_error_500_pass_on_retry_config(self): assert all([r.success for r in batch_response.responses]) assert not any([r.exception for r in batch_response.responses]) - # @respx.mock - # @pytest.mark.asyncio - # async def test_send_each_async_error_request(self): - # payload = json.dumps({ - # 'error': { - # 'status': 'INTERNAL', - # 'message': 'test INTERNAL error', - # 'details': [ - # { - # '@type': 'type.googleapis.com/google.firebase.fcm.v1.FcmError', - # 'errorCode': 'SOME_UNKNOWN_CODE', - # }, - # ], - # } - # }) - # responses = chain( - # [ - # httpx.ConnectError("Test request error", request=httpx.Request('POST', 'URL')) - # # respx.MockResponse(500, http_version='HTTP/2', content=payload), - # ], - # # repeat( - # respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id1'})), - # # respx.MockResponse(200, http_version='HTTP/2', json={'name': 'message-id3'}), - # ) - - # # responses = repeat(respx.MockResponse(500, http_version='HTTP/2', content=payload)) - - # msg1 = messaging.Message(topic='foo1') - # route = respx.request( - # 'POST', - # 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send' - # ).mock(side_effect=responses) - # batch_response = await messaging.send_each_async([msg1], dry_run=True) - - # assert route.call_count == 1 - # assert batch_response.success_count == 0 - # assert batch_response.failure_count == 1 - # assert len(batch_response.responses) == 1 - # exception = batch_response.responses[0].exception - # assert isinstance(exception, exceptions.UnavailableError) - - # # assert route.call_count == 4 - # # assert batch_response.success_count == 1 - # # assert batch_response.failure_count == 0 - # # assert len(batch_response.responses) == 1 - # # assert [r.message_id for r in batch_response.responses] == ['message-id1'] - # # assert all([r.success for r in batch_response.responses]) - # # assert not any([r.exception for r in batch_response.responses]) + @respx.mock + @pytest.mark.asyncio + async def test_send_each_async_request_error(self): + responses = httpx.ConnectError("Test request error", request=httpx.Request( + 'POST', + 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send')) + + msg1 = messaging.Message(topic='foo1') + route = respx.request( + 'POST', + 'https://fcm.googleapis.com/v1/projects/explicit-project-id/messages:send' + ).mock(side_effect=responses) + batch_response = await messaging.send_each_async([msg1], dry_run=True) + + assert route.call_count == 1 + assert batch_response.success_count == 0 + assert batch_response.failure_count == 1 + assert len(batch_response.responses) == 1 + exception = batch_response.responses[0].exception + assert isinstance(exception, exceptions.UnavailableError) @pytest.mark.parametrize('status', HTTP_ERROR_CODES) def test_send_each_detailed_error(self, status): diff --git a/tests/test_retry.py b/tests/test_retry.py new file mode 100644 index 000000000..751fdea7b --- /dev/null +++ b/tests/test_retry.py @@ -0,0 +1,454 @@ +# Copyright 2025 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test cases for the firebase_admin._retry module.""" + +import time +import email.utils +from itertools import repeat +from unittest.mock import call +import pytest +import httpx +from pytest_mock import MockerFixture +import respx + +from firebase_admin._retry import HttpxRetry, HttpxRetryTransport + +_TEST_URL = 'http://firebase.test.url/' + +@pytest.fixture +def base_url() -> str: + """Provides a consistent base URL for tests.""" + return _TEST_URL + +class TestHttpxRetryTransport(): + @pytest.mark.asyncio + @respx.mock + async def test_no_retry_on_success(self, base_url: str, mocker: MockerFixture): + """Test that a successful response doesn't trigger retries.""" + retry_config = HttpxRetry(max_retries=3, status_forcelist=[500]) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(return_value=httpx.Response(200, text="Success")) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 200 + assert response.text == "Success" + assert route.call_count == 1 + mock_sleep.assert_not_called() + + @pytest.mark.asyncio + @respx.mock + async def test_no_retry_on_non_retryable_status(self, base_url: str, mocker: MockerFixture): + """Test that a non-retryable error status doesn't trigger retries.""" + retry_config = HttpxRetry(max_retries=3, status_forcelist=[500, 503]) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(return_value=httpx.Response(404, text="Not Found")) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 404 + assert response.text == "Not Found" + assert route.call_count == 1 + mock_sleep.assert_not_called() + + @pytest.mark.asyncio + @respx.mock + async def test_retry_on_status_code_success_on_last_retry( + self, base_url: str, mocker: MockerFixture + ): + """Test retry on status code from status_forcelist, succeeding on the last attempt.""" + retry_config = HttpxRetry(max_retries=2, status_forcelist=[503, 500], backoff_factor=0.5) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(503, text="Attempt 1 Failed"), + httpx.Response(500, text="Attempt 2 Failed"), + httpx.Response(200, text="Attempt 3 Success"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 200 + assert response.text == "Attempt 3 Success" + assert route.call_count == 3 + assert mock_sleep.call_count == 2 + # Check sleep calls (backoff_factor is 0.5) + mock_sleep.assert_has_calls([call(0.0), call(1.0)]) + + @pytest.mark.asyncio + @respx.mock + async def test_retry_exhausted_returns_last_response( + self, base_url: str, mocker: MockerFixture + ): + """Test that the last response is returned when retries are exhausted.""" + retry_config = HttpxRetry(max_retries=1, status_forcelist=[500], backoff_factor=0) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(500, text="Attempt 1 Failed"), + httpx.Response(500, text="Attempt 2 Failed (Final)"), + # Should stop after previous response + httpx.Response(200, text="This should not be reached"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 500 + assert response.text == "Attempt 2 Failed (Final)" + assert route.call_count == 2 # Initial call + 1 retry + assert mock_sleep.call_count == 1 # Slept before the single retry + + @pytest.mark.asyncio + @respx.mock + async def test_retry_after_header_seconds(self, base_url: str, mocker: MockerFixture): + """Test respecting Retry-After header with seconds value.""" + retry_config = HttpxRetry( + max_retries=1, respect_retry_after_header=True, backoff_factor=100) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(429, text="Too Many Requests", headers={'Retry-After': '10'}), + httpx.Response(200, text="OK"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 200 + assert route.call_count == 2 + assert mock_sleep.call_count == 1 + # Assert sleep was called with the value from Retry-After header + mock_sleep.assert_called_once_with(10.0) + + @pytest.mark.asyncio + @respx.mock + async def test_retry_after_header_http_date(self, base_url: str, mocker: MockerFixture): + """Test respecting Retry-After header with an HTTP-date value.""" + retry_config = HttpxRetry( + max_retries=1, respect_retry_after_header=True, backoff_factor=100) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + # Calculate a future time and format as HTTP-date + retry_delay_seconds = 60 + time_at_request = time.time() + retry_time = time_at_request + retry_delay_seconds + http_date = email.utils.formatdate(retry_time) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(503, text="Maintenance", headers={'Retry-After': http_date}), + httpx.Response(200, text="OK"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + # Patch time.time() within the test context to control the baseline for date calculation + # Set the mock time to be *just before* the Retry-After time + mocker.patch('time.time', return_value=time_at_request) + response = await client.post(base_url) + + assert response.status_code == 200 + assert route.call_count == 2 + assert mock_sleep.call_count == 1 + # Check that sleep was called with approximately the correct delay + # Allow for small floating point inaccuracies + mock_sleep.assert_called_once() + args, _ = mock_sleep.call_args + assert args[0] == pytest.approx(retry_delay_seconds, abs=2) + + @pytest.mark.asyncio + @respx.mock + async def test_retry_after_ignored_when_disabled(self, base_url: str, mocker: MockerFixture): + """Test Retry-After header is ignored if `respect_retry_after_header` is `False`.""" + retry_config = HttpxRetry( + max_retries=3, respect_retry_after_header=False, status_forcelist=[429], + backoff_factor=0.5, backoff_max=10) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(429, text="Too Many Requests", headers={'Retry-After': '60'}), + httpx.Response(429, text="Too Many Requests", headers={'Retry-After': '60'}), + httpx.Response(429, text="Too Many Requests", headers={'Retry-After': '60'}), + httpx.Response(200, text="OK"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 200 + assert route.call_count == 4 + assert mock_sleep.call_count == 3 + + # Assert sleep was called with the calculated backoff times: + # After first attempt: delay = 0 + # After retry 1 (attempt 2): delay = 0.5 * (2**(2-1)) = 0.5 * 2 = 1.0 + # After retry 2 (attempt 3): delay = 0.5 * (2**(3-1)) = 0.5 * 4 = 2.0 + expected_sleeps = [call(0), call(1.0), call(2.0)] + mock_sleep.assert_has_calls(expected_sleeps) + + @pytest.mark.asyncio + @respx.mock + async def test_retry_after_header_missing_backoff_fallback( + self, base_url: str, mocker: MockerFixture + ): + """Test Retry-After header is ignored if `respect_retry_after_header`is `True` but header is + not set.""" + retry_config = HttpxRetry( + max_retries=3, respect_retry_after_header=True, status_forcelist=[429], + backoff_factor=0.5, backoff_max=10) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(429, text="Too Many Requests"), + httpx.Response(429, text="Too Many Requests"), + httpx.Response(429, text="Too Many Requests"), + httpx.Response(200, text="OK"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 200 + assert route.call_count == 4 + assert mock_sleep.call_count == 3 + + # Assert sleep was called with the calculated backoff times: + # After first attempt: delay = 0 + # After retry 1 (attempt 2): delay = 0.5 * (2**(2-1)) = 0.5 * 2 = 1.0 + # After retry 2 (attempt 3): delay = 0.5 * (2**(3-1)) = 0.5 * 4 = 2.0 + expected_sleeps = [call(0), call(1.0), call(2.0)] + mock_sleep.assert_has_calls(expected_sleeps) + + @pytest.mark.asyncio + @respx.mock + async def test_exponential_backoff(self, base_url: str, mocker: MockerFixture): + """Test that sleep time increases exponentially with `backoff_factor`.""" + # status=3 allows 3 retries (attempts 2, 3, 4) + retry_config = HttpxRetry( + max_retries=3, status_forcelist=[500], backoff_factor=0.1, backoff_max=10.0) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(500, text="Fail 1"), + httpx.Response(500, text="Fail 2"), + httpx.Response(500, text="Fail 3"), + httpx.Response(200, text="Success"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 200 + assert route.call_count == 4 + assert mock_sleep.call_count == 3 + + # Check expected backoff times: + # After first attempt: delay = 0 + # After retry 1 (attempt 2): delay = 0.1 * (2**(2-1)) = 0.1 * 2 = 0.2 + # After retry 2 (attempt 3): delay = 0.1 * (2**(3-1)) = 0.1 * 4 = 0.4 + expected_sleeps = [call(0), call(0.2), call(0.4)] + mock_sleep.assert_has_calls(expected_sleeps) + + @pytest.mark.asyncio + @respx.mock + async def test_backoff_max(self, base_url: str, mocker: MockerFixture): + """Test that backoff time respects `backoff_max`.""" + # status=4 allows 4 retries. backoff_factor=1 causes rapid increase. + retry_config = HttpxRetry( + max_retries=4, status_forcelist=[500], backoff_factor=1, backoff_max=3.0) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(500, text="Fail 1"), + httpx.Response(500, text="Fail 2"), + httpx.Response(500, text="Fail 2"), + httpx.Response(500, text="Fail 4"), + httpx.Response(200, text="Success"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 200 + assert route.call_count == 5 + assert mock_sleep.call_count == 4 + + # Check expected backoff times: + # After first attempt: delay = 0 + # After retry 1 (attempt 2): delay = 1*(2**(2-1)) = 2. Clamped by max(0, min(3.0, 2)) = 2.0 + # After retry 2 (attempt 3): delay = 1*(2**(3-1)) = 4. Clamped by max(0, min(3.0, 4)) = 3.0 + # After retry 3 (attempt 4): delay = 1*(2**(4-1)) = 8. Clamped by max(0, min(3.0, 8)) = 3.0 + expected_sleeps = [call(0.0), call(2.0), call(3.0), call(3.0)] + mock_sleep.assert_has_calls(expected_sleeps) + + @pytest.mark.asyncio + @respx.mock + async def test_backoff_jitter(self, base_url: str, mocker: MockerFixture): + """Test that `backoff_jitter` adds randomness within bounds.""" + retry_config = HttpxRetry( + max_retries=3, status_forcelist=[500], backoff_factor=0.2, backoff_jitter=0.1) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + route = respx.post(base_url).mock(side_effect=[ + httpx.Response(500, text="Fail 1"), + httpx.Response(500, text="Fail 2"), + httpx.Response(500, text="Fail 3"), + httpx.Response(200, text="Success"), + ]) + + mock_sleep = mocker.patch('asyncio.sleep', return_value=None) + response = await client.post(base_url) + + assert response.status_code == 200 + assert route.call_count == 4 + assert mock_sleep.call_count == 3 + + # Check expected backoff times are within the expected range [base - jitter, base + jitter] + # After first attempt: delay = 0 + # After retry 1 (attempt 2): delay = 0.2 * (2**(2-1)) = 0.2 * 2 = 0.4 +/- 0.1 + # After retry 2 (attempt 3): delay = 0.2 * (2**(3-1)) = 0.2 * 4 = 0.8 +/- 0.1 + expected_sleeps = [ + call(pytest.approx(0.0, abs=0.1)), + call(pytest.approx(0.4, abs=0.1)), + call(pytest.approx(0.8, abs=0.1)) + ] + mock_sleep.assert_has_calls(expected_sleeps) + + @pytest.mark.asyncio + @respx.mock + async def test_error_not_retryable(self, base_url): + """Test that non-HTTP errors are raised immediately if not retryable.""" + retry_config = HttpxRetry(max_retries=3) + transport = HttpxRetryTransport(retry=retry_config) + client = httpx.AsyncClient(transport=transport) + + # Mock a connection error + route = respx.post(base_url).mock( + side_effect=repeat(httpx.ConnectError("Connection failed"))) + + with pytest.raises(httpx.ConnectError, match="Connection failed"): + await client.post(base_url) + + assert route.call_count == 1 + + +class TestHttpxRetry(): + _TEST_REQUEST = httpx.Request('POST', _TEST_URL) + + def test_httpx_retry_copy(self, base_url): + """Test that `HttpxRetry.copy()` creates a deep copy.""" + original = HttpxRetry(max_retries=5, status_forcelist=[500, 503], backoff_factor=0.5) + original.history.append((base_url, None, None)) # Add something mutable + + copied = original.copy() + + # Assert they are different objects + assert original is not copied + assert original.history is not copied.history + + # Assert values are the same initially + assert copied.retries_left == original.retries_left + assert copied.status_forcelist == original.status_forcelist + assert copied.backoff_factor == original.backoff_factor + assert len(copied.history) == 1 + + # Modify the copy and check original is unchanged + copied.retries_left = 1 + copied.status_forcelist = [404] + copied.history.append((base_url, None, None)) + + assert original.retries_left == 5 + assert original.status_forcelist == [500, 503] + assert len(original.history) == 1 + + def test_parse_retry_after_seconds(self): + retry = HttpxRetry() + assert retry._parse_retry_after('10') == 10.0 + assert retry._parse_retry_after(' 30 ') == 30.0 + + + def test_parse_retry_after_http_date(self, mocker: MockerFixture): + mocker.patch('time.time', return_value=1000.0) + retry = HttpxRetry() + # Date string representing 1015 seconds since epoch + http_date = email.utils.formatdate(1015.0) + # time.time() is mocked to 1000.0, so delay should be 15s + assert retry._parse_retry_after(http_date) == pytest.approx(15.0) + + def test_parse_retry_after_past_http_date(self, mocker: MockerFixture): + """Test that a past date results in 0 seconds.""" + mocker.patch('time.time', return_value=1000.0) + retry = HttpxRetry() + http_date = email.utils.formatdate(990.0) # 10s in the past + assert retry._parse_retry_after(http_date) == 0.0 + + def test_parse_retry_after_invalid_date(self): + retry = HttpxRetry() + with pytest.raises(httpx.RemoteProtocolError, match='Invalid Retry-After header'): + retry._parse_retry_after('Invalid Date Format') + + def test_get_backoff_time_calculation(self): + retry = HttpxRetry( + max_retries=6, status_forcelist=[503], backoff_factor=0.5, backoff_max=10.0) + response = httpx.Response(503) + # No history -> attempt 1 -> no backoff before first request + # Note: get_backoff_time() is typically called *before* the *next* request, + # so history length reflects completed attempts. + assert retry.get_backoff_time() == 0.0 + + # Simulate attempt 1 completed + retry.increment(self._TEST_REQUEST, response) + # History len 1, attempt 2 -> base case 0 + assert retry.get_backoff_time() == pytest.approx(0) + + # Simulate attempt 2 completed + retry.increment(self._TEST_REQUEST, response) + # History len 2, attempt 3 -> 0.5*(2^1) = 1.0 + assert retry.get_backoff_time() == pytest.approx(1.0) + + # Simulate attempt 3 completed + retry.increment(self._TEST_REQUEST, response) + # History len 3, attempt 4 -> 0.5*(2^2) = 2.0 + assert retry.get_backoff_time() == pytest.approx(2.0) + + # Simulate attempt 4 completed + retry.increment(self._TEST_REQUEST, response) + # History len 4, attempt 5 -> 0.5*(2^3) = 4.0 + assert retry.get_backoff_time() == pytest.approx(4.0) + + # Simulate attempt 5 completed + retry.increment(self._TEST_REQUEST, response) + # History len 5, attempt 6 -> 0.5*(2^4) = 8.0 + assert retry.get_backoff_time() == pytest.approx(8.0) + + # Simulate attempt 6 completed + retry.increment(self._TEST_REQUEST, response) + # History len 6, attempt 7 -> 0.5*(2^5) = 16.0 Clamped to 10 + assert retry.get_backoff_time() == pytest.approx(10.0) From 28d778ccc8bedc6493916b94fa46024d3725bd8e Mon Sep 17 00:00:00 2001 From: Jonathan Edey <145066863+jonathanedey@users.noreply.github.com> Date: Tue, 27 May 2025 10:31:34 -0400 Subject: [PATCH 3/5] Added `HttpxAsyncClient` wrapper for `httpx.AsyncClient` and support for `send_each_for_multicast_async()` (#878) * Refactored retry config to `_retry.py` and added support for backoff and Retry-After * Added unit tests for `_retry.py` * Updated unit tests for HTTPX request errors * Add HttpxAsyncClient to wrap httpx.AsyncClient * Added forced refresh to google auth credential flow and fixed lint * Added unit tests for `GoogleAuthCredentialFlow` and `HttpxAsyncClient` * Removed duplicate export * Added support for `send_each_for_multicast_async()` and updated doc string and type hints * Remove duplicate auth class * Cover auth request error case when `requests` request fails in HTTPX auth flow * Update test for `send_each_for_multicast_async()` * Address review comments * fix lint and some types * Address review comments and removed unused code * Update metric header test logic for `TestHttpxAsyncClient` --- firebase_admin/_http_client.py | 214 ++++++++++- firebase_admin/_retry.py | 7 +- firebase_admin/messaging.py | 155 ++++---- integration/conftest.py | 10 - integration/test_messaging.py | 49 +-- tests/test_http_client.py | 683 ++++++++++++++++++++++++++++----- 6 files changed, 894 insertions(+), 224 deletions(-) diff --git a/firebase_admin/_http_client.py b/firebase_admin/_http_client.py index 57c09e2e4..a6d0a5bef 100644 --- a/firebase_admin/_http_client.py +++ b/firebase_admin/_http_client.py @@ -14,14 +14,23 @@ """Internal HTTP client module. - This module provides utilities for making HTTP calls using the requests library. - """ - -from google.auth import transport -import requests +This module provides utilities for making HTTP calls using the requests library. +""" + +from __future__ import annotations +import logging +from typing import Any, Dict, Generator, Optional, Tuple, Union +import httpx +import requests.adapters from requests.packages.urllib3.util import retry # pylint: disable=import-error +from google.auth import credentials +from google.auth import transport +from google.auth.transport import requests as google_auth_requests from firebase_admin import _utils +from firebase_admin._retry import HttpxRetry, HttpxRetryTransport + +logger = logging.getLogger(__name__) if hasattr(retry.Retry.DEFAULT, 'allowed_methods'): _ANY_METHOD = {'allowed_methods': None} @@ -34,6 +43,9 @@ connect=1, read=1, status=4, status_forcelist=[500, 503], raise_on_status=False, backoff_factor=0.5, **_ANY_METHOD) +DEFAULT_HTTPX_RETRY_CONFIG = HttpxRetry( + max_retries=4, status_forcelist=[500, 503], backoff_factor=0.5) + DEFAULT_TIMEOUT_SECONDS = 120 @@ -144,7 +156,6 @@ def close(self): self._session.close() self._session = None - class JsonHttpClient(HttpClient): """An HTTP client that parses response messages as JSON.""" @@ -153,3 +164,194 @@ def __init__(self, **kwargs): def parse_body(self, resp): return resp.json() + +class GoogleAuthCredentialFlow(httpx.Auth): + """Google Auth Credential Auth Flow""" + def __init__(self, credential: credentials.Credentials): + self._credential = credential + self._max_refresh_attempts = 2 + self._refresh_status_codes = (401,) + + def apply_auth_headers( + self, + request: httpx.Request, + auth_request: google_auth_requests.Request + ) -> None: + """A helper function to refreshes credentials if needed and mutates the request headers to + contain access token and any other google auth headers.""" + + logger.debug( + 'Attempting to apply auth headers. Credential validity before: %s', + self._credential.valid + ) + self._credential.before_request( + auth_request, request.method, str(request.url), request.headers + ) + logger.debug('Auth headers applied. Credential validity after: %s', self._credential.valid) + + def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]: + _original_headers = request.headers.copy() + _credential_refresh_attempt = 0 + + # Create a Google auth request object to be used for refreshing credentials + auth_request = google_auth_requests.Request() + + while True: + # Copy original headers for each attempt + request.headers = _original_headers.copy() + + # Apply auth headers (which might include an implicit refresh if token is expired) + self.apply_auth_headers(request, auth_request) + + logger.debug( + 'Dispatching request, attempt %d of %d', + _credential_refresh_attempt, self._max_refresh_attempts + ) + response: httpx.Response = yield request + + if response.status_code in self._refresh_status_codes: + if _credential_refresh_attempt < self._max_refresh_attempts: + logger.debug( + 'Received status %d. Attempting explicit credential refresh. \ + Attempt %d of %d.', + response.status_code, + _credential_refresh_attempt + 1, + self._max_refresh_attempts + ) + # Explicitly force a credentials refresh + self._credential.refresh(auth_request) + _credential_refresh_attempt += 1 + else: + logger.debug( + 'Received status %d, but max auth refresh attempts (%d) reached. \ + Returning last response.', + response.status_code, self._max_refresh_attempts + ) + break + else: + # Status code is not one that requires a refresh, so break and return response + logger.debug( + 'Status code %d does not require refresh. Returning response.', + response.status_code + ) + break + # The last yielded response is automatically returned by httpx's auth flow. + +class HttpxAsyncClient(): + """Async HTTP client used to make HTTP/2 calls using HTTPX. + + HttpxAsyncClient maintains an async HTTPX client, and handles request authentication and retries + if necessary. + """ + def __init__( + self, + credential: Optional[credentials.Credentials] = None, + base_url: str = '', + headers: Optional[Union[httpx.Headers, Dict[str, str]]] = None, + retry_config: HttpxRetry = DEFAULT_HTTPX_RETRY_CONFIG, + timeout: int = DEFAULT_TIMEOUT_SECONDS, + http2: bool = True + ) -> None: + """Creates a new HttpxAsyncClient instance from the provided arguments. + + If a credential is provided, initializes a new async HTTPX client authorized with it. + Otherwise, initializes a new unauthorized async HTTPX client. + + Args: + credential: A Google credential that can be used to authenticate requests (optional). + base_url: A URL prefix to be added to all outgoing requests (optional). + headers: A map of headers to be added to all outgoing requests (optional). + retry_config: A HttpxRetry configuration. Default settings would retry up to 4 times for + HTTP 500 and 503 errors (optional). + timeout: HTTP timeout in seconds. Defaults to 120 seconds when not specified (optional). + http2: A boolean indicating if HTTP/2 support should be enabled. Defaults to `True` when + not specified (optional). + """ + self._base_url = base_url + self._timeout = timeout + self._headers = {**headers, **METRICS_HEADERS} if headers else {**METRICS_HEADERS} + self._retry_config = retry_config + + # Only set up retries on urls starting with 'http://' and 'https://' + self._mounts = { + 'http://': HttpxRetryTransport(retry=self._retry_config, http2=http2), + 'https://': HttpxRetryTransport(retry=self._retry_config, http2=http2) + } + + if credential: + self._async_client = httpx.AsyncClient( + http2=http2, + timeout=self._timeout, + headers=self._headers, + auth=GoogleAuthCredentialFlow(credential), # Add auth flow for credentials. + mounts=self._mounts + ) + else: + self._async_client = httpx.AsyncClient( + http2=http2, + timeout=self._timeout, + headers=self._headers, + mounts=self._mounts + ) + + @property + def base_url(self): + return self._base_url + + @property + def timeout(self): + return self._timeout + + @property + def async_client(self): + return self._async_client + + async def request(self, method: str, url: str, **kwargs: Any) -> httpx.Response: + """Makes an HTTP call using the HTTPX library. + + This is the sole entry point to the HTTPX library. All other helper methods in this + class call this method to send HTTP requests out. Refer to + https://www.python-httpx.org/api/ for more information on supported options + and features. + + Args: + method: HTTP method name as a string (e.g. get, post). + url: URL of the remote endpoint. + **kwargs: An additional set of keyword arguments to be passed into the HTTPX API + (e.g. json, params, timeout). + + Returns: + Response: An HTTPX response object. + + Raises: + HTTPError: Any HTTPX exceptions encountered while making the HTTP call. + RequestException: Any requests exceptions encountered while making the HTTP call. + """ + if 'timeout' not in kwargs: + kwargs['timeout'] = self.timeout + resp = await self._async_client.request(method, self.base_url + url, **kwargs) + return resp.raise_for_status() + + async def headers(self, method: str, url: str, **kwargs: Any) -> httpx.Headers: + resp = await self.request(method, url, **kwargs) + return resp.headers + + async def body_and_response( + self, method: str, url: str, **kwargs: Any) -> Tuple[Any, httpx.Response]: + resp = await self.request(method, url, **kwargs) + return self.parse_body(resp), resp + + async def body(self, method: str, url: str, **kwargs: Any) -> Any: + resp = await self.request(method, url, **kwargs) + return self.parse_body(resp) + + async def headers_and_body( + self, method: str, url: str, **kwargs: Any) -> Tuple[httpx.Headers, Any]: + resp = await self.request(method, url, **kwargs) + return resp.headers, self.parse_body(resp) + + def parse_body(self, resp: httpx.Response) -> Any: + return resp.json() + + async def aclose(self) -> None: + await self._async_client.aclose() diff --git a/firebase_admin/_retry.py b/firebase_admin/_retry.py index ef330cbd9..efd90a743 100644 --- a/firebase_admin/_retry.py +++ b/firebase_admin/_retry.py @@ -23,8 +23,7 @@ import random import re import time -from types import CoroutineType -from typing import Any, Callable, List, Optional, Tuple +from typing import Any, Callable, List, Optional, Tuple, Coroutine import logging import asyncio import httpx @@ -163,7 +162,7 @@ class HttpxRetryTransport(httpx.AsyncBaseTransport): DEFAULT_RETRY = HttpxRetry(max_retries=4, status_forcelist=[500, 503], backoff_factor=0.5) - def __init__(self, retry: HttpxRetry = DEFAULT_RETRY, **kwargs) -> None: + def __init__(self, retry: HttpxRetry = DEFAULT_RETRY, **kwargs: Any) -> None: self._retry = retry transport_kwargs = kwargs.copy() @@ -180,7 +179,7 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: async def _dispatch_with_retry( self, request: httpx.Request, - dispatch_method: Callable[[httpx.Request], CoroutineType[Any, Any, httpx.Response]] + dispatch_method: Callable[[httpx.Request], Coroutine[Any, Any, httpx.Response]] ) -> httpx.Response: """Sends a request with retry logic using a provided dispatch method.""" # This request config is used across all requests that use this transport and therefore diff --git a/firebase_admin/messaging.py b/firebase_admin/messaging.py index 7f747db18..069d92939 100644 --- a/firebase_admin/messaging.py +++ b/firebase_admin/messaging.py @@ -15,7 +15,7 @@ """Firebase Cloud Messaging module.""" from __future__ import annotations -from typing import Callable, List, Optional +from typing import Any, Callable, Dict, List, Optional, cast import concurrent.futures import json import warnings @@ -24,8 +24,6 @@ import requests import httpx -from google.auth import credentials -from google.auth.transport import requests as auth_requests from googleapiclient import http from googleapiclient import _auth @@ -39,7 +37,6 @@ exceptions, App ) -from firebase_admin._retry import HttpxRetryTransport logger = logging.getLogger(__name__) @@ -113,7 +110,7 @@ def _get_messaging_service(app: Optional[App]) -> _MessagingService: return _utils.get_app_service(app, _MESSAGING_ATTRIBUTE, _MessagingService) -def send(message, dry_run=False, app: Optional[App] = None): +def send(message: Message, dry_run: bool = False, app: Optional[App] = None) -> str: """Sends the given message via Firebase Cloud Messaging (FCM). If the ``dry_run`` mode is enabled, the message will not be actually delivered to the @@ -133,7 +130,11 @@ def send(message, dry_run=False, app: Optional[App] = None): """ return _get_messaging_service(app).send(message, dry_run) -def send_each(messages, dry_run=False, app=None): +def send_each( + messages: List[Message], + dry_run: bool = False, + app: Optional[App] = None + ) -> BatchResponse: """Sends each message in the given list via Firebase Cloud Messaging. If the ``dry_run`` mode is enabled, the message will not be actually delivered to the @@ -153,7 +154,64 @@ def send_each(messages, dry_run=False, app=None): """ return _get_messaging_service(app).send_each(messages, dry_run) -async def send_each_async(messages, dry_run=True, app: Optional[App] = None) -> BatchResponse: +async def send_each_async( + messages: List[Message], + dry_run: bool = False, + app: Optional[App] = None + ) -> BatchResponse: + """Sends each message in the given list asynchronously via Firebase Cloud Messaging. + + If the ``dry_run`` mode is enabled, the message will not be actually delivered to the + recipients. Instead FCM performs all the usual validations, and emulates the send operation. + + Args: + messages: A list of ``messaging.Message`` instances. + dry_run: A boolean indicating whether to run the operation in dry run mode (optional). + app: An App instance (optional). + + Returns: + BatchResponse: A ``messaging.BatchResponse`` instance. + + Raises: + FirebaseError: If an error occurs while sending the message to the FCM service. + ValueError: If the input arguments are invalid. + """ + return await _get_messaging_service(app).send_each_async(messages, dry_run) + +async def send_each_for_multicast_async( + multicast_message: MulticastMessage, + dry_run: bool = False, + app: Optional[App] = None + ) -> BatchResponse: + """Sends the given mutlicast message to each token asynchronously via Firebase Cloud Messaging + (FCM). + + If the ``dry_run`` mode is enabled, the message will not be actually delivered to the + recipients. Instead FCM performs all the usual validations, and emulates the send operation. + + Args: + multicast_message: An instance of ``messaging.MulticastMessage``. + dry_run: A boolean indicating whether to run the operation in dry run mode (optional). + app: An App instance (optional). + + Returns: + BatchResponse: A ``messaging.BatchResponse`` instance. + + Raises: + FirebaseError: If an error occurs while sending the message to the FCM service. + ValueError: If the input arguments are invalid. + """ + if not isinstance(multicast_message, MulticastMessage): + raise ValueError('Message must be an instance of messaging.MulticastMessage class.') + messages = [Message( + data=multicast_message.data, + notification=multicast_message.notification, + android=multicast_message.android, + webpush=multicast_message.webpush, + apns=multicast_message.apns, + fcm_options=multicast_message.fcm_options, + token=token + ) for token in multicast_message.tokens] return await _get_messaging_service(app).send_each_async(messages, dry_run) def send_each_for_multicast(multicast_message, dry_run=False, app=None): @@ -379,54 +437,6 @@ def exception(self): """A ``FirebaseError`` if an error occurs while sending the message to the FCM service.""" return self._exception -class GoogleAuthCredentialFlow(httpx.Auth): - """Google Auth Credential Auth Flow""" - def __init__(self, credential: credentials.Credentials): - self._credential = credential - self._max_refresh_attempts = 2 - self._refresh_status_codes = (401,) - - def apply_auth_headers(self, request: httpx.Request): - # Build request used to refresh credentials if needed - auth_request = auth_requests.Request() - # This refreshes the credentials if needed and mutates the request headers to - # contain access token and any other google auth headers - self._credential.before_request(auth_request, request.method, request.url, request.headers) - - - def auth_flow(self, request: httpx.Request): - # Keep original headers since `credentials.before_request` mutates the passed headers and we - # want to keep the original in cause we need an auth retry. - _original_headers = request.headers.copy() - - _credential_refresh_attempt = 0 - while _credential_refresh_attempt <= self._max_refresh_attempts: - # copy original headers - request.headers = _original_headers.copy() - # mutates request headers - logger.debug( - 'Refreshing credentials for request attempt %d', - _credential_refresh_attempt + 1) - self.apply_auth_headers(request) - - # Continue to perform the request - # yield here dispatches the request and returns with the response - response: httpx.Response = yield request - - # We can check the result of the response and determine in we need to retry - # on refreshable status codes. Current transport.requests.AuthorizedSession() - # only does this on 401 errors. We should do the same. - if response.status_code in self._refresh_status_codes: - logger.debug( - 'Request attempt %d failed due to unauthorized credentials', - _credential_refresh_attempt + 1) - _credential_refresh_attempt += 1 - else: - break - # Last yielded response is auto returned. - - - class _MessagingService: """Service class that implements Firebase Cloud Messaging (FCM) functionality.""" @@ -444,7 +454,7 @@ class _MessagingService: 'UNREGISTERED': UnregisteredError, } - def __init__(self, app) -> None: + def __init__(self, app: App) -> None: project_id = app.project_id if not project_id: raise ValueError( @@ -459,12 +469,8 @@ def __init__(self, app) -> None: timeout = app.options.get('httpTimeout', _http_client.DEFAULT_TIMEOUT_SECONDS) self._credential = app.credential.get_credential() self._client = _http_client.JsonHttpClient(credential=self._credential, timeout=timeout) - self._async_client = httpx.AsyncClient( - http2=True, - auth=GoogleAuthCredentialFlow(self._credential), - timeout=timeout, - transport=HttpxRetryTransport() - ) + self._async_client = _http_client.HttpxAsyncClient( + credential=self._credential, timeout=timeout) self._build_transport = _auth.authorized_http @classmethod @@ -473,7 +479,7 @@ def encode_message(cls, message): raise ValueError('Message must be an instance of messaging.Message class.') return cls.JSON_ENCODER.default(message) - def send(self, message, dry_run=False): + def send(self, message: Message, dry_run: bool = False) -> str: """Sends the given message to FCM via the FCM v1 API.""" data = self._message_data(message, dry_run) try: @@ -486,9 +492,9 @@ def send(self, message, dry_run=False): except requests.exceptions.RequestException as error: raise self._handle_fcm_error(error) else: - return resp['name'] + return cast(str, resp['name']) - def send_each(self, messages, dry_run=False): + def send_each(self, messages: List[Message], dry_run: bool = False) -> BatchResponse: """Sends the given messages to FCM via the FCM v1 API.""" if not isinstance(messages, list): raise ValueError('messages must be a list of messaging.Message instances.') @@ -531,13 +537,11 @@ async def send_data(data): url=self._fcm_url, headers=self._fcm_headers, json=data) - # HTTP/2 check - if resp.http_version != 'HTTP/2': - raise Exception('This messages was not sent with HTTP/2') - resp.raise_for_status() - # except httpx.HTTPStatusError as exception: except httpx.HTTPError as exception: return SendResponse(resp=None, exception=self._handle_fcm_httpx_error(exception)) + # Catch errors caused by the requests library during authorization + except requests.exceptions.RequestException as exception: + return SendResponse(resp=None, exception=self._handle_fcm_error(exception)) else: return SendResponse(resp.json(), exception=None) @@ -682,7 +686,10 @@ def _build_fcm_error_requests(cls, error, message, error_dict): @classmethod def _build_fcm_error_httpx( - cls, error: httpx.HTTPError, message, error_dict + cls, + error: httpx.HTTPError, + message: str, + error_dict: Optional[Dict[str, Any]] ) -> Optional[exceptions.FirebaseError]: """Parses a httpx error response from the FCM API and creates a FCM-specific exception if appropriate.""" @@ -701,7 +708,11 @@ def _build_fcm_error_googleapiclient(cls, error, message, error_dict, http_respo return exc_type(message, cause=error, http_response=http_response) if exc_type else None @classmethod - def _build_fcm_error(cls, error_dict) -> Optional[Callable[..., exceptions.FirebaseError]]: + def _build_fcm_error( + cls, + error_dict: Optional[Dict[str, Any]] + ) -> Optional[Callable[..., exceptions.FirebaseError]]: + """Parses an error response to determine the appropriate FCM-specific error type.""" if not error_dict: return None fcm_code = None diff --git a/integration/conftest.py b/integration/conftest.py index bdecca40e..efa45932d 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -15,7 +15,6 @@ """pytest configuration and global fixtures for integration tests.""" import json -# import asyncio import pytest from pytest_asyncio import is_async_test @@ -73,15 +72,6 @@ def api_key(request): with open(path) as keyfile: return keyfile.read().strip() -# @pytest.fixture(scope="session") -# def event_loop(): -# """Create an instance of the default event loop for test session. -# This avoids early eventloop closure. -# """ -# loop = asyncio.get_event_loop_policy().new_event_loop() -# yield loop -# loop.close() - def pytest_collection_modifyitems(items): pytest_asyncio_tests = (item for item in items if is_async_test(item)) session_scope_marker = pytest.mark.asyncio(loop_scope="session") diff --git a/integration/test_messaging.py b/integration/test_messaging.py index af35ce01b..296a4d338 100644 --- a/integration/test_messaging.py +++ b/integration/test_messaging.py @@ -254,39 +254,6 @@ async def test_send_each_async(): assert isinstance(response.exception, exceptions.InvalidArgumentError) assert response.message_id is None - -# @pytest.mark.asyncio -# async def test_send_each_async_error(): -# messages = [ -# messaging.Message( -# topic='foo-bar', notification=messaging.Notification('Title', 'Body')), -# messaging.Message( -# topic='foo-bar', notification=messaging.Notification('Title', 'Body')), -# messaging.Message( -# token='not-a-token', notification=messaging.Notification('Title', 'Body')), -# ] - -# batch_response = await messaging.send_each_async(messages, dry_run=True) - -# assert batch_response.success_count == 2 -# assert batch_response.failure_count == 1 -# assert len(batch_response.responses) == 3 - -# response = batch_response.responses[0] -# assert response.success is True -# assert response.exception is None -# assert re.match('^projects/.*/messages/.*$', response.message_id) - -# response = batch_response.responses[1] -# assert response.success is True -# assert response.exception is None -# assert re.match('^projects/.*/messages/.*$', response.message_id) - -# response = batch_response.responses[2] -# assert response.success is False -# assert isinstance(response.exception, exceptions.InvalidArgumentError) -# assert response.message_id is None - @pytest.mark.asyncio async def test_send_each_async_500(): messages = [] @@ -303,3 +270,19 @@ async def test_send_each_async_500(): assert response.success is True assert response.exception is None assert re.match('^projects/.*/messages/.*$', response.message_id) + +@pytest.mark.asyncio +async def test_send_each_for_multicast_async(): + multicast = messaging.MulticastMessage( + notification=messaging.Notification('Title', 'Body'), + tokens=['not-a-token', 'also-not-a-token']) + + batch_response = await messaging.send_each_for_multicast_async(multicast) + + assert batch_response.success_count == 0 + assert batch_response.failure_count == 2 + assert len(batch_response.responses) == 2 + for response in batch_response.responses: + assert response.success is False + assert response.exception is not None + assert response.message_id is None diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 78036166c..f1e7f6a64 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -13,116 +13,131 @@ # limitations under the License. """Tests for firebase_admin._http_client.""" +from typing import Dict, Optional, Union import pytest +import httpx +import respx from pytest_localserver import http +from pytest_mock import MockerFixture import requests from firebase_admin import _http_client, _utils +from firebase_admin._retry import HttpxRetry, HttpxRetryTransport +from firebase_admin._http_client import ( + HttpxAsyncClient, + GoogleAuthCredentialFlow, + DEFAULT_TIMEOUT_SECONDS +) from tests import testutils _TEST_URL = 'http://firebase.test.url/' +@pytest.fixture +def default_retry_config() -> HttpxRetry: + """Provides a fresh copy of the default retry config instance.""" + return _http_client.DEFAULT_HTTPX_RETRY_CONFIG -def test_http_client_default_session(): - client = _http_client.HttpClient() - assert client.session is not None - assert client.base_url == '' - recorder = _instrument(client, 'body') - resp = client.request('get', _TEST_URL) - assert resp.status_code == 200 - assert resp.text == 'body' - assert len(recorder) == 1 - assert recorder[0].method == 'GET' - assert recorder[0].url == _TEST_URL - -def test_http_client_custom_session(): - session = requests.Session() - client = _http_client.HttpClient(session=session) - assert client.session is session - assert client.base_url == '' - recorder = _instrument(client, 'body') - resp = client.request('get', _TEST_URL) - assert resp.status_code == 200 - assert resp.text == 'body' - assert len(recorder) == 1 - assert recorder[0].method == 'GET' - assert recorder[0].url == _TEST_URL - -def test_base_url(): - client = _http_client.HttpClient(base_url=_TEST_URL) - assert client.session is not None - assert client.base_url == _TEST_URL - recorder = _instrument(client, 'body') - resp = client.request('get', 'foo') - assert resp.status_code == 200 - assert resp.text == 'body' - assert len(recorder) == 1 - assert recorder[0].method == 'GET' - assert recorder[0].url == _TEST_URL + 'foo' - -def test_metrics_headers(): - client = _http_client.HttpClient() - assert client.session is not None - recorder = _instrument(client, 'body') - resp = client.request('get', _TEST_URL) - assert resp.status_code == 200 - assert resp.text == 'body' - assert len(recorder) == 1 - assert recorder[0].method == 'GET' - assert recorder[0].url == _TEST_URL - assert recorder[0].headers['x-goog-api-client'] == _utils.get_metrics_header() - -def test_metrics_headers_with_credentials(): - client = _http_client.HttpClient( - credential=testutils.MockGoogleCredential()) - assert client.session is not None - recorder = _instrument(client, 'body') - resp = client.request('get', _TEST_URL) - assert resp.status_code == 200 - assert resp.text == 'body' - assert len(recorder) == 1 - assert recorder[0].method == 'GET' - assert recorder[0].url == _TEST_URL - expected_metrics_header = _utils.get_metrics_header() + ' mock-cred-metric-tag' - assert recorder[0].headers['x-goog-api-client'] == expected_metrics_header - -def test_credential(): - client = _http_client.HttpClient( - credential=testutils.MockGoogleCredential()) - assert client.session is not None - recorder = _instrument(client, 'body') - resp = client.request('get', _TEST_URL) - assert resp.status_code == 200 - assert resp.text == 'body' - assert len(recorder) == 1 - assert recorder[0].method == 'GET' - assert recorder[0].url == _TEST_URL - assert recorder[0].headers['Authorization'] == 'Bearer mock-token' - -@pytest.mark.parametrize('options, timeout', [ - ({}, _http_client.DEFAULT_TIMEOUT_SECONDS), - ({'timeout': 7}, 7), - ({'timeout': 0}, 0), - ({'timeout': None}, None), -]) -def test_timeout(options, timeout): - client = _http_client.HttpClient(**options) - assert client.timeout == timeout - recorder = _instrument(client, 'body') - client.request('get', _TEST_URL) - assert len(recorder) == 1 - if timeout is None: - assert recorder[0]._extra_kwargs['timeout'] is None - else: - assert recorder[0]._extra_kwargs['timeout'] == pytest.approx(timeout, 0.001) - - -def _instrument(client, payload, status=200): - recorder = [] - adapter = testutils.MockAdapter(payload, status, recorder) - client.session.mount(_TEST_URL, adapter) - return recorder +class TestHttpClient: + def test_http_client_default_session(self): + client = _http_client.HttpClient() + assert client.session is not None + assert client.base_url == '' + recorder = self._instrument(client, 'body') + resp = client.request('get', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert len(recorder) == 1 + assert recorder[0].method == 'GET' + assert recorder[0].url == _TEST_URL + + def test_http_client_custom_session(self): + session = requests.Session() + client = _http_client.HttpClient(session=session) + assert client.session is session + assert client.base_url == '' + recorder = self._instrument(client, 'body') + resp = client.request('get', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert len(recorder) == 1 + assert recorder[0].method == 'GET' + assert recorder[0].url == _TEST_URL + + def test_base_url(self): + client = _http_client.HttpClient(base_url=_TEST_URL) + assert client.session is not None + assert client.base_url == _TEST_URL + recorder = self._instrument(client, 'body') + resp = client.request('get', 'foo') + assert resp.status_code == 200 + assert resp.text == 'body' + assert len(recorder) == 1 + assert recorder[0].method == 'GET' + assert recorder[0].url == _TEST_URL + 'foo' + + def test_metrics_headers(self): + client = _http_client.HttpClient() + assert client.session is not None + recorder = self._instrument(client, 'body') + resp = client.request('get', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert len(recorder) == 1 + assert recorder[0].method == 'GET' + assert recorder[0].url == _TEST_URL + assert recorder[0].headers['x-goog-api-client'] == _utils.get_metrics_header() + + def test_metrics_headers_with_credentials(self): + client = _http_client.HttpClient( + credential=testutils.MockGoogleCredential()) + assert client.session is not None + recorder = self._instrument(client, 'body') + resp = client.request('get', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert len(recorder) == 1 + assert recorder[0].method == 'GET' + assert recorder[0].url == _TEST_URL + expected_metrics_header = _utils.get_metrics_header() + ' mock-cred-metric-tag' + assert recorder[0].headers['x-goog-api-client'] == expected_metrics_header + + def test_credential(self): + client = _http_client.HttpClient( + credential=testutils.MockGoogleCredential()) + assert client.session is not None + recorder = self._instrument(client, 'body') + resp = client.request('get', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert len(recorder) == 1 + assert recorder[0].method == 'GET' + assert recorder[0].url == _TEST_URL + assert recorder[0].headers['Authorization'] == 'Bearer mock-token' + + @pytest.mark.parametrize('options, timeout', [ + ({}, _http_client.DEFAULT_TIMEOUT_SECONDS), + ({'timeout': 7}, 7), + ({'timeout': 0}, 0), + ({'timeout': None}, None), + ]) + def test_timeout(self, options, timeout): + client = _http_client.HttpClient(**options) + assert client.timeout == timeout + recorder = self._instrument(client, 'body') + client.request('get', _TEST_URL) + assert len(recorder) == 1 + if timeout is None: + assert recorder[0]._extra_kwargs['timeout'] is None + else: + assert recorder[0]._extra_kwargs['timeout'] == pytest.approx(timeout, 0.001) + + + def _instrument(self, client, payload, status=200): + recorder = [] + adapter = testutils.MockAdapter(payload, status, recorder) + client.session.mount(_TEST_URL, adapter) + return recorder class TestHttpRetry: @@ -183,3 +198,473 @@ def test_no_retry_on_404(self): client.request('get', '/') assert excinfo.value.response.status_code == 404 assert len(self.httpserver.requests) == 1 + +class TestHttpxAsyncClient: + def test_init_default(self, mocker: MockerFixture, default_retry_config: HttpxRetry): + """Test client initialization with default settings (no credentials).""" + + # Mock httpx.AsyncClient and HttpxRetryTransport init to check args passed to them + mock_async_client_init = mocker.patch('httpx.AsyncClient.__init__', return_value=None) + mock_transport_init = mocker.patch( + 'firebase_admin._retry.HttpxRetryTransport.__init__', return_value=None + ) + + client = HttpxAsyncClient() + + assert client.base_url == '' + assert client.timeout == DEFAULT_TIMEOUT_SECONDS + assert client._headers == _http_client.METRICS_HEADERS + assert client._retry_config == default_retry_config + + # Check httpx.AsyncClient call args + _, init_kwargs = mock_async_client_init.call_args + assert init_kwargs.get('http2') is True + assert init_kwargs.get('timeout') == DEFAULT_TIMEOUT_SECONDS + assert init_kwargs.get('headers') == _http_client.METRICS_HEADERS + assert init_kwargs.get('auth') is None + assert 'mounts' in init_kwargs + assert 'http://' in init_kwargs['mounts'] + assert 'https://' in init_kwargs['mounts'] + assert isinstance(init_kwargs['mounts']['http://'], HttpxRetryTransport) + assert isinstance(init_kwargs['mounts']['https://'], HttpxRetryTransport) + + # Check that HttpxRetryTransport was initialized with the default retry config + assert mock_transport_init.call_count >= 1 + _, transport_call_kwargs = mock_transport_init.call_args_list[0] + assert transport_call_kwargs.get('retry') == default_retry_config + assert transport_call_kwargs.get('http2') is True + + def test_init_with_credentials(self, mocker: MockerFixture, default_retry_config: HttpxRetry): + """Test client initialization with credentials.""" + + # Mock GoogleAuthCredentialFlow, httpx.AsyncClient and HttpxRetryTransport init to + # check args passed to them + mock_auth_flow_init = mocker.patch( + 'firebase_admin._http_client.GoogleAuthCredentialFlow.__init__', return_value=None + ) + mock_async_client_init = mocker.patch('httpx.AsyncClient.__init__', return_value=None) + mock_transport_init = mocker.patch( + 'firebase_admin._retry.HttpxRetryTransport.__init__', return_value=None + ) + + mock_credential = testutils.MockGoogleCredential() + client = HttpxAsyncClient(credential=mock_credential) + + assert client.base_url == '' + assert client.timeout == DEFAULT_TIMEOUT_SECONDS + assert client._headers == _http_client.METRICS_HEADERS + assert client._retry_config == default_retry_config + + # Verify GoogleAuthCredentialFlow was initialized with the credential + mock_auth_flow_init.assert_called_once_with(mock_credential) + + # Check httpx.AsyncClient call args + _, init_kwargs = mock_async_client_init.call_args + assert init_kwargs.get('http2') is True + assert init_kwargs.get('timeout') == DEFAULT_TIMEOUT_SECONDS + assert init_kwargs.get('headers') == _http_client.METRICS_HEADERS + assert isinstance(init_kwargs.get('auth'), GoogleAuthCredentialFlow) + assert 'mounts' in init_kwargs + assert 'http://' in init_kwargs['mounts'] + assert 'https://' in init_kwargs['mounts'] + assert isinstance(init_kwargs['mounts']['http://'], HttpxRetryTransport) + assert isinstance(init_kwargs['mounts']['https://'], HttpxRetryTransport) + + # Check that HttpxRetryTransport was initialized with the default retry config + assert mock_transport_init.call_count >= 1 + _, transport_call_kwargs = mock_transport_init.call_args_list[0] + assert transport_call_kwargs.get('retry') == default_retry_config + assert transport_call_kwargs.get('http2') is True + + def test_init_with_custom_settings(self, mocker: MockerFixture): + """Test client initialization with custom settings.""" + + # Mock httpx.AsyncClient and HttpxRetryTransport init to check args passed to them + mock_auth_flow_init = mocker.patch( + 'firebase_admin._http_client.GoogleAuthCredentialFlow.__init__', return_value=None + ) + mock_async_client_init = mocker.patch('httpx.AsyncClient.__init__', return_value=None) + mock_transport_init = mocker.patch( + 'firebase_admin._retry.HttpxRetryTransport.__init__', return_value=None + ) + + mock_credential = testutils.MockGoogleCredential() + headers = {'X-Custom': 'Test'} + custom_retry = HttpxRetry(max_retries=1, status_forcelist=[429], backoff_factor=0) + timeout = 60 + http2 = False + + expected_headers = {**headers, **_http_client.METRICS_HEADERS} + + client = HttpxAsyncClient( + credential=mock_credential, base_url=_TEST_URL, headers=headers, + retry_config=custom_retry, timeout=timeout, http2=http2) + + assert client.base_url == _TEST_URL + assert client._headers == expected_headers + assert client._retry_config == custom_retry + assert client.timeout == timeout + + # Verify GoogleAuthCredentialFlow was initialized with the credential + mock_auth_flow_init.assert_called_once_with(mock_credential) + # Verify original headers are not mutated + assert headers == {'X-Custom': 'Test'} + + # Check httpx.AsyncClient call args + _, init_kwargs = mock_async_client_init.call_args + assert init_kwargs.get('http2') is False + assert init_kwargs.get('timeout') == timeout + assert init_kwargs.get('headers') == expected_headers + assert isinstance(init_kwargs.get('auth'), GoogleAuthCredentialFlow) + assert 'mounts' in init_kwargs + assert 'http://' in init_kwargs['mounts'] + assert 'https://' in init_kwargs['mounts'] + assert isinstance(init_kwargs['mounts']['http://'], HttpxRetryTransport) + assert isinstance(init_kwargs['mounts']['https://'], HttpxRetryTransport) + + # Check that HttpxRetryTransport was initialized with the default retry config + assert mock_transport_init.call_count >= 1 + _, transport_call_kwargs = mock_transport_init.call_args_list[0] + assert transport_call_kwargs.get('retry') == custom_retry + assert transport_call_kwargs.get('http2') is False + + + @respx.mock + @pytest.mark.asyncio + async def test_request(self): + """Test client request.""" + + client = HttpxAsyncClient() + + responses = [ + respx.MockResponse(200, http_version='HTTP/2', content='body'), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + resp = await client.request('post', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers, has_auth=False) + + @respx.mock + @pytest.mark.asyncio + async def test_request_raise_for_status(self): + """Test client request raise for status error.""" + + client = HttpxAsyncClient() + + responses = [ + respx.MockResponse(404, http_version='HTTP/2', content='Status error'), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + with pytest.raises(httpx.HTTPStatusError) as exc_info: + resp = await client.request('post', _TEST_URL) + resp = exc_info.value.response + assert resp.status_code == 404 + assert resp.text == 'Status error' + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers, has_auth=False) + + + @respx.mock + @pytest.mark.asyncio + async def test_request_with_base_url(self): + """Test client request with base_url.""" + + client = HttpxAsyncClient(base_url=_TEST_URL) + + url_extension = 'post/123' + responses = [ + respx.MockResponse(200, http_version='HTTP/2', content='body'), + ] + route = respx.request('POST', _TEST_URL + url_extension).mock(side_effect=responses) + + resp = await client.request('POST', url_extension) + assert resp.status_code == 200 + assert resp.text == 'body' + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + url_extension + self.check_headers(request.headers, has_auth=False) + + @respx.mock + @pytest.mark.asyncio + async def test_request_with_timeout(self): + """Test client request with timeout.""" + + timeout = 60 + client = HttpxAsyncClient(timeout=timeout) + responses = [ + respx.MockResponse(200, http_version='HTTP/2', content='body'), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + resp = await client.request('POST', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers, has_auth=False) + + @respx.mock + @pytest.mark.asyncio + async def test_request_with_credential(self): + """Test client request with credentials.""" + + mock_credential = testutils.MockGoogleCredential() + client = HttpxAsyncClient(credential=mock_credential) + + responses = [ + respx.MockResponse(200, http_version='HTTP/2', content='test'), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + resp = await client.request('post', _TEST_URL) + + assert resp.status_code == 200 + assert resp.text == 'test' + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers) + + @respx.mock + @pytest.mark.asyncio + async def test_request_with_headers(self): + """Test client request with credentials.""" + + mock_credential = testutils.MockGoogleCredential() + headers = httpx.Headers({'X-Custom': 'Test'}) + client = HttpxAsyncClient(credential=mock_credential, headers=headers) + + responses = [ + respx.MockResponse(200, http_version='HTTP/2', content='body'), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + resp = await client.request('post', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers, expected_headers=headers) + + + @respx.mock + @pytest.mark.asyncio + async def test_response_get_headers(self): + """Test the headers() helper method.""" + + client = HttpxAsyncClient() + expected_headers = {'X-Custom': 'Test'} + + responses = [ + respx.MockResponse(200, http_version='HTTP/2', headers=expected_headers), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + headers = await client.headers('post', _TEST_URL) + + self.check_headers( + headers, expected_headers=expected_headers, has_auth=False, has_metrics=False + ) + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers, has_auth=False) + + @respx.mock + @pytest.mark.asyncio + async def test_response_get_body_and_response(self): + """Test the body_and_response() helper method.""" + + client = HttpxAsyncClient() + expected_body = {'key': 'value'} + + responses = [ + respx.MockResponse(200, http_version='HTTP/2', json=expected_body), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + body, resp = await client.body_and_response('post', _TEST_URL) + + assert resp.status_code == 200 + assert body == expected_body + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers, has_auth=False) + + + @respx.mock + @pytest.mark.asyncio + async def test_response_get_body(self): + """Test the body() helper method.""" + + client = HttpxAsyncClient() + expected_body = {'key': 'value'} + + responses = [ + respx.MockResponse(200, http_version='HTTP/2', json=expected_body), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + body = await client.body('post', _TEST_URL) + + assert body == expected_body + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers, has_auth=False) + + @respx.mock + @pytest.mark.asyncio + async def test_response_get_headers_and_body(self): + """Test the headers_and_body() helper method.""" + + client = HttpxAsyncClient() + expected_headers = {'X-Custom': 'Test'} + expected_body = {'key': 'value'} + + responses = [ + respx.MockResponse( + 200, http_version='HTTP/2', json=expected_body, headers=expected_headers), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + headers, body = await client.headers_and_body('post', _TEST_URL) + + assert body == expected_body + self.check_headers( + headers, expected_headers=expected_headers, has_auth=False, has_metrics=False + ) + assert route.call_count == 1 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + self.check_headers(request.headers, has_auth=False) + + @pytest.mark.asyncio + async def test_aclose(self): + """Test that aclose calls the underlying client's aclose.""" + + client = HttpxAsyncClient() + assert client._async_client.is_closed is False + await client.aclose() + assert client._async_client.is_closed is True + + + def check_headers( + self, + headers: Union[httpx.Headers, Dict[str, str]], + expected_headers: Optional[Union[httpx.Headers, Dict[str, str]]] = None, + has_auth: bool = True, + has_metrics: bool = True + ): + if expected_headers: + for header_key in expected_headers.keys(): + assert header_key in headers + assert headers.get(header_key) == expected_headers.get(header_key) + + if has_auth: + assert 'Authorization' in headers + assert headers.get('Authorization') == 'Bearer mock-token' + + if has_metrics: + for header_key in _http_client.METRICS_HEADERS: + assert header_key in headers + expected_metrics_header = _http_client.METRICS_HEADERS.get(header_key, '') + if has_auth: + expected_metrics_header += ' mock-cred-metric-tag' + assert headers.get(header_key) == expected_metrics_header + + +class TestGoogleAuthCredentialFlow: + + @respx.mock + @pytest.mark.asyncio + async def test_auth_headers_retry(self): + """Test invalid credential retry.""" + + mock_credential = testutils.MockGoogleCredential() + client = HttpxAsyncClient(credential=mock_credential) + + responses = [ + respx.MockResponse(401, http_version='HTTP/2', content='Auth error'), + respx.MockResponse(401, http_version='HTTP/2', content='Auth error'), + respx.MockResponse(200, http_version='HTTP/2', content='body'), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + resp = await client.request('post', _TEST_URL) + assert resp.status_code == 200 + assert resp.text == 'body' + assert route.call_count == 3 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + headers = request.headers + assert 'Authorization' in headers + assert headers.get('Authorization') == 'Bearer mock-token' + + @respx.mock + @pytest.mark.asyncio + async def test_auth_headers_retry_exhausted(self, mocker: MockerFixture): + """Test invalid credential retry exhausted.""" + + mock_credential = testutils.MockGoogleCredential() + mock_credential_patch = mocker.spy(mock_credential, 'refresh') + client = HttpxAsyncClient(credential=mock_credential) + + responses = [ + respx.MockResponse(401, http_version='HTTP/2', content='Auth error'), + respx.MockResponse(401, http_version='HTTP/2', content='Auth error'), + respx.MockResponse(401, http_version='HTTP/2', content='Auth error'), + # Should stop after previous response + respx.MockResponse(200, http_version='HTTP/2', content='body'), + ] + route = respx.request('POST', _TEST_URL).mock(side_effect=responses) + + with pytest.raises(httpx.HTTPStatusError) as exc_info: + resp = await client.request('post', _TEST_URL) + resp = exc_info.value.response + assert resp.status_code == 401 + assert resp.text == 'Auth error' + assert route.call_count == 3 + + assert mock_credential_patch.call_count == 3 + + request = route.calls.last.request + assert request.method == 'POST' + assert request.url == _TEST_URL + headers = request.headers + assert 'Authorization' in headers + assert headers.get('Authorization') == 'Bearer mock-token' From e4550ec3cf3badc70b83b75c4b7c3a16a5287837 Mon Sep 17 00:00:00 2001 From: jonathanedey Date: Tue, 27 May 2025 10:54:25 -0400 Subject: [PATCH 4/5] Add `send_each_for_multicast_async` to `__all__` --- firebase_admin/messaging.py | 1 + 1 file changed, 1 insertion(+) diff --git a/firebase_admin/messaging.py b/firebase_admin/messaging.py index 069d92939..ae17dbb32 100644 --- a/firebase_admin/messaging.py +++ b/firebase_admin/messaging.py @@ -77,6 +77,7 @@ 'send_each', 'send_each_async', 'send_each_for_multicast', + 'send_each_for_multicast_async', 'subscribe_to_topic', 'unsubscribe_from_topic', ] From 9b6e95e28422f1e2b9ba6c3453a83bd6bc6e05ac Mon Sep 17 00:00:00 2001 From: jonathanedey Date: Wed, 28 May 2025 12:49:49 -0400 Subject: [PATCH 5/5] Apply suggestions from TW review --- firebase_admin/_http_client.py | 6 +++--- firebase_admin/messaging.py | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/firebase_admin/_http_client.py b/firebase_admin/_http_client.py index a6d0a5bef..6d2582291 100644 --- a/firebase_admin/_http_client.py +++ b/firebase_admin/_http_client.py @@ -177,8 +177,8 @@ def apply_auth_headers( request: httpx.Request, auth_request: google_auth_requests.Request ) -> None: - """A helper function to refreshes credentials if needed and mutates the request headers to - contain access token and any other google auth headers.""" + """A helper function that refreshes credentials if needed and mutates the request headers + to contain access token and any other Google Auth headers.""" logger.debug( 'Attempting to apply auth headers. Credential validity before: %s', @@ -240,7 +240,7 @@ def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Re class HttpxAsyncClient(): """Async HTTP client used to make HTTP/2 calls using HTTPX. - HttpxAsyncClient maintains an async HTTPX client, and handles request authentication and retries + HttpxAsyncClient maintains an async HTTPX client, handles request authentication, and retries if necessary. """ def __init__( diff --git a/firebase_admin/messaging.py b/firebase_admin/messaging.py index ae17dbb32..4a69da725 100644 --- a/firebase_admin/messaging.py +++ b/firebase_admin/messaging.py @@ -115,7 +115,7 @@ def send(message: Message, dry_run: bool = False, app: Optional[App] = None) -> """Sends the given message via Firebase Cloud Messaging (FCM). If the ``dry_run`` mode is enabled, the message will not be actually delivered to the - recipients. Instead FCM performs all the usual validations, and emulates the send operation. + recipients. Instead, FCM performs all the usual validations and emulates the send operation. Args: message: An instance of ``messaging.Message``. @@ -139,7 +139,7 @@ def send_each( """Sends each message in the given list via Firebase Cloud Messaging. If the ``dry_run`` mode is enabled, the message will not be actually delivered to the - recipients. Instead FCM performs all the usual validations, and emulates the send operation. + recipients. Instead, FCM performs all the usual validations and emulates the send operation. Args: messages: A list of ``messaging.Message`` instances. @@ -163,7 +163,7 @@ async def send_each_async( """Sends each message in the given list asynchronously via Firebase Cloud Messaging. If the ``dry_run`` mode is enabled, the message will not be actually delivered to the - recipients. Instead FCM performs all the usual validations, and emulates the send operation. + recipients. Instead, FCM performs all the usual validations and emulates the send operation. Args: messages: A list of ``messaging.Message`` instances. @@ -188,7 +188,7 @@ async def send_each_for_multicast_async( (FCM). If the ``dry_run`` mode is enabled, the message will not be actually delivered to the - recipients. Instead FCM performs all the usual validations, and emulates the send operation. + recipients. Instead, FCM performs all the usual validations and emulates the send operation. Args: multicast_message: An instance of ``messaging.MulticastMessage``. @@ -219,7 +219,7 @@ def send_each_for_multicast(multicast_message, dry_run=False, app=None): """Sends the given mutlicast message to each token via Firebase Cloud Messaging (FCM). If the ``dry_run`` mode is enabled, the message will not be actually delivered to the - recipients. Instead FCM performs all the usual validations, and emulates the send operation. + recipients. Instead, FCM performs all the usual validations and emulates the send operation. Args: multicast_message: An instance of ``messaging.MulticastMessage``. @@ -250,7 +250,7 @@ def send_all(messages, dry_run=False, app=None): """Sends the given list of messages via Firebase Cloud Messaging as a single batch. If the ``dry_run`` mode is enabled, the message will not be actually delivered to the - recipients. Instead FCM performs all the usual validations, and emulates the send operation. + recipients. Instead, FCM performs all the usual validations and emulates the send operation. Args: messages: A list of ``messaging.Message`` instances. @@ -273,7 +273,7 @@ def send_multicast(multicast_message, dry_run=False, app=None): """Sends the given mutlicast message to all tokens via Firebase Cloud Messaging (FCM). If the ``dry_run`` mode is enabled, the message will not be actually delivered to the - recipients. Instead FCM performs all the usual validations, and emulates the send operation. + recipients. Instead, FCM performs all the usual validations and emulates the send operation. Args: multicast_message: An instance of ``messaging.MulticastMessage``.