pythonasyncioapiasyncfastapiframeworkjsonjson-schemaopenapiopenapi3pydanticpython-typespython3redocreststarletteswaggerswagger-uiuvicornweb
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
401 lines
12 KiB
401 lines
12 KiB
import logging
|
|
import secrets
|
|
import subprocess
|
|
import time
|
|
from collections import Counter
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Container, Union
|
|
|
|
import httpx
|
|
import yaml
|
|
from github import Github
|
|
from pydantic import BaseModel, SecretStr
|
|
from pydantic_settings import BaseSettings
|
|
|
|
github_graphql_url = "https://api.github.com/graphql"
|
|
questions_category_id = "MDE4OkRpc2N1c3Npb25DYXRlZ29yeTMyMDAxNDM0"
|
|
|
|
discussions_query = """
|
|
query Q($after: String, $category_id: ID) {
|
|
repository(name: "fastapi", owner: "fastapi") {
|
|
discussions(first: 100, after: $after, categoryId: $category_id) {
|
|
edges {
|
|
cursor
|
|
node {
|
|
number
|
|
author {
|
|
login
|
|
avatarUrl
|
|
url
|
|
}
|
|
createdAt
|
|
comments(first: 50) {
|
|
totalCount
|
|
nodes {
|
|
createdAt
|
|
author {
|
|
login
|
|
avatarUrl
|
|
url
|
|
}
|
|
isAnswer
|
|
replies(first: 10) {
|
|
totalCount
|
|
nodes {
|
|
createdAt
|
|
author {
|
|
login
|
|
avatarUrl
|
|
url
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
class Author(BaseModel):
|
|
login: str
|
|
avatarUrl: str | None = None
|
|
url: str | None = None
|
|
|
|
|
|
class CommentsNode(BaseModel):
|
|
createdAt: datetime
|
|
author: Union[Author, None] = None
|
|
|
|
|
|
class Replies(BaseModel):
|
|
totalCount: int
|
|
nodes: list[CommentsNode]
|
|
|
|
|
|
class DiscussionsCommentsNode(CommentsNode):
|
|
replies: Replies
|
|
|
|
|
|
class DiscussionsComments(BaseModel):
|
|
totalCount: int
|
|
nodes: list[DiscussionsCommentsNode]
|
|
|
|
|
|
class DiscussionsNode(BaseModel):
|
|
number: int
|
|
author: Union[Author, None] = None
|
|
title: str | None = None
|
|
createdAt: datetime
|
|
comments: DiscussionsComments
|
|
|
|
|
|
class DiscussionsEdge(BaseModel):
|
|
cursor: str
|
|
node: DiscussionsNode
|
|
|
|
|
|
class Discussions(BaseModel):
|
|
edges: list[DiscussionsEdge]
|
|
|
|
|
|
class DiscussionsRepository(BaseModel):
|
|
discussions: Discussions
|
|
|
|
|
|
class DiscussionsResponseData(BaseModel):
|
|
repository: DiscussionsRepository
|
|
|
|
|
|
class DiscussionsResponse(BaseModel):
|
|
data: DiscussionsResponseData
|
|
|
|
|
|
class Settings(BaseSettings):
|
|
github_token: SecretStr
|
|
github_repository: str
|
|
httpx_timeout: int = 30
|
|
|
|
|
|
def get_graphql_response(
|
|
*,
|
|
settings: Settings,
|
|
query: str,
|
|
after: Union[str, None] = None,
|
|
category_id: Union[str, None] = None,
|
|
) -> dict[str, Any]:
|
|
headers = {"Authorization": f"token {settings.github_token.get_secret_value()}"}
|
|
variables = {"after": after, "category_id": category_id}
|
|
response = httpx.post(
|
|
github_graphql_url,
|
|
headers=headers,
|
|
timeout=settings.httpx_timeout,
|
|
json={"query": query, "variables": variables, "operationName": "Q"},
|
|
)
|
|
if response.status_code != 200:
|
|
logging.error(
|
|
f"Response was not 200, after: {after}, category_id: {category_id}"
|
|
)
|
|
logging.error(response.text)
|
|
raise RuntimeError(response.text)
|
|
data = response.json()
|
|
if "errors" in data:
|
|
logging.error(f"Errors in response, after: {after}, category_id: {category_id}")
|
|
logging.error(data["errors"])
|
|
logging.error(response.text)
|
|
raise RuntimeError(response.text)
|
|
return data
|
|
|
|
|
|
def get_graphql_question_discussion_edges(
|
|
*,
|
|
settings: Settings,
|
|
after: Union[str, None] = None,
|
|
) -> list[DiscussionsEdge]:
|
|
data = get_graphql_response(
|
|
settings=settings,
|
|
query=discussions_query,
|
|
after=after,
|
|
category_id=questions_category_id,
|
|
)
|
|
graphql_response = DiscussionsResponse.model_validate(data)
|
|
return graphql_response.data.repository.discussions.edges
|
|
|
|
|
|
class DiscussionExpertsResults(BaseModel):
|
|
commenters: Counter[str]
|
|
last_month_commenters: Counter[str]
|
|
three_months_commenters: Counter[str]
|
|
six_months_commenters: Counter[str]
|
|
one_year_commenters: Counter[str]
|
|
authors: dict[str, Author]
|
|
|
|
|
|
def get_discussion_nodes(settings: Settings) -> list[DiscussionsNode]:
|
|
discussion_nodes: list[DiscussionsNode] = []
|
|
discussion_edges = get_graphql_question_discussion_edges(settings=settings)
|
|
|
|
while discussion_edges:
|
|
for discussion_edge in discussion_edges:
|
|
discussion_nodes.append(discussion_edge.node)
|
|
last_edge = discussion_edges[-1]
|
|
# Handle GitHub secondary rate limits, requests per minute
|
|
time.sleep(5)
|
|
discussion_edges = get_graphql_question_discussion_edges(
|
|
settings=settings, after=last_edge.cursor
|
|
)
|
|
return discussion_nodes
|
|
|
|
|
|
def get_discussions_experts(
|
|
discussion_nodes: list[DiscussionsNode],
|
|
) -> DiscussionExpertsResults:
|
|
commenters = Counter[str]()
|
|
last_month_commenters = Counter[str]()
|
|
three_months_commenters = Counter[str]()
|
|
six_months_commenters = Counter[str]()
|
|
one_year_commenters = Counter[str]()
|
|
authors: dict[str, Author] = {}
|
|
|
|
now = datetime.now(tz=timezone.utc)
|
|
one_month_ago = now - timedelta(days=30)
|
|
three_months_ago = now - timedelta(days=90)
|
|
six_months_ago = now - timedelta(days=180)
|
|
one_year_ago = now - timedelta(days=365)
|
|
|
|
for discussion in discussion_nodes:
|
|
discussion_author_name = None
|
|
if discussion.author:
|
|
authors[discussion.author.login] = discussion.author
|
|
discussion_author_name = discussion.author.login
|
|
discussion_commentors: dict[str, datetime] = {}
|
|
for comment in discussion.comments.nodes:
|
|
if comment.author:
|
|
authors[comment.author.login] = comment.author
|
|
if comment.author.login != discussion_author_name:
|
|
author_time = discussion_commentors.get(
|
|
comment.author.login, comment.createdAt
|
|
)
|
|
discussion_commentors[comment.author.login] = max(
|
|
author_time, comment.createdAt
|
|
)
|
|
for reply in comment.replies.nodes:
|
|
if reply.author:
|
|
authors[reply.author.login] = reply.author
|
|
if reply.author.login != discussion_author_name:
|
|
author_time = discussion_commentors.get(
|
|
reply.author.login, reply.createdAt
|
|
)
|
|
discussion_commentors[reply.author.login] = max(
|
|
author_time, reply.createdAt
|
|
)
|
|
for author_name, author_time in discussion_commentors.items():
|
|
commenters[author_name] += 1
|
|
if author_time > one_month_ago:
|
|
last_month_commenters[author_name] += 1
|
|
if author_time > three_months_ago:
|
|
three_months_commenters[author_name] += 1
|
|
if author_time > six_months_ago:
|
|
six_months_commenters[author_name] += 1
|
|
if author_time > one_year_ago:
|
|
one_year_commenters[author_name] += 1
|
|
discussion_experts_results = DiscussionExpertsResults(
|
|
authors=authors,
|
|
commenters=commenters,
|
|
last_month_commenters=last_month_commenters,
|
|
three_months_commenters=three_months_commenters,
|
|
six_months_commenters=six_months_commenters,
|
|
one_year_commenters=one_year_commenters,
|
|
)
|
|
return discussion_experts_results
|
|
|
|
|
|
def get_top_users(
|
|
*,
|
|
counter: Counter[str],
|
|
authors: dict[str, Author],
|
|
skip_users: Container[str],
|
|
min_count: int = 2,
|
|
) -> list[dict[str, Any]]:
|
|
users: list[dict[str, Any]] = []
|
|
for commenter, count in counter.most_common(50):
|
|
if commenter in skip_users:
|
|
continue
|
|
if count >= min_count:
|
|
author = authors[commenter]
|
|
users.append(
|
|
{
|
|
"login": commenter,
|
|
"count": count,
|
|
"avatarUrl": author.avatarUrl,
|
|
"url": author.url,
|
|
}
|
|
)
|
|
return users
|
|
|
|
|
|
def get_users_to_write(
|
|
*,
|
|
counter: Counter[str],
|
|
authors: dict[str, Author],
|
|
min_count: int = 2,
|
|
) -> list[dict[str, Any]]:
|
|
users: dict[str, Any] = {}
|
|
users_list: list[dict[str, Any]] = []
|
|
for user, count in counter.most_common(60):
|
|
if count >= min_count:
|
|
author = authors[user]
|
|
user_data = {
|
|
"login": user,
|
|
"count": count,
|
|
"avatarUrl": author.avatarUrl,
|
|
"url": author.url,
|
|
}
|
|
users[user] = user_data
|
|
users_list.append(user_data)
|
|
return users_list
|
|
|
|
|
|
def update_content(*, content_path: Path, new_content: Any) -> bool:
|
|
old_content = content_path.read_text(encoding="utf-8")
|
|
|
|
new_content = yaml.dump(new_content, sort_keys=False, width=200, allow_unicode=True)
|
|
if old_content == new_content:
|
|
logging.info(f"The content hasn't changed for {content_path}")
|
|
return False
|
|
content_path.write_text(new_content, encoding="utf-8")
|
|
logging.info(f"Updated {content_path}")
|
|
return True
|
|
|
|
|
|
def main() -> None:
|
|
logging.basicConfig(level=logging.INFO)
|
|
settings = Settings()
|
|
logging.info(f"Using config: {settings.model_dump_json()}")
|
|
g = Github(settings.github_token.get_secret_value())
|
|
repo = g.get_repo(settings.github_repository)
|
|
|
|
discussion_nodes = get_discussion_nodes(settings=settings)
|
|
experts_results = get_discussions_experts(discussion_nodes=discussion_nodes)
|
|
|
|
authors = experts_results.authors
|
|
maintainers_logins = {"tiangolo"}
|
|
maintainers = []
|
|
for login in maintainers_logins:
|
|
user = authors[login]
|
|
maintainers.append(
|
|
{
|
|
"login": login,
|
|
"answers": experts_results.commenters[login],
|
|
"avatarUrl": user.avatarUrl,
|
|
"url": user.url,
|
|
}
|
|
)
|
|
|
|
experts = get_users_to_write(
|
|
counter=experts_results.commenters,
|
|
authors=authors,
|
|
)
|
|
last_month_experts = get_users_to_write(
|
|
counter=experts_results.last_month_commenters,
|
|
authors=authors,
|
|
)
|
|
three_months_experts = get_users_to_write(
|
|
counter=experts_results.three_months_commenters,
|
|
authors=authors,
|
|
)
|
|
six_months_experts = get_users_to_write(
|
|
counter=experts_results.six_months_commenters,
|
|
authors=authors,
|
|
)
|
|
one_year_experts = get_users_to_write(
|
|
counter=experts_results.one_year_commenters,
|
|
authors=authors,
|
|
)
|
|
|
|
people = {
|
|
"maintainers": maintainers,
|
|
"experts": experts,
|
|
"last_month_experts": last_month_experts,
|
|
"three_months_experts": three_months_experts,
|
|
"six_months_experts": six_months_experts,
|
|
"one_year_experts": one_year_experts,
|
|
}
|
|
|
|
# For local development
|
|
# people_path = Path("../docs/en/data/people.yml")
|
|
people_path = Path("./docs/en/data/people.yml")
|
|
|
|
updated = update_content(content_path=people_path, new_content=people)
|
|
|
|
if not updated:
|
|
logging.info("The data hasn't changed, finishing.")
|
|
return
|
|
|
|
logging.info("Setting up GitHub Actions git user")
|
|
subprocess.run(["git", "config", "user.name", "github-actions"], check=True)
|
|
subprocess.run(
|
|
["git", "config", "user.email", "[email protected]"], check=True
|
|
)
|
|
branch_name = f"fastapi-people-experts-{secrets.token_hex(4)}"
|
|
logging.info(f"Creating a new branch {branch_name}")
|
|
subprocess.run(["git", "checkout", "-b", branch_name], check=True)
|
|
logging.info("Adding updated file")
|
|
subprocess.run(["git", "add", str(people_path)], check=True)
|
|
logging.info("Committing updated file")
|
|
message = "👥 Update FastAPI People - Experts"
|
|
subprocess.run(["git", "commit", "-m", message], check=True)
|
|
logging.info("Pushing branch")
|
|
subprocess.run(["git", "push", "origin", branch_name], check=True)
|
|
logging.info("Creating PR")
|
|
pr = repo.create_pull(title=message, body=message, base="master", head=branch_name)
|
|
logging.info(f"Created PR: {pr.number}")
|
|
logging.info("Finished")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|