From 0e9e49538a8df767f085fffe09bd5b88970b721e Mon Sep 17 00:00:00 2001 From: Ray Luo Date: Fri, 23 May 2025 12:34:56 -0700 Subject: [PATCH] Implement issuer validation on CCA's aquire_token_for_client() --- msal/application.py | 8 ++++++++ msal/authority.py | 17 ++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/msal/application.py b/msal/application.py index 24ef91d7..0d817f2d 100644 --- a/msal/application.py +++ b/msal/application.py @@ -2409,6 +2409,14 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs): - A successful response would contain "access_token" key, - an error response would contain "error" and usually "error_description". """ + if not self.authority.is_valid_issuer(): + return { + "error": "invalid_issuer", + "error_description": ( + "The issuer '{}' does not match this authority. " + "No token request will be sent." + ).format(self.authority._issuer), + } if kwargs.get("force_refresh"): raise ValueError( # We choose to disallow force_refresh "Historically, this method does not support force_refresh behavior. " diff --git a/msal/authority.py b/msal/authority.py index faf11603..2823db62 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -67,6 +67,7 @@ def __init__( performed. """ self._http_client = http_client + self._oidc_authority_url = oidc_authority_url if oidc_authority_url: logger.debug("Initializing with OIDC authority: %s", oidc_authority_url) tenant_discovery_endpoint = self._initialize_oidc_authority( @@ -95,6 +96,7 @@ def __init__( raise ValueError(error_message) logger.debug( 'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config) + self._issuer = openid_config.get('issuer') self.authorization_endpoint = openid_config['authorization_endpoint'] self.token_endpoint = openid_config['token_endpoint'] self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint') @@ -174,11 +176,24 @@ def user_realm_discovery(self, username, correlation_id=None, response=None): self.__class__._domains_without_user_realm_discovery.add(self.instance) return {} # This can guide the caller to fall back normal ROPC flow + def is_valid_issuer(self) -> bool: + if self._oidc_authority_url: + return self._oidc_authority_url == self._issuer + else: + # The non-OIDC cases include: + # those known-to-Microsoft, those known-to-developer, + # those already passed authority validation, or those opted out of authority validation. + # TODO: We plan to remove the OIDC discovery behavior in the near future. + # Then we can simply return True here. + return True + def canonicalize(authority_or_auth_endpoint): # Returns (url_parsed_result, hostname_in_lowercase, tenant) authority = urlparse(authority_or_auth_endpoint) - if authority.scheme == "https": + if ( + authority.scheme == "http" and authority.hostname in ("localhost", "127.0.0.1") + ) or authority.scheme == "https": parts = authority.path.split("/") first_part = parts[1] if len(parts) >= 2 and parts[1] else None if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): # CIAM