6 changed files with 275 additions and 1 deletions
@ -1,4 +1,10 @@ |
|||||
from .api_key import APIKeyQuery, APIKeyHeader, APIKeyCookie |
from .api_key import APIKeyQuery, APIKeyHeader, APIKeyCookie |
||||
from .http import HTTPBasic, HTTPBearer, HTTPDigest |
from .http import ( |
||||
|
HTTPBasic, |
||||
|
HTTPBearer, |
||||
|
HTTPDigest, |
||||
|
HTTPBasicCredentials, |
||||
|
HTTPAuthorizationCredentials, |
||||
|
) |
||||
from .oauth2 import OAuth2PasswordRequestForm, OAuth2, OAuth2PasswordBearer |
from .oauth2 import OAuth2PasswordRequestForm, OAuth2, OAuth2PasswordBearer |
||||
from .open_id_connect_url import OpenIdConnect |
from .open_id_connect_url import OpenIdConnect |
||||
|
@ -0,0 +1,56 @@ |
|||||
|
from fastapi import FastAPI, Security |
||||
|
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBase |
||||
|
from starlette.testclient import TestClient |
||||
|
|
||||
|
app = FastAPI() |
||||
|
|
||||
|
security = HTTPBase(scheme="Other") |
||||
|
|
||||
|
|
||||
|
@app.get("/users/me") |
||||
|
def read_current_user(credentials: HTTPAuthorizationCredentials = Security(security)): |
||||
|
return {"scheme": credentials.scheme, "credentials": credentials.credentials} |
||||
|
|
||||
|
|
||||
|
client = TestClient(app) |
||||
|
|
||||
|
openapi_schema = { |
||||
|
"openapi": "3.0.2", |
||||
|
"info": {"title": "Fast API", "version": "0.1.0"}, |
||||
|
"paths": { |
||||
|
"/users/me": { |
||||
|
"get": { |
||||
|
"responses": { |
||||
|
"200": { |
||||
|
"description": "Successful Response", |
||||
|
"content": {"application/json": {"schema": {}}}, |
||||
|
} |
||||
|
}, |
||||
|
"summary": "Read Current User Get", |
||||
|
"operationId": "read_current_user_users_me_get", |
||||
|
"security": [{"HTTPBase": []}], |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"components": { |
||||
|
"securitySchemes": {"HTTPBase": {"type": "http", "scheme": "Other"}} |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
|
||||
|
def test_openapi_schema(): |
||||
|
response = client.get("/openapi.json") |
||||
|
assert response.status_code == 200 |
||||
|
assert response.json() == openapi_schema |
||||
|
|
||||
|
|
||||
|
def test_security_http_base(): |
||||
|
response = client.get("/users/me", headers={"Authorization": "Other foobar"}) |
||||
|
assert response.status_code == 200 |
||||
|
assert response.json() == {"scheme": "Other", "credentials": "foobar"} |
||||
|
|
||||
|
|
||||
|
def test_security_http_base_no_credentials(): |
||||
|
response = client.get("/users/me") |
||||
|
assert response.status_code == 403 |
||||
|
assert response.json() == {"detail": "Not authenticated"} |
@ -0,0 +1,76 @@ |
|||||
|
from base64 import b64encode |
||||
|
|
||||
|
from fastapi import FastAPI, Security |
||||
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials |
||||
|
from requests.auth import HTTPBasicAuth |
||||
|
from starlette.testclient import TestClient |
||||
|
|
||||
|
app = FastAPI() |
||||
|
|
||||
|
security = HTTPBasic() |
||||
|
|
||||
|
|
||||
|
@app.get("/users/me") |
||||
|
def read_current_user(credentials: HTTPBasicCredentials = Security(security)): |
||||
|
return {"username": credentials.username, "password": credentials.password} |
||||
|
|
||||
|
|
||||
|
client = TestClient(app) |
||||
|
|
||||
|
openapi_schema = { |
||||
|
"openapi": "3.0.2", |
||||
|
"info": {"title": "Fast API", "version": "0.1.0"}, |
||||
|
"paths": { |
||||
|
"/users/me": { |
||||
|
"get": { |
||||
|
"responses": { |
||||
|
"200": { |
||||
|
"description": "Successful Response", |
||||
|
"content": {"application/json": {"schema": {}}}, |
||||
|
} |
||||
|
}, |
||||
|
"summary": "Read Current User Get", |
||||
|
"operationId": "read_current_user_users_me_get", |
||||
|
"security": [{"HTTPBasic": []}], |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"components": { |
||||
|
"securitySchemes": {"HTTPBasic": {"type": "http", "scheme": "basic"}} |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
|
||||
|
def test_openapi_schema(): |
||||
|
response = client.get("/openapi.json") |
||||
|
assert response.status_code == 200 |
||||
|
assert response.json() == openapi_schema |
||||
|
|
||||
|
|
||||
|
def test_security_http_basic(): |
||||
|
auth = HTTPBasicAuth(username="john", password="secret") |
||||
|
response = client.get("/users/me", auth=auth) |
||||
|
assert response.status_code == 200 |
||||
|
assert response.json() == {"username": "john", "password": "secret"} |
||||
|
|
||||
|
|
||||
|
def test_security_http_basic_no_credentials(): |
||||
|
response = client.get("/users/me") |
||||
|
assert response.status_code == 403 |
||||
|
assert response.json() == {"detail": "Not authenticated"} |
||||
|
|
||||
|
|
||||
|
def test_security_http_basic_invalid_credentials(): |
||||
|
response = client.get( |
||||
|
"/users/me", headers={"Authorization": "Basic notabase64token"} |
||||
|
) |
||||
|
assert response.status_code == 403 |
||||
|
assert response.json() == {"detail": "Invalid authentication credentials"} |
||||
|
|
||||
|
|
||||
|
def test_security_http_basic_non_basic_credentials(): |
||||
|
payload = b64encode(b"johnsecret").decode("ascii") |
||||
|
auth_header = f"Basic {payload}" |
||||
|
response = client.get("/users/me", headers={"Authorization": auth_header}) |
||||
|
assert response.status_code == 403 |
||||
|
assert response.json() == {"detail": "Invalid authentication credentials"} |
@ -0,0 +1,62 @@ |
|||||
|
from fastapi import FastAPI, Security |
||||
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
||||
|
from starlette.testclient import TestClient |
||||
|
|
||||
|
app = FastAPI() |
||||
|
|
||||
|
security = HTTPBearer() |
||||
|
|
||||
|
|
||||
|
@app.get("/users/me") |
||||
|
def read_current_user(credentials: HTTPAuthorizationCredentials = Security(security)): |
||||
|
return {"scheme": credentials.scheme, "credentials": credentials.credentials} |
||||
|
|
||||
|
|
||||
|
client = TestClient(app) |
||||
|
|
||||
|
openapi_schema = { |
||||
|
"openapi": "3.0.2", |
||||
|
"info": {"title": "Fast API", "version": "0.1.0"}, |
||||
|
"paths": { |
||||
|
"/users/me": { |
||||
|
"get": { |
||||
|
"responses": { |
||||
|
"200": { |
||||
|
"description": "Successful Response", |
||||
|
"content": {"application/json": {"schema": {}}}, |
||||
|
} |
||||
|
}, |
||||
|
"summary": "Read Current User Get", |
||||
|
"operationId": "read_current_user_users_me_get", |
||||
|
"security": [{"HTTPBearer": []}], |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"components": { |
||||
|
"securitySchemes": {"HTTPBearer": {"type": "http", "scheme": "bearer"}} |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
|
||||
|
def test_openapi_schema(): |
||||
|
response = client.get("/openapi.json") |
||||
|
assert response.status_code == 200 |
||||
|
assert response.json() == openapi_schema |
||||
|
|
||||
|
|
||||
|
def test_security_http_bearer(): |
||||
|
response = client.get("/users/me", headers={"Authorization": "Bearer foobar"}) |
||||
|
assert response.status_code == 200 |
||||
|
assert response.json() == {"scheme": "Bearer", "credentials": "foobar"} |
||||
|
|
||||
|
|
||||
|
def test_security_http_bearer_no_credentials(): |
||||
|
response = client.get("/users/me") |
||||
|
assert response.status_code == 403 |
||||
|
assert response.json() == {"detail": "Not authenticated"} |
||||
|
|
||||
|
|
||||
|
def test_security_http_bearer_incorrect_scheme_credentials(): |
||||
|
response = client.get("/users/me", headers={"Authorization": "Basic notreally"}) |
||||
|
assert response.status_code == 403 |
||||
|
assert response.json() == {"detail": "Invalid authentication credentials"} |
@ -0,0 +1,64 @@ |
|||||
|
from fastapi import FastAPI, Security |
||||
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest |
||||
|
from starlette.testclient import TestClient |
||||
|
|
||||
|
app = FastAPI() |
||||
|
|
||||
|
security = HTTPDigest() |
||||
|
|
||||
|
|
||||
|
@app.get("/users/me") |
||||
|
def read_current_user(credentials: HTTPAuthorizationCredentials = Security(security)): |
||||
|
return {"scheme": credentials.scheme, "credentials": credentials.credentials} |
||||
|
|
||||
|
|
||||
|
client = TestClient(app) |
||||
|
|
||||
|
openapi_schema = { |
||||
|
"openapi": "3.0.2", |
||||
|
"info": {"title": "Fast API", "version": "0.1.0"}, |
||||
|
"paths": { |
||||
|
"/users/me": { |
||||
|
"get": { |
||||
|
"responses": { |
||||
|
"200": { |
||||
|
"description": "Successful Response", |
||||
|
"content": {"application/json": {"schema": {}}}, |
||||
|
} |
||||
|
}, |
||||
|
"summary": "Read Current User Get", |
||||
|
"operationId": "read_current_user_users_me_get", |
||||
|
"security": [{"HTTPDigest": []}], |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"components": { |
||||
|
"securitySchemes": {"HTTPDigest": {"type": "http", "scheme": "digest"}} |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
|
||||
|
def test_openapi_schema(): |
||||
|
response = client.get("/openapi.json") |
||||
|
assert response.status_code == 200 |
||||
|
assert response.json() == openapi_schema |
||||
|
|
||||
|
|
||||
|
def test_security_http_digest(): |
||||
|
response = client.get("/users/me", headers={"Authorization": "Digest foobar"}) |
||||
|
assert response.status_code == 200 |
||||
|
assert response.json() == {"scheme": "Digest", "credentials": "foobar"} |
||||
|
|
||||
|
|
||||
|
def test_security_http_digest_no_credentials(): |
||||
|
response = client.get("/users/me") |
||||
|
assert response.status_code == 403 |
||||
|
assert response.json() == {"detail": "Not authenticated"} |
||||
|
|
||||
|
|
||||
|
def test_security_http_digest_incorrect_scheme_credentials(): |
||||
|
response = client.get( |
||||
|
"/users/me", headers={"Authorization": "Other invalidauthorization"} |
||||
|
) |
||||
|
assert response.status_code == 403 |
||||
|
assert response.json() == {"detail": "Invalid authentication credentials"} |
Loading…
Reference in new issue