committed by
GitHub
14 changed files with 470 additions and 15 deletions
@ -0,0 +1,42 @@ |
|||
import pytest |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security import APIKeyCookie |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
api_key = APIKeyCookie(name="key") |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: str = Security(api_key)): |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.websocket("/ws/users/me") |
|||
async def read_current_user( |
|||
websocket: WebSocket, current_user: User = Depends(get_current_user) |
|||
): |
|||
await websocket.accept() |
|||
await websocket.send_text(current_user.username) |
|||
|
|||
|
|||
def test_security_api_key_ws(): |
|||
client = TestClient(app, cookies={"key": "secret"}) |
|||
with client.websocket_connect("/ws/users/me") as websocket: |
|||
data = websocket.receive_text() |
|||
assert data == "secret" |
|||
|
|||
|
|||
def test_security_api_key_no_key_ws(): |
|||
client = TestClient(app) |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/users/me"): |
|||
pass |
|||
@ -0,0 +1,45 @@ |
|||
import pytest |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security import APIKeyHeader |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
api_key = APIKeyHeader(name="key") |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: str = Security(api_key)): |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.websocket("/ws/users/me") |
|||
async def read_current_user( |
|||
websocket: WebSocket, current_user: User = Depends(get_current_user) |
|||
): |
|||
await websocket.accept() |
|||
await websocket.send_text(current_user.username) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_api_key_ws(): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"key": "secret"} |
|||
) as websocket: |
|||
data = websocket.receive_text() |
|||
assert data == "secret" |
|||
|
|||
|
|||
def test_security_api_key_no_key_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/users/me"): |
|||
pass |
|||
@ -0,0 +1,43 @@ |
|||
import pytest |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security import APIKeyQuery |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
api_key = APIKeyQuery(name="key") |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: str = Security(api_key)): |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.websocket("/ws/users/me") |
|||
async def read_current_user( |
|||
websocket: WebSocket, current_user: User = Depends(get_current_user) |
|||
): |
|||
await websocket.accept() |
|||
await websocket.send_text(current_user.username) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_api_key_query_ws(): |
|||
with client.websocket_connect("/ws/users/me?key=secret") as websocket: |
|||
data = websocket.receive_text() |
|||
assert data == "secret" |
|||
|
|||
|
|||
def test_security_api_key_query_no_key_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/users/me"): |
|||
pass |
|||
@ -0,0 +1,38 @@ |
|||
import pytest |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBase |
|||
from fastapi.testclient import TestClient |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
security = HTTPBase(scheme="Other") |
|||
|
|||
|
|||
@app.websocket("/ws/users/me") |
|||
async def read_current_user( |
|||
websocket: WebSocket, |
|||
credentials: HTTPAuthorizationCredentials = Security(security), |
|||
): |
|||
await websocket.accept() |
|||
await websocket.send_json( |
|||
{"scheme": credentials.scheme, "credentials": credentials.credentials} |
|||
) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_http_base_ws(): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": "Other foobar"} |
|||
) as websocket: |
|||
data = websocket.receive_json() |
|||
assert data == {"scheme": "Other", "credentials": "foobar"} |
|||
|
|||
|
|||
def test_security_http_base_no_credentials_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/users/me"): |
|||
pass |
|||
@ -0,0 +1,50 @@ |
|||
from base64 import b64encode |
|||
|
|||
import pytest |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import HTTPBasic, HTTPBasicCredentials |
|||
from fastapi.testclient import TestClient |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
security = HTTPBasic(realm="simple") |
|||
|
|||
|
|||
@app.websocket("/ws/users/me") |
|||
async def read_current_user( |
|||
websocket: WebSocket, credentials: HTTPBasicCredentials = Security(security) |
|||
): |
|||
await websocket.accept() |
|||
await websocket.send_json( |
|||
{"username": credentials.username, "password": credentials.password} |
|||
) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_http_basic_ws(): |
|||
# Build Basic header |
|||
payload = b64encode(b"john:secret").decode("ascii") |
|||
auth_header = f"Basic {payload}" |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": auth_header} |
|||
) as websocket: |
|||
data = websocket.receive_json() |
|||
assert data == {"username": "john", "password": "secret"} |
|||
|
|||
|
|||
def test_security_http_basic_no_credentials_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/users/me"): |
|||
pass |
|||
|
|||
|
|||
def test_security_http_basic_invalid_credentials_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": "Basic notbase64"} |
|||
): |
|||
pass |
|||
@ -0,0 +1,46 @@ |
|||
import pytest |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
|||
from fastapi.testclient import TestClient |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
security = HTTPBearer() |
|||
|
|||
|
|||
@app.websocket("/ws/users/me") |
|||
async def read_current_user( |
|||
websocket: WebSocket, |
|||
credentials: HTTPAuthorizationCredentials = Security(security), |
|||
): |
|||
await websocket.accept() |
|||
await websocket.send_json( |
|||
{"scheme": credentials.scheme, "credentials": credentials.credentials} |
|||
) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_http_bearer_ws(): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": "Bearer foobar"} |
|||
) as websocket: |
|||
data = websocket.receive_json() |
|||
assert data == {"scheme": "Bearer", "credentials": "foobar"} |
|||
|
|||
|
|||
def test_security_http_bearer_no_credentials_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/users/me"): |
|||
pass |
|||
|
|||
|
|||
def test_security_http_bearer_incorrect_scheme_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": "Basic notreally"} |
|||
): |
|||
pass |
|||
@ -0,0 +1,46 @@ |
|||
import pytest |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest |
|||
from fastapi.testclient import TestClient |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
security = HTTPDigest() |
|||
|
|||
|
|||
@app.websocket("/ws/users/me") |
|||
async def read_current_user( |
|||
websocket: WebSocket, |
|||
credentials: HTTPAuthorizationCredentials = Security(security), |
|||
): |
|||
await websocket.accept() |
|||
await websocket.send_json( |
|||
{"scheme": credentials.scheme, "credentials": credentials.credentials} |
|||
) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_http_digest_ws(): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": "Digest foobar"} |
|||
) as websocket: |
|||
data = websocket.receive_json() |
|||
assert data == {"scheme": "Digest", "credentials": "foobar"} |
|||
|
|||
|
|||
def test_security_http_digest_no_credentials_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/users/me"): |
|||
pass |
|||
|
|||
|
|||
def test_security_http_digest_incorrect_scheme_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": "Basic notreally"} |
|||
): |
|||
pass |
|||
@ -0,0 +1,45 @@ |
|||
import pytest |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import OAuth2AuthorizationCodeBearer |
|||
from fastapi.testclient import TestClient |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
oauth2_scheme = OAuth2AuthorizationCodeBearer( |
|||
authorizationUrl="/api/oauth/authorize", |
|||
tokenUrl="/api/oauth/token", |
|||
scopes={"read": "Read access", "write": "Write access"}, |
|||
) |
|||
|
|||
|
|||
@app.websocket("/ws/admin") |
|||
async def read_admin(websocket: WebSocket, token: str = Security(oauth2_scheme)): |
|||
await websocket.accept() |
|||
await websocket.send_text(token) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_oauth2_authorization_code_bearer_ws(): |
|||
with client.websocket_connect( |
|||
"/ws/admin", headers={"Authorization": "Bearer faketoken"} |
|||
) as websocket: |
|||
data = websocket.receive_text() |
|||
assert data == "faketoken" |
|||
|
|||
|
|||
def test_security_oauth2_authorization_code_bearer_no_header_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/admin"): |
|||
pass |
|||
|
|||
|
|||
def test_security_oauth2_authorization_code_bearer_wrong_scheme_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect( |
|||
"/ws/admin", headers={"Authorization": "Basic nope"} |
|||
): |
|||
pass |
|||
@ -0,0 +1,41 @@ |
|||
import pytest |
|||
from fastapi import FastAPI, Security |
|||
from fastapi.security import OAuth2PasswordBearer |
|||
from fastapi.testclient import TestClient |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") |
|||
|
|||
|
|||
@app.websocket("/ws/token") |
|||
async def read_token(websocket: WebSocket, token: str = Security(oauth2_scheme)): |
|||
await websocket.accept() |
|||
await websocket.send_text(token) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_oauth2_password_bearer_ws(): |
|||
with client.websocket_connect( |
|||
"/ws/token", headers={"Authorization": "Bearer faketoken"} |
|||
) as websocket: |
|||
data = websocket.receive_text() |
|||
assert data == "faketoken" |
|||
|
|||
|
|||
def test_security_oauth2_password_bearer_no_header_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/token"): |
|||
pass |
|||
|
|||
|
|||
def test_security_oauth2_password_bearer_wrong_scheme_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect( |
|||
"/ws/token", headers={"Authorization": "Basic nope"} |
|||
): |
|||
pass |
|||
@ -0,0 +1,53 @@ |
|||
import pytest |
|||
from fastapi import Depends, FastAPI, Security |
|||
from fastapi.security.open_id_connect_url import OpenIdConnect |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
from starlette.testclient import WebSocketDenialResponse |
|||
from starlette.websockets import WebSocket |
|||
|
|||
app = FastAPI() |
|||
|
|||
oid = OpenIdConnect(openIdConnectUrl="/openid") |
|||
|
|||
|
|||
class User(BaseModel): |
|||
username: str |
|||
|
|||
|
|||
def get_current_user(oauth_header: str = Security(oid)): |
|||
user = User(username=oauth_header) |
|||
return user |
|||
|
|||
|
|||
@app.websocket("/ws/users/me") |
|||
async def read_current_user( |
|||
websocket: WebSocket, current_user: User = Depends(get_current_user) |
|||
): |
|||
await websocket.accept() |
|||
await websocket.send_json({"username": current_user.username}) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_security_openid_connect_ws(): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": "Bearer footokenbar"} |
|||
) as websocket: |
|||
data = websocket.receive_json() |
|||
assert data == {"username": "Bearer footokenbar"} |
|||
|
|||
|
|||
def test_security_openid_connect_other_header_ws(): |
|||
with client.websocket_connect( |
|||
"/ws/users/me", headers={"Authorization": "Other footokenbar"} |
|||
) as websocket: |
|||
data = websocket.receive_json() |
|||
assert data == {"username": "Other footokenbar"} |
|||
|
|||
|
|||
def test_security_openid_connect_no_header_ws(): |
|||
with pytest.raises(WebSocketDenialResponse): |
|||
with client.websocket_connect("/ws/users/me"): |
|||
pass |
|||
Loading…
Reference in new issue