Skip to content

Add client basic auth and pkce #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions fastapi_oauth20/clients/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,31 @@
# -*- coding: utf-8 -*-
import httpx

from fastapi_oauth20.errors import GetUserInfoError
from fastapi_oauth20.oauth20 import OAuth20Base

AUTHORIZE_ENDPOINT = 'https://passport.feishu.cn/suite/passport/oauth/authorize'
ACCESS_TOKEN_ENDPOINT = 'https://passport.feishu.cn/suite/passport/oauth/token'
REFRESH_TOKEN_ENDPOINT = AUTHORIZE_ENDPOINT
REVOKE_TOKEN_ENDPOINT = None
DEFAULT_SCOPES = ['contact:user.employee_id:readonly', 'contact:user.base:readonly', 'contact:user.email:readonly']
PROFILE_ENDPOINT = 'https://passport.feishu.cn/suite/passport/oauth/userinfo'


class FeiShuOAuth20(OAuth20Base):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id=client_id,
client_secret=client_secret,
authorize_endpoint=AUTHORIZE_ENDPOINT,
access_token_endpoint=ACCESS_TOKEN_ENDPOINT,
refresh_token_endpoint=REFRESH_TOKEN_ENDPOINT,
revoke_token_endpoint=REVOKE_TOKEN_ENDPOINT,
authorize_endpoint='https://passport.feishu.cn/suite/passport/oauth/authorize',
access_token_endpoint='https://passport.feishu.cn/suite/passport/oauth/token',
refresh_token_endpoint='https://passport.feishu.cn/suite/passport/oauth/authorize',
oauth_callback_route_name='feishu',
default_scopes=DEFAULT_SCOPES,
default_scopes=[
'contact:user.employee_id:readonly',
'contact:user.base:readonly',
'contact:user.email:readonly',
],
)

async def get_userinfo(self, access_token: str) -> dict:
"""Get user info from FeiShu"""
headers = {'Authorization': f'Bearer {access_token}'}
async with httpx.AsyncClient() as client:
response = await client.get(PROFILE_ENDPOINT, headers=headers)
await self.raise_httpx_oauth20_errors(response)

res = response.json()

return res
response = await client.get('https://passport.feishu.cn/suite/passport/oauth/userinfo', headers=headers)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=GetUserInfoError)
return result
27 changes: 9 additions & 18 deletions fastapi_oauth20/clients/gitee.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,27 @@
# -*- coding: utf-8 -*-
import httpx

from fastapi_oauth20.errors import GetUserInfoError
from fastapi_oauth20.oauth20 import OAuth20Base

AUTHORIZE_ENDPOINT = 'https://gitee.com/oauth/authorize'
ACCESS_TOKEN_ENDPOINT = 'https://gitee.com/oauth/token'
REFRESH_TOKEN_ENDPOINT = ACCESS_TOKEN_ENDPOINT
REVOKE_TOKEN_ENDPOINT = None
DEFAULT_SCOPES = ['user_info']
PROFILE_ENDPOINT = 'https://gitee.com/api/v5/user'


class GiteeOAuth20(OAuth20Base):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id=client_id,
client_secret=client_secret,
authorize_endpoint=AUTHORIZE_ENDPOINT,
access_token_endpoint=ACCESS_TOKEN_ENDPOINT,
refresh_token_endpoint=REFRESH_TOKEN_ENDPOINT,
revoke_token_endpoint=REVOKE_TOKEN_ENDPOINT,
authorize_endpoint='https://gitee.com/oauth/authorize',
access_token_endpoint='https://gitee.com/oauth/token',
refresh_token_endpoint='https://gitee.com/oauth/token',
oauth_callback_route_name='gitee',
default_scopes=DEFAULT_SCOPES,
default_scopes=['user_info'],
)

async def get_userinfo(self, access_token: str) -> dict:
"""Get user info from Gitee"""
headers = {'Authorization': f'Bearer {access_token}'}
async with httpx.AsyncClient() as client:
response = await client.get(PROFILE_ENDPOINT, headers=headers)
await self.raise_httpx_oauth20_errors(response)

res = response.json()

return res
response = await client.get('https://gitee.com/api/v5/user', headers=headers)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=GetUserInfoError)
return result
37 changes: 13 additions & 24 deletions fastapi_oauth20/clients/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,35 @@
# -*- coding: utf-8 -*-
import httpx

from fastapi_oauth20.errors import GetUserInfoError
from fastapi_oauth20.oauth20 import OAuth20Base

AUTHORIZE_ENDPOINT = 'https://github.com/login/oauth/authorize'
ACCESS_TOKEN_ENDPOINT = 'https://github.com/login/oauth/access_token'
REFRESH_TOKEN_ENDPOINT = None
REVOKE_TOKEN_ENDPOINT = None
DEFAULT_SCOPES = ['user', 'user:email']
PROFILE_ENDPOINT = 'https://api.github.com/user'


