Browse Source
Co-authored-by: Hylke Postma <[email protected]> Co-authored-by: Sebastián Ramírez <[email protected]>pull/3628/head
committed by
GitHub
15 changed files with 1062 additions and 16 deletions
@ -0,0 +1,73 @@ |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security import APIKeyCookie |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
api_key = APIKeyCookie(name="key", description="An API Cookie Key") |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: str = Security(api_key)): |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_current_user(current_user: User = Depends(get_current_user)): |
|||
return current_user |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Current User", |
|||
"operationId": "read_current_user_users_me_get", |
|||
"security": [{"APIKeyCookie": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"APIKeyCookie": { |
|||
"type": "apiKey", |
|||
"name": "key", |
|||
"in": "cookie", |
|||
"description": "An API Cookie Key", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_api_key(): |
|||
response = client.get("/users/me", cookies={"key": "secret"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"username": "secret"} |
|||
|
|||
|
|||
def test_security_api_key_no_key(): |
|||
response = client.get("/users/me") |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
@ -0,0 +1,73 @@ |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security import APIKeyHeader |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
api_key = APIKeyHeader(name="key", description="An API Key Header") |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: str = Security(api_key)): |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_current_user(current_user: User = Depends(get_current_user)): |
|||
return current_user |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Current User", |
|||
"operationId": "read_current_user_users_me_get", |
|||
"security": [{"APIKeyHeader": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"APIKeyHeader": { |
|||
"type": "apiKey", |
|||
"name": "key", |
|||
"in": "header", |
|||
"description": "An API Key Header", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_api_key(): |
|||
response = client.get("/users/me", headers={"key": "secret"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"username": "secret"} |
|||
|
|||
|
|||
def test_security_api_key_no_key(): |
|||
response = client.get("/users/me") |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
@ -0,0 +1,73 @@ |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security import APIKeyQuery |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
api_key = APIKeyQuery(name="key", description="API Key Query") |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: str = Security(api_key)): |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_current_user(current_user: User = Depends(get_current_user)): |
|||
return current_user |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Current User", |
|||
"operationId": "read_current_user_users_me_get", |
|||
"security": [{"APIKeyQuery": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"APIKeyQuery": { |
|||
"type": "apiKey", |
|||
"name": "key", |
|||
"in": "query", |
|||
"description": "API Key Query", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_api_key(): |
|||
response = client.get("/users/me?key=secret") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"username": "secret"} |
|||
|
|||
|
|||
def test_security_api_key_no_key(): |
|||
response = client.get("/users/me") |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
@ -0,0 +1,62 @@ |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBase |
|||
from fastapi.testclient import TestClient |
|||
|
|||
app = FastAPI() |
|||
|
|||
security = HTTPBase(scheme="Other", description="Other Security Scheme") |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_current_user(credentials: HTTPAuthorizationCredentials = Security(security)): |
|||
return {"scheme": credentials.scheme, "credentials": credentials.credentials} |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Current User", |
|||
"operationId": "read_current_user_users_me_get", |
|||
"security": [{"HTTPBase": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"HTTPBase": { |
|||
"type": "http", |
|||
"scheme": "Other", |
|||
"description": "Other Security Scheme", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_http_base(): |
|||
response = client.get("/users/me", headers={"Authorization": "Other foobar"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"scheme": "Other", "credentials": "foobar"} |
|||
|
|||
|
|||
def test_security_http_base_no_credentials(): |
|||
response = client.get("/users/me") |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
@ -0,0 +1,85 @@ |
|||
from base64 import b64encode |
|||
|
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import HTTPBasic, HTTPBasicCredentials |
|||
from fastapi.testclient import TestClient |
|||
from requests.auth import HTTPBasicAuth |
|||
|
|||
app = FastAPI() |
|||
|
|||
security = HTTPBasic(realm="simple", description="HTTPBasic scheme") |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_current_user(credentials: HTTPBasicCredentials = Security(security)): |
|||
return {"username": credentials.username, "password": credentials.password} |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Current User", |
|||
"operationId": "read_current_user_users_me_get", |
|||
"security": [{"HTTPBasic": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"HTTPBasic": { |
|||
"type": "http", |
|||
"scheme": "basic", |
|||
"description": "HTTPBasic scheme", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_http_basic(): |
|||
auth = HTTPBasicAuth(username="john", password="secret") |
|||
response = client.get("/users/me", auth=auth) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"username": "john", "password": "secret"} |
|||
|
|||
|
|||
def test_security_http_basic_no_credentials(): |
|||
response = client.get("/users/me") |
|||
assert response.json() == {"detail": "Not authenticated"} |
|||
assert response.status_code == 401, response.text |
|||
assert response.headers["WWW-Authenticate"] == 'Basic realm="simple"' |
|||
|
|||
|
|||
def test_security_http_basic_invalid_credentials(): |
|||
response = client.get( |
|||
"/users/me", headers={"Authorization": "Basic notabase64token"} |
|||
) |
|||
assert response.status_code == 401, response.text |
|||
assert response.headers["WWW-Authenticate"] == 'Basic realm="simple"' |
|||
assert response.json() == {"detail": "Invalid authentication credentials"} |
|||
|
|||
|
|||
def test_security_http_basic_non_basic_credentials(): |
|||
payload = b64encode(b"johnsecret").decode("ascii") |
|||
auth_header = f"Basic {payload}" |
|||
response = client.get("/users/me", headers={"Authorization": auth_header}) |
|||
assert response.status_code == 401, response.text |
|||
assert response.headers["WWW-Authenticate"] == 'Basic realm="simple"' |
|||
assert response.json() == {"detail": "Invalid authentication credentials"} |
@ -0,0 +1,68 @@ |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
|||
from fastapi.testclient import TestClient |
|||
|
|||
app = FastAPI() |
|||
|
|||
security = HTTPBearer(description="HTTP Bearer token scheme") |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_current_user(credentials: HTTPAuthorizationCredentials = Security(security)): |
|||
return {"scheme": credentials.scheme, "credentials": credentials.credentials} |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Current User", |
|||
"operationId": "read_current_user_users_me_get", |
|||
"security": [{"HTTPBearer": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"HTTPBearer": { |
|||
"type": "http", |
|||
"scheme": "bearer", |
|||
"description": "HTTP Bearer token scheme", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_http_bearer(): |
|||
response = client.get("/users/me", headers={"Authorization": "Bearer foobar"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"scheme": "Bearer", "credentials": "foobar"} |
|||
|
|||
|
|||
def test_security_http_bearer_no_credentials(): |
|||
response = client.get("/users/me") |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
|||
|
|||
|
|||
def test_security_http_bearer_incorrect_scheme_credentials(): |
|||
response = client.get("/users/me", headers={"Authorization": "Basic notreally"}) |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Invalid authentication credentials"} |
@ -0,0 +1,70 @@ |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest |
|||
from fastapi.testclient import TestClient |
|||
|
|||
app = FastAPI() |
|||
|
|||
security = HTTPDigest(description="HTTPDigest scheme") |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_current_user(credentials: HTTPAuthorizationCredentials = Security(security)): |
|||
return {"scheme": credentials.scheme, "credentials": credentials.credentials} |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Current User", |
|||
"operationId": "read_current_user_users_me_get", |
|||
"security": [{"HTTPDigest": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"HTTPDigest": { |
|||
"type": "http", |
|||
"scheme": "digest", |
|||
"description": "HTTPDigest scheme", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_http_digest(): |
|||
response = client.get("/users/me", headers={"Authorization": "Digest foobar"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"scheme": "Digest", "credentials": "foobar"} |
|||
|
|||
|
|||
def test_security_http_digest_no_credentials(): |
|||
response = client.get("/users/me") |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
|||
|
|||
|
|||
def test_security_http_digest_incorrect_scheme_credentials(): |
|||
response = client.get( |
|||
"/users/me", headers={"Authorization": "Other invalidauthorization"} |
|||
) |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Invalid authentication credentials"} |
@ -0,0 +1,81 @@ |
|||
from typing import Optional |
|||
|
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import OAuth2AuthorizationCodeBearer |
|||
from fastapi.testclient import TestClient |
|||
|
|||
app = FastAPI() |
|||
|
|||
oauth2_scheme = OAuth2AuthorizationCodeBearer( |
|||
authorizationUrl="authorize", |
|||
tokenUrl="token", |
|||
description="OAuth2 Code Bearer", |
|||
auto_error=True, |
|||
) |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def read_items(token: Optional[str] = Security(oauth2_scheme)): |
|||
return {"token": token} |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/items/": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Items", |
|||
"operationId": "read_items_items__get", |
|||
"security": [{"OAuth2AuthorizationCodeBearer": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"OAuth2AuthorizationCodeBearer": { |
|||
"type": "oauth2", |
|||
"flows": { |
|||
"authorizationCode": { |
|||
"authorizationUrl": "authorize", |
|||
"tokenUrl": "token", |
|||
"scopes": {}, |
|||
} |
|||
}, |
|||
"description": "OAuth2 Code Bearer", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_no_token(): |
|||
response = client.get("/items") |
|||
assert response.status_code == 401, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
|||
|
|||
|
|||
def test_incorrect_token(): |
|||
response = client.get("/items", headers={"Authorization": "Non-existent testtoken"}) |
|||
assert response.status_code == 401, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
|||
|
|||
|
|||
def test_token(): |
|||
response = client.get("/items", headers={"Authorization": "Bearer testtoken"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"token": "testtoken"} |
@ -0,0 +1,255 @@ |
|||
from typing import Optional |
|||
|
|||
import pytest |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security import OAuth2, OAuth2PasswordRequestFormStrict |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
reusable_oauth2 = OAuth2( |
|||
flows={ |
|||
"password": { |
|||
"tokenUrl": "token", |
|||
"scopes": {"read:users": "Read the users", "write:users": "Create users"}, |
|||
} |
|||
}, |
|||
description="OAuth2 security scheme", |
|||
auto_error=False, |
|||
) |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: Optional[str] = Security(reusable_oauth2)): |
|||
if oauth_header is None: |
|||
return None |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.post("/login") |
|||
def login(form_data: OAuth2PasswordRequestFormStrict = Depends()): |
|||
return form_data |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_users_me(current_user: Optional[User] = Depends(get_current_user)): |
|||
if current_user is None: |
|||
return {"msg": "Create an account first"} |
|||
return current_user |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/login": { |
|||
"post": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
}, |
|||
"422": { |
|||
"description": "Validation Error", |
|||
"content": { |
|||
"application/json": { |
|||
"schema": { |
|||
"$ref": "#/components/schemas/HTTPValidationError" |
|||
} |
|||
} |
|||
}, |
|||
}, |
|||
}, |
|||
"summary": "Login", |
|||
"operationId": "login_login_post", |
|||
"requestBody": { |
|||
"content": { |
|||
"application/x-www-form-urlencoded": { |
|||
"schema": { |
|||
"$ref": "#/components/schemas/Body_login_login_post" |
|||
} |
|||
} |
|||
}, |
|||
"required": True, |
|||
}, |
|||
} |
|||
}, |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Users Me", |
|||
"operationId": "read_users_me_users_me_get", |
|||
"security": [{"OAuth2": []}], |
|||
} |
|||
}, |
|||
}, |
|||
"components": { |
|||
"schemas": { |
|||
"Body_login_login_post": { |
|||
"title": "Body_login_login_post", |
|||
"required": ["grant_type", "username", "password"], |
|||
"type": "object", |
|||
"properties": { |
|||
"grant_type": { |
|||
"title": "Grant Type", |
|||
"pattern": "password", |
|||
"type": "string", |
|||
}, |
|||
"username": {"title": "Username", "type": "string"}, |
|||
"password": {"title": "Password", "type": "string"}, |
|||
"scope": {"title": "Scope", "type": "string", "default": ""}, |
|||
"client_id": {"title": "Client Id", "type": "string"}, |
|||
"client_secret": {"title": "Client Secret", "type": "string"}, |
|||
}, |
|||
}, |
|||
"ValidationError": { |
|||
"title": "ValidationError", |
|||
"required": ["loc", "msg", "type"], |
|||
"type": "object", |
|||
"properties": { |
|||
"loc": { |
|||
"title": "Location", |
|||
"type": "array", |
|||
"items": {"type": "string"}, |
|||
}, |
|||
"msg": {"title": "Message", "type": "string"}, |
|||
"type": {"title": "Error Type", "type": "string"}, |
|||
}, |
|||
}, |
|||
"HTTPValidationError": { |
|||
"title": "HTTPValidationError", |
|||
"type": "object", |
|||
"properties": { |
|||
"detail": { |
|||
"title": "Detail", |
|||
"type": "array", |
|||
"items": {"$ref": "#/components/schemas/ValidationError"}, |
|||
} |
|||
}, |
|||
}, |
|||
}, |
|||
"securitySchemes": { |
|||
"OAuth2": { |
|||
"type": "oauth2", |
|||
"flows": { |
|||
"password": { |
|||
"scopes": { |
|||
"read:users": "Read the users", |
|||
"write:users": "Create users", |
|||
}, |
|||
"tokenUrl": "token", |
|||
} |
|||
}, |
|||
"description": "OAuth2 security scheme", |
|||
} |
|||
}, |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_oauth2(): |
|||
response = client.get("/users/me", headers={"Authorization": "Bearer footokenbar"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"username": "Bearer footokenbar"} |
|||
|
|||
|
|||
def test_security_oauth2_password_other_header(): |
|||
response = client.get("/users/me", headers={"Authorization": "Other footokenbar"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"username": "Other footokenbar"} |
|||
|
|||
|
|||
def test_security_oauth2_password_bearer_no_header(): |
|||
response = client.get("/users/me") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"msg": "Create an account first"} |
|||
|
|||
|
|||
required_params = { |
|||
"detail": [ |
|||
{ |
|||
"loc": ["body", "grant_type"], |
|||
"msg": "field required", |
|||
"type": "value_error.missing", |
|||
}, |
|||
{ |
|||
"loc": ["body", "username"], |
|||
"msg": "field required", |
|||
"type": "value_error.missing", |
|||
}, |
|||
{ |
|||
"loc": ["body", "password"], |
|||
"msg": "field required", |
|||
"type": "value_error.missing", |
|||
}, |
|||
] |
|||
} |
|||
|
|||
grant_type_required = { |
|||
"detail": [ |
|||
{ |
|||
"loc": ["body", "grant_type"], |
|||
"msg": "field required", |
|||
"type": "value_error.missing", |
|||
} |
|||
] |
|||
} |
|||
|
|||
grant_type_incorrect = { |
|||
"detail": [ |
|||
{ |
|||
"loc": ["body", "grant_type"], |
|||
"msg": 'string does not match regex "password"', |
|||
"type": "value_error.str.regex", |
|||
"ctx": {"pattern": "password"}, |
|||
} |
|||
] |
|||
} |
|||
|
|||
|
|||
@pytest.mark.parametrize( |
|||
"data,expected_status,expected_response", |
|||
[ |
|||
(None, 422, required_params), |
|||
({"username": "johndoe", "password": "secret"}, 422, grant_type_required), |
|||
( |
|||
{"username": "johndoe", "password": "secret", "grant_type": "incorrect"}, |
|||
422, |
|||
grant_type_incorrect, |
|||
), |
|||
( |
|||
{"username": "johndoe", "password": "secret", "grant_type": "password"}, |
|||
200, |
|||
{ |
|||
"grant_type": "password", |
|||
"username": "johndoe", |
|||
"password": "secret", |
|||
"scopes": [], |
|||
"client_id": None, |
|||
"client_secret": None, |
|||
}, |
|||
), |
|||
], |
|||
) |
|||
def test_strict_login(data, expected_status, expected_response): |
|||
response = client.post("/login", data=data) |
|||
assert response.status_code == expected_status |
|||
assert response.json() == expected_response |
@ -0,0 +1,76 @@ |
|||
from typing import Optional |
|||
|
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import OAuth2PasswordBearer |
|||
from fastapi.testclient import TestClient |
|||
|
|||
app = FastAPI() |
|||
|
|||
oauth2_scheme = OAuth2PasswordBearer( |
|||
tokenUrl="/token", |
|||
description="OAuth2PasswordBearer security scheme", |
|||
auto_error=False, |
|||
) |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def read_items(token: Optional[str] = Security(oauth2_scheme)): |
|||
if token is None: |
|||
return {"msg": "Create an account first"} |
|||
return {"token": token} |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/items/": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Items", |
|||
"operationId": "read_items_items__get", |
|||
"security": [{"OAuth2PasswordBearer": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"OAuth2PasswordBearer": { |
|||
"type": "oauth2", |
|||
"flows": {"password": {"scopes": {}, "tokenUrl": "/token"}}, |
|||
"description": "OAuth2PasswordBearer security scheme", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_no_token(): |
|||
response = client.get("/items") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"msg": "Create an account first"} |
|||
|
|||
|
|||
def test_token(): |
|||
response = client.get("/items", headers={"Authorization": "Bearer testtoken"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"token": "testtoken"} |
|||
|
|||
|
|||
def test_incorrect_token(): |
|||
response = client.get("/items", headers={"Authorization": "Notexistent testtoken"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"msg": "Create an account first"} |
@ -0,0 +1,80 @@ |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security.open_id_connect_url import OpenIdConnect |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
oid = OpenIdConnect( |
|||
openIdConnectUrl="/openid", description="OpenIdConnect security scheme" |
|||
) |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: str = Security(oid)): |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.get("/users/me") |
|||
def read_current_user(current_user: User = Depends(get_current_user)): |
|||
return current_user |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
openapi_schema = { |
|||
"openapi": "3.0.2", |
|||
"info": {"title": "FastAPI", "version": "0.1.0"}, |
|||
"paths": { |
|||
"/users/me": { |
|||
"get": { |
|||
"responses": { |
|||
"200": { |
|||
"description": "Successful Response", |
|||
"content": {"application/json": {"schema": {}}}, |
|||
} |
|||
}, |
|||
"summary": "Read Current User", |
|||
"operationId": "read_current_user_users_me_get", |
|||
"security": [{"OpenIdConnect": []}], |
|||
} |
|||
} |
|||
}, |
|||
"components": { |
|||
"securitySchemes": { |
|||
"OpenIdConnect": { |
|||
"type": "openIdConnect", |
|||
"openIdConnectUrl": "/openid", |
|||
"description": "OpenIdConnect security scheme", |
|||
} |
|||
} |
|||
}, |
|||
} |
|||
|
|||
|
|||
def test_openapi_schema(): |
|||
response = client.get("/openapi.json") |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == openapi_schema |
|||
|
|||
|
|||
def test_security_oauth2(): |
|||
response = client.get("/users/me", headers={"Authorization": "Bearer footokenbar"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"username": "Bearer footokenbar"} |
|||
|
|||
|
|||
def test_security_oauth2_password_other_header(): |
|||
response = client.get("/users/me", headers={"Authorization": "Other footokenbar"}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"username": "Other footokenbar"} |
|||
|
|||
|
|||
def test_security_oauth2_password_bearer_no_header(): |
|||
response = client.get("/users/me") |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == {"detail": "Not authenticated"} |
Loading…
Reference in new issue