diff --git a/firebase_admin/_auth_utils.py b/firebase_admin/_auth_utils.py index 08b930ae6..c7b6c15f1 100644 --- a/firebase_admin/_auth_utils.py +++ b/firebase_admin/_auth_utils.py @@ -216,7 +216,7 @@ class InvalidIdTokenError(exceptions.InvalidArgumentError): default_message = 'The provided ID token is invalid' - def __init__(self, message, cause, http_response=None): + def __init__(self, message, cause=None, http_response=None): exceptions.InvalidArgumentError.__init__(self, message, cause, http_response) diff --git a/firebase_admin/_token_gen.py b/firebase_admin/_token_gen.py index 0ea34f77c..1d4556939 100644 --- a/firebase_admin/_token_gen.py +++ b/firebase_admin/_token_gen.py @@ -217,12 +217,18 @@ def __init__(self, app): project_id=app.project_id, short_name='ID token', operation='verify_id_token()', doc_url='https://firebase.google.com/docs/auth/admin/verify-id-tokens', - cert_url=ID_TOKEN_CERT_URI, issuer=ID_TOKEN_ISSUER_PREFIX) + cert_url=ID_TOKEN_CERT_URI, + issuer=ID_TOKEN_ISSUER_PREFIX, + invalid_token_error=_auth_utils.InvalidIdTokenError, + expired_token_error=ExpiredIdTokenError) self.cookie_verifier = _JWTVerifier( project_id=app.project_id, short_name='session cookie', operation='verify_session_cookie()', doc_url='https://firebase.google.com/docs/auth/admin/verify-id-tokens', - cert_url=COOKIE_CERT_URI, issuer=COOKIE_ISSUER_PREFIX) + cert_url=COOKIE_CERT_URI, + issuer=COOKIE_ISSUER_PREFIX, + invalid_token_error=InvalidSessionCookieError, + expired_token_error=ExpiredSessionCookieError) def verify_id_token(self, id_token): return self.id_token_verifier.verify(id_token, self.request) @@ -245,6 +251,8 @@ def __init__(self, **kwargs): self.articled_short_name = 'an {0}'.format(self.short_name) else: self.articled_short_name = 'a {0}'.format(self.short_name) + self._invalid_token_error = kwargs.pop('invalid_token_error') + self._expired_token_error = kwargs.pop('expired_token_error') def verify(self, token, request): """Verifies the signature and data for the provided JWT.""" @@ -261,8 +269,7 @@ def verify(self, token, request): 'or set your Firebase project ID as an app option. Alternatively set the ' 'GOOGLE_CLOUD_PROJECT environment variable.'.format(self.operation)) - header = jwt.decode_header(token) - payload = jwt.decode(token, verify=False) + header, payload = self._decode_unverified(token) issuer = payload.get('iss') audience = payload.get('aud') subject = payload.get('sub') @@ -275,12 +282,12 @@ def verify(self, token, request): 'See {0} for details on how to retrieve {1}.'.format(self.url, self.short_name)) error_message = None - if not header.get('kid'): - if audience == FIREBASE_AUDIENCE: - error_message = ( - '{0} expects {1}, but was given a custom ' - 'token.'.format(self.operation, self.articled_short_name)) - elif header.get('alg') == 'HS256' and payload.get( + if audience == FIREBASE_AUDIENCE: + error_message = ( + '{0} expects {1}, but was given a custom ' + 'token.'.format(self.operation, self.articled_short_name)) + elif not header.get('kid'): + if header.get('alg') == 'HS256' and payload.get( 'v') is 0 and 'uid' in payload.get('d', {}): error_message = ( '{0} expects {1}, but was given a legacy custom ' @@ -315,15 +322,30 @@ def verify(self, token, request): '{1}'.format(self.short_name, verify_id_token_msg)) if error_message: - raise ValueError(error_message) + raise self._invalid_token_error(error_message) + + try: + verified_claims = google.oauth2.id_token.verify_token( + token, + request=request, + audience=self.project_id, + certs_url=self.cert_url) + verified_claims['uid'] = verified_claims['sub'] + return verified_claims + except google.auth.exceptions.TransportError as error: + raise CertificateFetchError(str(error), cause=error) + except ValueError as error: + if 'Token expired' in str(error): + raise self._expired_token_error(str(error), cause=error) + raise self._invalid_token_error(str(error), cause=error) - verified_claims = google.oauth2.id_token.verify_token( - token, - request=request, - audience=self.project_id, - certs_url=self.cert_url) - verified_claims['uid'] = verified_claims['sub'] - return verified_claims + def _decode_unverified(self, token): + try: + header = jwt.decode_header(token) + payload = jwt.decode(token, verify=False) + return header, payload + except ValueError as error: + raise self._invalid_token_error(str(error), cause=error) class TokenSignError(exceptions.UnknownError): @@ -331,3 +353,45 @@ class TokenSignError(exceptions.UnknownError): def __init__(self, message, cause): exceptions.UnknownError.__init__(self, message, cause) + + +class CertificateFetchError(exceptions.UnknownError): + """Failed to fetch some public key certificates required to verify a token.""" + + def __init__(self, message, cause): + exceptions.UnknownError.__init__(self, message, cause) + + +class ExpiredIdTokenError(_auth_utils.InvalidIdTokenError): + """The provided ID token is expired.""" + + def __init__(self, message, cause): + _auth_utils.InvalidIdTokenError.__init__(self, message, cause) + + +class RevokedIdTokenError(_auth_utils.InvalidIdTokenError): + """The provided ID token has been revoked.""" + + def __init__(self, message): + _auth_utils.InvalidIdTokenError.__init__(self, message) + + +class InvalidSessionCookieError(exceptions.InvalidArgumentError): + """The provided string is not a valid Firebase session cookie.""" + + def __init__(self, message, cause=None): + exceptions.InvalidArgumentError.__init__(self, message, cause) + + +class ExpiredSessionCookieError(InvalidSessionCookieError): + """The provided session cookie is expired.""" + + def __init__(self, message, cause): + InvalidSessionCookieError.__init__(self, message, cause) + + +class RevokedSessionCookieError(InvalidSessionCookieError): + """The provided session cookie has been revoked.""" + + def __init__(self, message): + InvalidSessionCookieError.__init__(self, message) diff --git a/firebase_admin/auth.py b/firebase_admin/auth.py index 0913e50f7..bbc7c613a 100644 --- a/firebase_admin/auth.py +++ b/firebase_admin/auth.py @@ -31,20 +31,23 @@ _AUTH_ATTRIBUTE = '_auth' -_ID_TOKEN_REVOKED = 'ID_TOKEN_REVOKED' -_SESSION_COOKIE_REVOKED = 'SESSION_COOKIE_REVOKED' __all__ = [ 'ActionCodeSettings', - 'AuthError', + 'CertificateFetchError', 'DELETE_ATTRIBUTE', 'ErrorInfo', + 'ExpiredIdTokenError', + 'ExpiredSessionCookieError', 'ExportedUserRecord', 'ImportUserRecord', 'InvalidDynamicLinkDomainError', 'InvalidIdTokenError', + 'InvalidSessionCookieError', 'ListUsersPage', + 'RevokedIdTokenError', + 'RevokedSessionCookieError', 'TokenSignError', 'UidAlreadyExistsError', 'UnexpectedResponseError', @@ -76,17 +79,23 @@ ] ActionCodeSettings = _user_mgt.ActionCodeSettings +CertificateFetchError = _token_gen.CertificateFetchError DELETE_ATTRIBUTE = _user_mgt.DELETE_ATTRIBUTE ErrorInfo = _user_import.ErrorInfo +ExpiredIdTokenError = _token_gen.ExpiredIdTokenError +ExpiredSessionCookieError = _token_gen.ExpiredSessionCookieError ExportedUserRecord = _user_mgt.ExportedUserRecord -ListUsersPage = _user_mgt.ListUsersPage -UserImportHash = _user_import.UserImportHash ImportUserRecord = _user_import.ImportUserRecord InvalidDynamicLinkDomainError = _auth_utils.InvalidDynamicLinkDomainError InvalidIdTokenError = _auth_utils.InvalidIdTokenError +InvalidSessionCookieError = _token_gen.InvalidSessionCookieError +ListUsersPage = _user_mgt.ListUsersPage +RevokedIdTokenError = _token_gen.RevokedIdTokenError +RevokedSessionCookieError = _token_gen.RevokedSessionCookieError TokenSignError = _token_gen.TokenSignError UidAlreadyExistsError = _auth_utils.UidAlreadyExistsError UnexpectedResponseError = _auth_utils.UnexpectedResponseError +UserImportHash = _user_import.UserImportHash UserImportResult = _user_import.UserImportResult UserInfo = _user_mgt.UserInfo UserMetadata = _user_mgt.UserMetadata @@ -149,9 +158,12 @@ def verify_id_token(id_token, app=None, check_revoked=False): dict: A dictionary of key-value pairs parsed from the decoded JWT. Raises: - ValueError: If the JWT was found to be invalid, or if the App's project ID cannot - be determined. - AuthError: If ``check_revoked`` is requested and the token was revoked. + ValueError: If ``id_token`` is a not a string or is empty. + InvalidIdTokenError: If ``id_token`` is not a valid Firebase ID token. + ExpiredIdTokenError: If the specified ID token has expired. + RevokedIdTokenError: If ``check_revoked`` is ``True`` and the ID token has been revoked. + CertificateFetchError: If an error occurs while fetching the public key certificates + required to verify the ID token. """ if not isinstance(check_revoked, bool): # guard against accidental wrong assignment. @@ -160,7 +172,7 @@ def verify_id_token(id_token, app=None, check_revoked=False): token_verifier = _get_auth_service(app).token_verifier verified_claims = token_verifier.verify_id_token(id_token) if check_revoked: - _check_jwt_revoked(verified_claims, _ID_TOKEN_REVOKED, 'ID token', app) + _check_jwt_revoked(verified_claims, RevokedIdTokenError, 'ID token', app) return verified_claims @@ -201,14 +213,17 @@ def verify_session_cookie(session_cookie, check_revoked=False, app=None): dict: A dictionary of key-value pairs parsed from the decoded JWT. Raises: - ValueError: If the cookie was found to be invalid, or if the App's project ID cannot - be determined. - AuthError: If ``check_revoked`` is requested and the cookie was revoked. + ValueError: If ``session_cookie`` is a not a string or is empty. + InvalidSessionCookieError: If ``session_cookie`` is not a valid Firebase session cookie. + ExpiredSessionCookieError: If the specified session cookie has expired. + RevokedSessionCookieError: If ``check_revoked`` is ``True`` and the cookie has been revoked. + CertificateFetchError: If an error occurs while fetching the public key certificates + required to verify the session cookie. """ token_verifier = _get_auth_service(app).token_verifier verified_claims = token_verifier.verify_session_cookie(session_cookie) if check_revoked: - _check_jwt_revoked(verified_claims, _SESSION_COOKIE_REVOKED, 'session cookie', app) + _check_jwt_revoked(verified_claims, RevokedSessionCookieError, 'session cookie', app) return verified_claims @@ -513,19 +528,10 @@ def generate_sign_in_with_email_link(email, action_code_settings, app=None): 'EMAIL_SIGNIN', email, action_code_settings=action_code_settings) -def _check_jwt_revoked(verified_claims, error_code, label, app): +def _check_jwt_revoked(verified_claims, exc_type, label, app): user = get_user(verified_claims.get('uid'), app=app) if verified_claims.get('iat') * 1000 < user.tokens_valid_after_timestamp: - raise AuthError(error_code, 'The Firebase {0} has been revoked.'.format(label)) - - -class AuthError(Exception): - """Represents an Exception encountered while invoking the Firebase auth API.""" - - def __init__(self, code, message, error=None): - Exception.__init__(self, message) - self.code = code - self.detail = error + raise exc_type('The Firebase {0} has been revoked.'.format(label)) class _AuthService(object): diff --git a/integration/test_auth.py b/integration/test_auth.py index c0149fd69..eb1464476 100644 --- a/integration/test_auth.py +++ b/integration/test_auth.py @@ -351,9 +351,8 @@ def test_verify_id_token_revoked(new_user, api_key): # verify_id_token succeeded because it didn't check revoked. assert claims['iat'] * 1000 < user.tokens_valid_after_timestamp - with pytest.raises(auth.AuthError) as excinfo: + with pytest.raises(auth.RevokedIdTokenError) as excinfo: claims = auth.verify_id_token(id_token, check_revoked=True) - assert excinfo.value.code == auth._ID_TOKEN_REVOKED assert str(excinfo.value) == 'The Firebase ID token has been revoked.' # Sign in again, verify works. @@ -373,9 +372,8 @@ def test_verify_session_cookie_revoked(new_user, api_key): # verify_session_cookie succeeded because it didn't check revoked. assert claims['iat'] * 1000 < user.tokens_valid_after_timestamp - with pytest.raises(auth.AuthError) as excinfo: + with pytest.raises(auth.RevokedSessionCookieError) as excinfo: claims = auth.verify_session_cookie(session_cookie, check_revoked=True) - assert excinfo.value.code == auth._SESSION_COOKIE_REVOKED assert str(excinfo.value) == 'The Firebase session cookie has been revoked.' # Sign in again, verify works. diff --git a/snippets/auth/index.py b/snippets/auth/index.py index 5bfe21f8e..552875696 100644 --- a/snippets/auth/index.py +++ b/snippets/auth/index.py @@ -24,6 +24,7 @@ # [END import_sdk] from firebase_admin import credentials from firebase_admin import auth +from firebase_admin import exceptions sys.path.append("lib") @@ -31,6 +32,7 @@ def initialize_sdk_with_service_account(): # [START initialize_sdk_with_service_account] import firebase_admin from firebase_admin import credentials + from firebase_admin import exceptions cred = credentials.Certificate('path/to/serviceAccountKey.json') default_app = firebase_admin.initialize_app(cred) @@ -144,13 +146,12 @@ def verify_token_uid_check_revoke(id_token): decoded_token = auth.verify_id_token(id_token, check_revoked=True) # Token is valid and not revoked. uid = decoded_token['uid'] - except auth.AuthError as exc: - if exc.code == 'ID_TOKEN_REVOKED': - # Token revoked, inform the user to reauthenticate or signOut(). - pass - else: - # Token is invalid - pass + except auth.RevokedIdTokenError: + # Token revoked, inform the user to reauthenticate or signOut(). + pass + except auth.InvalidIdTokenError: + # Token is invalid + pass # [END verify_token_id_check_revoked] firebase_admin.delete_app(default_app) return uid @@ -322,7 +323,7 @@ def session_login(): response.set_cookie( 'session', session_cookie, expires=expires, httponly=True, secure=True) return response - except auth.AuthError: + except exceptions.FirebaseError: return flask.abort(401, 'Failed to create a session cookie') # [END session_login] @@ -344,9 +345,9 @@ def check_auth_time(id_token, flask): # User did not sign in recently. To guard against ID token theft, require # re-authentication. return flask.abort(401, 'Recent sign in required') - except ValueError: + except auth.InvalidIdTokenError: return flask.abort(401, 'Invalid ID token') - except auth.AuthError: + except exceptions.FirebaseError: return flask.abort(401, 'Failed to create a session cookie') # [END check_auth_time] @@ -359,16 +360,17 @@ def serve_content_for_user(decoded_claims): @app.route('/profile', methods=['POST']) def access_restricted_content(): session_cookie = flask.request.cookies.get('session') + if not session_cookie: + # Session cookie is unavailable. Force user to login. + return flask.redirect('/login') + # Verify the session cookie. In this case an additional check is added to detect # if the user's Firebase session was revoked, user deleted/disabled, etc. try: decoded_claims = auth.verify_session_cookie(session_cookie, check_revoked=True) return serve_content_for_user(decoded_claims) - except ValueError: - # Session cookie is unavailable or invalid. Force user to login. - return flask.redirect('/login') - except auth.AuthError: - # Session revoked. Force user to login. + except auth.InvalidSessionCookieError: + # Session cookie is invalid, expired or revoked. Force user to login. return flask.redirect('/login') # [END session_verify] @@ -385,11 +387,8 @@ def serve_content_for_admin(decoded_claims): return serve_content_for_admin(decoded_claims) else: return flask.abort(401, 'Insufficient permissions') - except ValueError: - # Session cookie is unavailable or invalid. Force user to login. - return flask.redirect('/login') - except auth.AuthError: - # Session revoked. Force user to login. + except auth.InvalidSessionCookieError: + # Session cookie is invalid, expired or revoked. Force user to login. return flask.redirect('/login') # [END session_verify_with_permission_check] @@ -413,7 +412,7 @@ def session_logout(): response = flask.make_response(flask.redirect('/login')) response.set_cookie('session', expires=0) return response - except ValueError: + except auth.InvalidSessionCookieError: return flask.redirect('/login') # [END session_clear_and_revoke] @@ -444,7 +443,7 @@ def import_users(): result.success_count, result.failure_count)) for err in result.errors: print('Failed to import {0} due to {1}'.format(users[err.index].uid, err.reason)) - except auth.AuthError: + except exceptions.FirebaseError: # Some unrecoverable error occurred that prevented the operation from running. pass # [END import_users] @@ -465,7 +464,7 @@ def import_with_hmac(): result = auth.import_users(users, hash_alg=hash_alg) for err in result.errors: print('Failed to import user:', err.reason) - except auth.AuthError as error: + except exceptions.FirebaseError as error: print('Error importing users:', error) # [END import_with_hmac] @@ -485,7 +484,7 @@ def import_with_pbkdf(): result = auth.import_users(users, hash_alg=hash_alg) for err in result.errors: print('Failed to import user:', err.reason) - except auth.AuthError as error: + except exceptions.FirebaseError as error: print('Error importing users:', error) # [END import_with_pbkdf] @@ -506,7 +505,7 @@ def import_with_standard_scrypt(): result = auth.import_users(users, hash_alg=hash_alg) for err in result.errors: print('Failed to import user:', err.reason) - except auth.AuthError as error: + except exceptions.FirebaseError as error: print('Error importing users:', error) # [END import_with_standard_scrypt] @@ -526,7 +525,7 @@ def import_with_bcrypt(): result = auth.import_users(users, hash_alg=hash_alg) for err in result.errors: print('Failed to import user:', err.reason) - except auth.AuthError as error: + except exceptions.FirebaseError as error: print('Error importing users:', error) # [END import_with_bcrypt] @@ -553,7 +552,7 @@ def import_with_scrypt(): result = auth.import_users(users, hash_alg=hash_alg) for err in result.errors: print('Failed to import user:', err.reason) - except auth.AuthError as error: + except exceptions.FirebaseError as error: print('Error importing users:', error) # [END import_with_scrypt] @@ -583,7 +582,7 @@ def import_without_password(): result = auth.import_users(users) for err in result.errors: print('Failed to import user:', err.reason) - except auth.AuthError as error: + except exceptions.FirebaseError as error: print('Error importing users:', error) # [END import_without_password] diff --git a/tests/test_token_gen.py b/tests/test_token_gen.py index baf8d9515..e016b8fb1 100644 --- a/tests/test_token_gen.py +++ b/tests/test_token_gen.py @@ -46,6 +46,15 @@ INVALID_STRINGS = [None, '', 0, 1, True, False, list(), tuple(), dict()] INVALID_BOOLS = [None, '', 'foo', 0, 1, list(), tuple(), dict()] +INVALID_JWT_ARGS = { + 'NoneToken': None, + 'EmptyToken': '', + 'BoolToken': True, + 'IntToken': 1, + 'ListToken': [], + 'EmptyDictToken': {}, + 'NonEmptyDictToken': {'a': 1}, +} # Fixture for mocking a HTTP server httpserver = plugin.httpserver @@ -363,13 +372,6 @@ class TestVerifyIdToken(object): 'iat': int(time.time()) - 10000, 'exp': int(time.time()) - 3600 }), - 'NoneToken': None, - 'EmptyToken': '', - 'BoolToken': True, - 'IntToken': 1, - 'ListToken': [], - 'EmptyDictToken': {}, - 'NonEmptyDictToken': {'a': 1}, 'BadFormatToken': 'foobar' } @@ -392,9 +394,8 @@ def test_valid_token_check_revoked(self, user_mgt_app, id_token): def test_revoked_token_check_revoked(self, user_mgt_app, revoked_tokens, id_token): _overwrite_cert_request(user_mgt_app, MOCK_REQUEST) _instrument_user_manager(user_mgt_app, 200, revoked_tokens) - with pytest.raises(auth.AuthError) as excinfo: + with pytest.raises(auth.RevokedIdTokenError) as excinfo: auth.verify_id_token(id_token, app=user_mgt_app, check_revoked=True) - assert excinfo.value.code == 'ID_TOKEN_REVOKED' assert str(excinfo.value) == 'The Firebase ID token has been revoked.' @pytest.mark.parametrize('arg', INVALID_BOOLS) @@ -411,11 +412,30 @@ def test_revoked_token_do_not_check_revoked(self, user_mgt_app, revoked_tokens, assert claims['admin'] is True assert claims['uid'] == claims['sub'] + @pytest.mark.parametrize('id_token', INVALID_JWT_ARGS.values(), ids=list(INVALID_JWT_ARGS)) + def test_invalid_arg(self, user_mgt_app, id_token): + _overwrite_cert_request(user_mgt_app, MOCK_REQUEST) + with pytest.raises(ValueError) as excinfo: + auth.verify_id_token(id_token, app=user_mgt_app) + assert 'Illegal ID token provided' in str(excinfo.value) + @pytest.mark.parametrize('id_token', invalid_tokens.values(), ids=list(invalid_tokens)) def test_invalid_token(self, user_mgt_app, id_token): _overwrite_cert_request(user_mgt_app, MOCK_REQUEST) - with pytest.raises(ValueError): + with pytest.raises(auth.InvalidIdTokenError) as excinfo: auth.verify_id_token(id_token, app=user_mgt_app) + assert isinstance(excinfo.value, exceptions.InvalidArgumentError) + assert excinfo.value.http_response is None + + def test_expired_token(self, user_mgt_app): + _overwrite_cert_request(user_mgt_app, MOCK_REQUEST) + id_token = self.invalid_tokens['ExpiredToken'] + with pytest.raises(auth.ExpiredIdTokenError) as excinfo: + auth.verify_id_token(id_token, app=user_mgt_app) + assert isinstance(excinfo.value, auth.InvalidIdTokenError) + assert 'Token expired' in str(excinfo.value) + assert excinfo.value.cause is not None + assert excinfo.value.http_response is None def test_project_id_option(self): app = firebase_admin.initialize_app( @@ -440,13 +460,19 @@ def test_project_id_env_var(self, env_var_app): def test_custom_token(self, auth_app): id_token = auth.create_custom_token(MOCK_UID, app=auth_app) _overwrite_cert_request(auth_app, MOCK_REQUEST) - with pytest.raises(ValueError): + with pytest.raises(auth.InvalidIdTokenError) as excinfo: auth.verify_id_token(id_token, app=auth_app) + message = 'verify_id_token() expects an ID token, but was given a custom token.' + assert str(excinfo.value) == message def test_certificate_request_failure(self, user_mgt_app): _overwrite_cert_request(user_mgt_app, testutils.MockRequest(404, 'not found')) - with pytest.raises(google.auth.exceptions.TransportError): + with pytest.raises(auth.CertificateFetchError) as excinfo: auth.verify_id_token(TEST_ID_TOKEN, app=user_mgt_app) + assert 'Could not fetch certificates' in str(excinfo.value) + assert isinstance(excinfo.value, exceptions.UnknownError) + assert excinfo.value.cause is not None + assert excinfo.value.http_response is None class TestVerifySessionCookie(object): @@ -471,13 +497,6 @@ class TestVerifySessionCookie(object): 'iat': int(time.time()) - 10000, 'exp': int(time.time()) - 3600 }), - 'NoneCookie': None, - 'EmptyCookie': '', - 'BoolCookie': True, - 'IntCookie': 1, - 'ListCookie': [], - 'EmptyDictCookie': {}, - 'NonEmptyDictCookie': {'a': 1}, 'BadFormatCookie': 'foobar', 'IDToken': TEST_ID_TOKEN, } @@ -501,9 +520,8 @@ def test_valid_cookie_check_revoked(self, user_mgt_app, cookie): def test_revoked_cookie_check_revoked(self, user_mgt_app, revoked_tokens, cookie): _overwrite_cert_request(user_mgt_app, MOCK_REQUEST) _instrument_user_manager(user_mgt_app, 200, revoked_tokens) - with pytest.raises(auth.AuthError) as excinfo: + with pytest.raises(auth.RevokedSessionCookieError) as excinfo: auth.verify_session_cookie(cookie, app=user_mgt_app, check_revoked=True) - assert excinfo.value.code == 'SESSION_COOKIE_REVOKED' assert str(excinfo.value) == 'The Firebase session cookie has been revoked.' @pytest.mark.parametrize('cookie', valid_cookies.values(), ids=list(valid_cookies)) @@ -514,11 +532,30 @@ def test_revoked_cookie_does_not_check_revoked(self, user_mgt_app, revoked_token assert claims['admin'] is True assert claims['uid'] == claims['sub'] + @pytest.mark.parametrize('cookie', INVALID_JWT_ARGS.values(), ids=list(INVALID_JWT_ARGS)) + def test_invalid_args(self, user_mgt_app, cookie): + _overwrite_cert_request(user_mgt_app, MOCK_REQUEST) + with pytest.raises(ValueError) as excinfo: + auth.verify_session_cookie(cookie, app=user_mgt_app) + assert 'Illegal session cookie provided' in str(excinfo.value) + @pytest.mark.parametrize('cookie', invalid_cookies.values(), ids=list(invalid_cookies)) def test_invalid_cookie(self, user_mgt_app, cookie): _overwrite_cert_request(user_mgt_app, MOCK_REQUEST) - with pytest.raises(ValueError): + with pytest.raises(auth.InvalidSessionCookieError) as excinfo: auth.verify_session_cookie(cookie, app=user_mgt_app) + assert isinstance(excinfo.value, exceptions.InvalidArgumentError) + assert excinfo.value.http_response is None + + def test_expired_cookie(self, user_mgt_app): + _overwrite_cert_request(user_mgt_app, MOCK_REQUEST) + cookie = self.invalid_cookies['ExpiredCookie'] + with pytest.raises(auth.ExpiredSessionCookieError) as excinfo: + auth.verify_session_cookie(cookie, app=user_mgt_app) + assert isinstance(excinfo.value, auth.InvalidSessionCookieError) + assert 'Token expired' in str(excinfo.value) + assert excinfo.value.cause is not None + assert excinfo.value.http_response is None def test_project_id_option(self): app = firebase_admin.initialize_app( @@ -540,13 +577,17 @@ def test_project_id_env_var(self, env_var_app): def test_custom_token(self, auth_app): custom_token = auth.create_custom_token(MOCK_UID, app=auth_app) _overwrite_cert_request(auth_app, MOCK_REQUEST) - with pytest.raises(ValueError): + with pytest.raises(auth.InvalidSessionCookieError): auth.verify_session_cookie(custom_token, app=auth_app) def test_certificate_request_failure(self, user_mgt_app): _overwrite_cert_request(user_mgt_app, testutils.MockRequest(404, 'not found')) - with pytest.raises(google.auth.exceptions.TransportError): + with pytest.raises(auth.CertificateFetchError) as excinfo: auth.verify_session_cookie(TEST_SESSION_COOKIE, app=user_mgt_app) + assert 'Could not fetch certificates' in str(excinfo.value) + assert isinstance(excinfo.value, exceptions.UnknownError) + assert excinfo.value.cause is not None + assert excinfo.value.http_response is None class TestCertificateCaching(object):