committed by
GitHub
7 changed files with 270 additions and 3 deletions
@ -0,0 +1,39 @@ |
|||
from typing import Union |
|||
|
|||
from fastapi import FastAPI, HTTPException, Request |
|||
from fastapi.responses import JSONResponse |
|||
|
|||
app = FastAPI() |
|||
|
|||
FAKE_DB = { |
|||
0: {"name": "Admin", "role": "ADMIN"}, |
|||
1: {"name": "User 1", "role": "USER"}, |
|||
2: {"name": "User 2", "role": "USER"}, |
|||
} |
|||
|
|||
|
|||
@app.exception_handler([401, 403]) |
|||
async def handle_auth_errors(request: Request, exc: Exception): |
|||
return JSONResponse( |
|||
status_code=exc.status_code if isinstance(exc, HTTPException) else 403, |
|||
content={"detail": "Access denied. Check your credentials or permissions."}, |
|||
) |
|||
|
|||
|
|||
@app.get("/secrets/") |
|||
async def get_secrets(auth_user_id: Union[int, None] = None): |
|||
# Get authenticated user info (not a production-ready code) |
|||
if auth_user_id is not None: |
|||
auth_user_info = FAKE_DB.get(auth_user_id) |
|||
else: |
|||
auth_user_info = None |
|||
|
|||
# Return 401 status code if user not authenticated |
|||
if auth_user_info is None: |
|||
raise HTTPException(status_code=401) # Not authenticated |
|||
|
|||
# Return 403 status code if user is not authorized to get secret information |
|||
if auth_user_info["role"] != "ADMIN": |
|||
raise HTTPException(status_code=403) # Not authorized |
|||
|
|||
return {"data": "Secret information"} |
@ -0,0 +1,40 @@ |
|||
from fastapi import FastAPI, File, HTTPException, Request, UploadFile |
|||
from fastapi.responses import JSONResponse |
|||
|
|||
MAX_FILE_SIZE_MB = 5 |
|||
ALLOWED_TYPES = {"application/pdf", "image/jpeg"} |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class FileTooLargeError(HTTPException): |
|||
def __init__(self): |
|||
super().__init__(status_code=413, detail="The uploaded file is too large.") |
|||
|
|||
|
|||
class UnsupportedFileTypeError(HTTPException): |
|||
def __init__(self): |
|||
super().__init__(status_code=415, detail="Unsupported file type") |
|||
|
|||
|
|||
@app.exception_handler((FileTooLargeError, UnsupportedFileTypeError)) |
|||
async def custom_exception_handler(request: Request, exc: HTTPException): |
|||
return JSONResponse( |
|||
status_code=exc.status_code, |
|||
content={"error": exc.detail, "hint": "Need help? Contact support@example.com"}, |
|||
) |
|||
|
|||
|
|||
@app.post("/upload/") |
|||
async def upload_file(file: UploadFile = File(...)): |
|||
# Validate file type |
|||
if file.content_type not in ALLOWED_TYPES: |
|||
raise UnsupportedFileTypeError() |
|||
|
|||
# Validate file size (read contents to check size in memory) |
|||
contents = await file.read() |
|||
size_mb = len(contents) / (1024 * 1024) |
|||
if size_mb > MAX_FILE_SIZE_MB: |
|||
raise FileTooLargeError() |
|||
|
|||
return {"filename": file.filename, "message": "File uploaded successfully!"} |
@ -0,0 +1,27 @@ |
|||
from fastapi.testclient import TestClient |
|||
|
|||
from docs_src.handling_errors.tutorial007 import app |
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_unauthenticated(): |
|||
response = client.get("/secrets") |
|||
assert response.status_code == 401, response.text |
|||
assert response.json() == { |
|||
"detail": "Access denied. Check your credentials or permissions." |
|||
} |
|||
|
|||
|
|||
def test_unauthorized(): |
|||
response = client.get("/secrets", params={"auth_user_id": 1}) |
|||
assert response.status_code == 403, response.text |
|||
assert response.json() == { |
|||
"detail": "Access denied. Check your credentials or permissions." |
|||
} |
|||
|
|||
|
|||
def test_success(): |
|||
response = client.get("/secrets", params={"auth_user_id": 0}) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == {"data": "Secret information"} |
@ -0,0 +1,59 @@ |
|||
from pathlib import Path |
|||
from unittest.mock import patch |
|||
|
|||
from fastapi.testclient import TestClient |
|||
|
|||
from docs_src.handling_errors import tutorial008 |
|||
from docs_src.handling_errors.tutorial008 import app |
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_unsupported_file_type(tmp_path: Path): |
|||
file = tmp_path / "test.txt" |
|||
file.write_text("<file content>") |
|||
with open(file, "+rb") as fp: |
|||
response = client.post( |
|||
"/upload", |
|||
files={"file": ("test.txt", fp, "text/plain")}, |
|||
) |
|||
assert response.status_code == 415, response.text |
|||
assert response.json() == { |
|||
"error": "Unsupported file type", |
|||
"hint": "Need help? Contact support@example.com", |
|||
} |
|||
|
|||
|
|||
def test_file_too_large(tmp_path: Path): |
|||
file = tmp_path / "test.pdf" |
|||
file.write_text("<file content>" * 100) # ~1.37 kB |
|||
with patch.object( |
|||
tutorial008, |
|||
"MAX_FILE_SIZE_MB", |
|||
new=0.001, # MAX_FILE_SIZE_MB = 1 kB |
|||
): |
|||
with open(file, "+rb") as fp: |
|||
response = client.post( |
|||
"/upload", |
|||
files={"file": ("test.pdf", fp, "application/pdf")}, |
|||
) |
|||
assert response.status_code == 413, response.text |
|||
assert response.json() == { |
|||
"error": "The uploaded file is too large.", |
|||
"hint": "Need help? Contact support@example.com", |
|||
} |
|||
|
|||
|
|||
def test_success(tmp_path: Path): |
|||
file = tmp_path / "test.pdf" |
|||
file.write_text("<file content>") |
|||
with open(file, "+rb") as fp: |
|||
response = client.post( |
|||
"/upload", |
|||
files={"file": ("test.pdf", fp, "application/pdf")}, |
|||
) |
|||
assert response.status_code == 200, response.text |
|||
assert response.json() == { |
|||
"filename": "test.pdf", |
|||
"message": "File uploaded successfully!", |
|||
} |
Loading…
Reference in new issue