Browse Source

Implement OAuth2 authorization_code integration (#797)

pull/856/head
Jesse P. Johnson 5 years ago
committed by Sebastián Ramírez
parent
commit
91fe90e8e6
  1. 1
      fastapi/security/__init__.py
  2. 37
      fastapi/security/oauth2.py
  3. 77
      tests/test_security_oauth2_authorization_code_bearer.py

1
fastapi/security/__init__.py

@ -8,6 +8,7 @@ from .http import (
)
from .oauth2 import (
OAuth2,
OAuth2AuthorizationCodeBearer,
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,

37
fastapi/security/oauth2.py

@ -163,6 +163,43 @@ class OAuth2PasswordBearer(OAuth2):
return param
class OAuth2AuthorizationCodeBearer(OAuth2):
def __init__(
self,
authorizationUrl: str,
tokenUrl: str,
refreshUrl: str = None,
scheme_name: str = None,
scopes: dict = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(
authorizationCode={
"authorizationUrl": authorizationUrl,
"tokenUrl": tokenUrl,
"refreshUrl": refreshUrl,
"scopes": scopes,
}
)
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None # pragma: nocover
return param
class SecurityScopes:
def __init__(self, scopes: List[str] = None):
self.scopes = scopes or []

77
tests/test_security_oauth2_authorization_code_bearer.py

@ -0,0 +1,77 @@
from typing import Optional
from fastapi import FastAPI, Security
from fastapi.security import OAuth2AuthorizationCodeBearer
from starlette.testclient import TestClient
app = FastAPI()
oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl="/authorize", tokenUrl="/token", auto_error=True
)
@app.get("/items/")
async def read_items(token: Optional[str] = Security(oauth2_scheme)):
return {"token": token}
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/items/": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Items",
"operationId": "read_items_items__get",
"security": [{"OAuth2AuthorizationCodeBearer": []}],
}
}
},
"components": {
"securitySchemes": {
"OAuth2AuthorizationCodeBearer": {
"type": "oauth2",
"flows": {
"authorizationCode": {
"authorizationUrl": "/authorize",
"tokenUrl": "/token",
"scopes": {},
}
},
}
}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_no_token():
response = client.get("/items")
assert response.status_code == 401
assert response.json() == {"detail": "Not authenticated"}
def test_incorrect_token():
response = client.get("/items", headers={"Authorization": "Non-existent testtoken"})
assert response.status_code == 401
assert response.json() == {"detail": "Not authenticated"}
def test_token():
response = client.get("/items", headers={"Authorization": "Bearer testtoken"})
assert response.status_code == 200
assert response.json() == {"token": "testtoken"}
Loading…
Cancel
Save