class GitHubOAuth20(OAuth20Base):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id=client_id,
client_secret=client_secret,
authorize_endpoint=AUTHORIZE_ENDPOINT,
access_token_endpoint=ACCESS_TOKEN_ENDPOINT,
refresh_token_endpoint=REFRESH_TOKEN_ENDPOINT,
revoke_token_endpoint=REVOKE_TOKEN_ENDPOINT,
authorize_endpoint='https://github.com/login/oauth/authorize',
access_token_endpoint='https://github.com/login/oauth/access_token',
oauth_callback_route_name='github',
default_scopes=DEFAULT_SCOPES,
default_scopes=['user', 'user:email'],
)

async def get_userinfo(self, access_token: str) -> dict:
"""Get user info from GitHub"""
headers = {'Authorization': f'Bearer {access_token}'}
async with httpx.AsyncClient(headers=headers) as client:
response = await client.get(PROFILE_ENDPOINT)
await self.raise_httpx_oauth20_errors(response)

res = response.json()
response = await client.get('https://api.github.com/user')
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=GetUserInfoError)

email = res.get('email')
email = result.get('email')
if email is None:
response = await client.get(f'{PROFILE_ENDPOINT}/emails')
await self.raise_httpx_oauth20_errors(response)

emails = response.json()

response = await client.get('https://api.github.com/user/emails')
self.raise_httpx_oauth20_errors(response)
emails = self.get_json_result(response, err_class=GetUserInfoError)
email = next((email['email'] for email in emails if email.get('primary')), emails[0]['email'])
res['email'] = email
result['email'] = email

return res
return result
28 changes: 10 additions & 18 deletions fastapi_oauth20/clients/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,28 @@
# -*- coding: utf-8 -*-
import httpx

from fastapi_oauth20.errors import GetUserInfoError
from fastapi_oauth20.oauth20 import OAuth20Base

AUTHORIZE_ENDPOINT = 'https://accounts.google.com/o/oauth2/v2/auth'
ACCESS_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token'
REFRESH_TOKEN_ENDPOINT = ACCESS_TOKEN_ENDPOINT
REVOKE_TOKEN_ENDPOINT = 'https://accounts.google.com/o/oauth2/revoke'
DEFAULT_SCOPES = ['email', 'openid', 'profile']
PROFILE_ENDPOINT = 'https://www.googleapis.com/oauth2/v1/userinfo'


class GoogleOAuth20(OAuth20Base):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id=client_id,
client_secret=client_secret,
authorize_endpoint=AUTHORIZE_ENDPOINT,
access_token_endpoint=ACCESS_TOKEN_ENDPOINT,
refresh_token_endpoint=REFRESH_TOKEN_ENDPOINT,
revoke_token_endpoint=REVOKE_TOKEN_ENDPOINT,
authorize_endpoint='https://accounts.google.com/o/oauth2/v2/auth',
access_token_endpoint='https://oauth2.googleapis.com/token',
refresh_token_endpoint='https://oauth2.googleapis.com/token',
revoke_token_endpoint='https://accounts.google.com/o/oauth2/revoke',
oauth_callback_route_name='google',
default_scopes=DEFAULT_SCOPES,
default_scopes=['email', 'openid', 'profile'],
)

async def get_userinfo(self, access_token: str) -> dict:
"""Get user info from Google"""
headers = {'Authorization': f'Bearer {access_token}'}
async with httpx.AsyncClient() as client:
response = await client.get(PROFILE_ENDPOINT, headers=headers)
await self.raise_httpx_oauth20_errors(response)

res = response.json()

return res
response = await client.get('https://www.googleapis.com/oauth2/v1/userinfo', headers=headers)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=GetUserInfoError)
return result
51 changes: 9 additions & 42 deletions fastapi_oauth20/clients/linuxdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,27 @@
# -*- coding: utf-8 -*-
import httpx

from fastapi_oauth20.errors import GetUserInfoError
from fastapi_oauth20.oauth20 import OAuth20Base

AUTHORIZE_ENDPOINT = 'https://connect.linux.do/oauth2/authorize'
ACCESS_TOKEN_ENDPOINT = 'https://connect.linux.do/oauth2/token'
REFRESH_TOKEN_ENDPOINT = ACCESS_TOKEN_ENDPOINT
REVOKE_TOKEN_ENDPOINT = None
DEFAULT_SCOPES = None
PROFILE_ENDPOINT = 'https://connect.linux.do/api/user'


class LinuxDoOAuth20(OAuth20Base):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id=client_id,
client_secret=client_secret,
authorize_endpoint=AUTHORIZE_ENDPOINT,
access_token_endpoint=ACCESS_TOKEN_ENDPOINT,
refresh_token_endpoint=REFRESH_TOKEN_ENDPOINT,
revoke_token_endpoint=REVOKE_TOKEN_ENDPOINT,
authorize_endpoint='https://connect.linux.do/oauth2/authorize',
access_token_endpoint='https://connect.linux.do/oauth2/token',
refresh_token_endpoint='https://connect.linux.do/oauth2/token',
oauth_callback_route_name='linuxdo',
default_scopes=DEFAULT_SCOPES,
token_endpoint_basic_auth=True,
)

