You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
3.8 KiB

from fastapi import APIRouter, Request, Response, HTTPException
import ipaddress
from typing import Dict, Any
from app.api.deps import CurrentUser
router = APIRouter(prefix="/ip", tags=["ip"])
@router.get("/", response_model=dict)
@router.get("", response_model=dict)
async def get_client_ip(request: Request):
"""
获取客户端IP地址
返回客户端的公网IP地址,通常是通过代理转发的X-Forwarded-For头获取
"""
# 尝试从各种HTTP头获取IP地址
client_ip = _extract_client_ip(request)
return {"ip": client_ip}
@router.get("/text/", response_class=Response)
async def get_client_ip_text(request: Request):
"""
获取客户端IP地址(纯文本格式)
返回客户端的公网IP地址,以纯文本格式
"""
# 尝试从各种HTTP头获取IP地址
client_ip = _extract_client_ip(request)
return Response(content=client_ip, media_type="text/plain")
@router.get("/headers/", response_model=dict)
async def get_request_headers(request: Request):
"""
获取请求的所有头信息
返回包含所有HTTP请求头的字典
"""
return {"headers": dict(request.headers)}
# 示例:添加需要登录的IP端点
@router.get("/secure/", response_model=dict)
async def get_secure_ip(request: Request, current_user: CurrentUser):
"""
获取客户端IP地址(需要登录)
这是一个需要用户登录的示例端点
"""
client_ip = _extract_client_ip(request)
return {
"ip": client_ip,
"user_id": str(current_user.id),
"user_email": current_user.email
}
@router.get("/analyze/{ip_address}", response_model=Dict[str, Any])
async def analyze_ip(ip_address: str):
"""
分析指定的IP地址
返回IP地址的基本信息,如版本、是否私有等
"""
try:
ip = ipaddress.ip_address(ip_address)
result = {
"ip": str(ip),
"version": f"IPv{ip.version}",
"is_private": ip.is_private,
"is_global": ip.is_global,
"is_multicast": ip.is_multicast,
"is_loopback": ip.is_loopback,
"is_link_local": ip.is_link_local,
}
# IPv6特有属性
if ip.version == 6:
ipv6 = ipaddress.IPv6Address(ip_address)
result.update({
"is_site_local": ipv6.is_site_local,
"ipv4_mapped": ipv6.ipv4_mapped is not None,
"teredo": ipv6.teredo is not None,
"sixtofour": ipv6.sixtofour is not None
})
# 尝试获取反向DNS查询名称
try:
import socket
result["reverse_pointer"] = socket.gethostbyaddr(ip_address)[0]
except (socket.herror, socket.gaierror):
result["reverse_pointer"] = None
return result
except ValueError:
raise HTTPException(
status_code=400,
detail=f"无效的IP地址: {ip_address}"
)
def _extract_client_ip(request: Request) -> str:
"""
从请求中提取客户端IP地址
按照优先级尝试从不同的HTTP头获取
"""
# 检查常见的代理头
for header in [
"X-Forwarded-For",
"X-Real-IP",
"CF-Connecting-IP", # Cloudflare
"True-Client-IP", # Akamai/Cloudflare
"X-Client-IP"
]:
if header_value := request.headers.get(header):
# 如果是X-Forwarded-For,可能包含多个IP,取第一个
if header == "X-Forwarded-For" and "," in header_value:
return header_value.split(",")[0].strip()
return header_value.strip()
# 如果没有代理头,使用直接连接的客户端地址
return request.client.host if request.client else "unknown"