Browse Source

♻️ Refactor API in `utils.py` (#573)

pull/13907/head
Alejandra 2 years ago
committed by GitHub
parent
commit
d5982236ac
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      src/backend/app/app/api/api_v1/endpoints/utils.py

34
src/backend/app/app/api/api_v1/endpoints/utils.py

@ -1,35 +1,35 @@
from typing import Any
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
from pydantic.networks import EmailStr from pydantic.networks import EmailStr
from app import models, schemas from app.api.deps import get_current_active_superuser
from app.api import deps
from app.core.celery_app import celery_app from app.core.celery_app import celery_app
from app.models import Message
from app.utils import send_test_email from app.utils import send_test_email
router = APIRouter() router = APIRouter()
@router.post("/test-celery/", response_model=schemas.Msg, status_code=201) @router.post(
def test_celery( "/test-celery/",
msg: schemas.Msg, dependencies=[Depends(get_current_active_superuser)],
current_user: models.User = Depends(deps.get_current_active_superuser), status_code=201,
) -> Any: )
def test_celery(body: Message) -> Message:
""" """
Test Celery worker. Test Celery worker.
""" """
celery_app.send_task("app.worker.test_celery", args=[msg.msg]) celery_app.send_task("app.worker.test_celery", args=[body.message])
return {"msg": "Word received"} return Message(message="Word received")
@router.post("/test-email/", response_model=schemas.Msg, status_code=201) @router.post(
def test_email( "/test-email/",
email_to: EmailStr, dependencies=[Depends(get_current_active_superuser)],
current_user: models.User = Depends(deps.get_current_active_superuser), status_code=201,
) -> Any: )
def test_email(email_to: EmailStr) -> Message:
""" """
Test emails. Test emails.
""" """
send_test_email(email_to=email_to) send_test_email(email_to=email_to)
return {"msg": "Test email sent"} return Message(message="Test email sent")

Loading…
Cancel
Save