async def get_access_token(self, code: str, redirect_uri: str, code_verifier: str | None = None) -> dict:
"""Obtain the token based on the Linux do authorization method"""
data = {
'code': code,
'redirect_uri': redirect_uri,
'grant_type': 'authorization_code',
}

auth = httpx.BasicAuth(self.client_id, self.client_secret)

if code_verifier:
data.update({'code_verifier': code_verifier})
async with httpx.AsyncClient(auth=auth) as client:
response = await client.post(
self.access_token_endpoint,
data=data,
headers=self.request_headers,
)
await self.raise_httpx_oauth20_errors(response)

res = response.json()

return res

async def get_userinfo(self, access_token: str) -> dict:
"""Get user info from Linux Do"""
headers = {'Authorization': f'Bearer {access_token}'}
async with httpx.AsyncClient() as client:
response = await client.get(PROFILE_ENDPOINT, headers=headers)
await self.raise_httpx_oauth20_errors(response)

res = response.json()

return res
response = await client.get('https://connect.linux.do/api/user', headers=headers)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=GetUserInfoError)
return result
26 changes: 8 additions & 18 deletions fastapi_oauth20/clients/oschina.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,26 @@
# -*- coding: utf-8 -*-
import httpx

from fastapi_oauth20.errors import GetUserInfoError
from fastapi_oauth20.oauth20 import OAuth20Base

AUTHORIZE_ENDPOINT = 'https://www.oschina.net/action/oauth2/authorize'
ACCESS_TOKEN_ENDPOINT = 'https://www.oschina.net/action/openapi/token'
REFRESH_TOKEN_ENDPOINT = ACCESS_TOKEN_ENDPOINT
REVOKE_TOKEN_ENDPOINT = None
DEFAULT_SCOPES = None
PROFILE_ENDPOINT = 'https://www.oschina.net/action/openapi/user'


class OSChinaOAuth20(OAuth20Base):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id=client_id,
client_secret=client_secret,
authorize_endpoint=AUTHORIZE_ENDPOINT,
access_token_endpoint=ACCESS_TOKEN_ENDPOINT,
refresh_token_endpoint=REFRESH_TOKEN_ENDPOINT,
revoke_token_endpoint=REVOKE_TOKEN_ENDPOINT,
authorize_endpoint='https://www.oschina.net/action/oauth2/authorize',
access_token_endpoint='https://www.oschina.net/action/openapi/token',
refresh_token_endpoint='https://www.oschina.net/action/openapi/token',
oauth_callback_route_name='oschina',
default_scopes=DEFAULT_SCOPES,
)

async def get_userinfo(self, access_token: str) -> dict:
"""Get user info from OSChina"""
headers = {'Authorization': f'Bearer {access_token}'}
async with httpx.AsyncClient() as client:
response = await client.get(PROFILE_ENDPOINT, headers=headers)
await self.raise_httpx_oauth20_errors(response)

res = response.json()

return res
response = await client.get('https://www.oschina.net/action/openapi/user', headers=headers)
self.raise_httpx_oauth20_errors(response)
result = self.get_json_result(response, err_class=GetUserInfoError)
return result
41 changes: 34 additions & 7 deletions fastapi_oauth20/errors.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,57 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import httpx


class FastAPIOAuth20BaseError(Exception):
pass
"""The fastapi-oauth20 base error."""

msg: str

def __init__(self, msg: str) -> None:
self.msg = msg
super().__init__(msg)


class OAuth20RequestError(FastAPIOAuth20BaseError):
"""OAuth2 httpx request error"""

class HTTPXOAuth20Error(FastAPIOAuth20BaseError):
def __init__(self, msg: str, response: httpx.Response | None = None) -> None:
self.response = response
super().__init__(msg)


class HTTPXOAuth20Error(OAuth20RequestError):
"""OAuth2 error for httpx raise for status"""

pass


class RefreshTokenError(FastAPIOAuth20BaseError):
"""Refresh token error if the refresh endpoint is missing"""
class AccessTokenError(OAuth20RequestError):
"""Error raised when get access token fail."""

pass


class RefreshTokenError(OAuth20RequestError):
"""Refresh token error when refresh token fail."""

pass


class RevokeTokenError(OAuth20RequestError):
"""Revoke token error when revoke token fail."""

pass


class RevokeTokenError(FastAPIOAuth20BaseError):
"""Revoke token error if the revoke endpoint is missing"""
class GetUserInfoError(OAuth20RequestError):
"""Get user info error when get user info fail."""

pass


class RedirectURIError(FastAPIOAuth20BaseError):
class RedirectURIError(OAuth20RequestError):
"""Redirect URI set error"""

pass
Loading