Browse Source

Migrate utils.py

pull/10109/head
dolfies 4 years ago
parent
commit
0489ad0f46
  1. 245
      discord/utils.py

245
discord/utils.py

@ -56,11 +56,18 @@ import functools
from inspect import isawaitable as _isawaitable, signature as _signature
from operator import attrgetter
import json
import logging
import os
import platform
import re
import subprocess
import sys
import tempfile
from threading import Timer
import types
import warnings
from .enums import BrowserEnum
from .errors import InvalidArgument
try:
@ -88,6 +95,8 @@ __all__ = (
DISCORD_EPOCH = 1420070400000
_log = logging.getLogger(__name__)
class _MissingSentinel:
def __eq__(self, other):
@ -119,6 +128,7 @@ class _cached_property:
if TYPE_CHECKING:
from aiohttp import ClientSession
from functools import cached_property as cached_property
from typing_extensions import ParamSpec
@ -273,13 +283,13 @@ def oauth_url(
scopes: Iterable[str] = MISSING,
disable_guild_select: bool = False,
) -> str:
"""A helper function that returns the OAuth2 URL for inviting the bot
"""A helper function that returns the OAuth2 URL for inviting a bot
into guilds.
Parameters
-----------
client_id: Union[:class:`int`, :class:`str`]
The client ID for your bot.
The client ID for the bot.
permissions: :class:`~discord.Permissions`
The permissions you're requesting. If not given then you won't be requesting any
permissions.
@ -288,7 +298,7 @@ def oauth_url(
redirect_uri: :class:`str`
An optional valid redirect URI.
scopes: Iterable[:class:`str`]
An optional valid list of scopes. Defaults to ``('bot',)``.
An optional valid list of scopes. Defaults to ``('bot', 'application.commands')``.
.. versionadded:: 1.7
disable_guild_select: :class:`bool`
@ -302,7 +312,7 @@ def oauth_url(
The OAuth2 URL for inviting the bot into guilds.
"""
url = f'https://discord.com/oauth2/authorize?client_id={client_id}'
url += '&scope=' + '+'.join(scopes or ('bot',))
url += '&scope=' + '+'.join(scopes or ('bot', 'application.commands'))
if permissions is not MISSING:
url += f'&permissions={permissions.value}'
if guild is not MISSING:
@ -1017,3 +1027,230 @@ def format_dt(dt: datetime.datetime, /, style: Optional[TimestampStyle] = None)
if style is None:
return f'<t:{int(dt.timestamp())}>'
return f'<t:{int(dt.timestamp())}:{style}>'
class ExpiringQueue(asyncio.Queue): # Inspired from https://github.com/NoahCardoza/CaptchaHarvester
def __init__(self, timeout: int, maxsize: int = 0) -> None:
super().__init__(maxsize)
self.timeout = timeout
self.timers: asyncio.Queue = asyncio.Queue()
async def put(self, item: str) -> None:
thread: Timer = Timer(self.timeout, self.expire)
thread.start()
await self.timers.put(thread)
await super().put(item)
async def get(self, block: bool = True) -> str:
if block:
thread = await self.timers.get()
else:
thread = self.timers.get_nowait()
thread.cancel()
if block:
return await super().get()
else:
return self.get_nowait()
def expire(self) -> None:
try:
self._queue.popleft()
except:
pass
def to_list(self) -> List[str]:
return list(self._queue)
class ExpiringString(collections.UserString):
def __init__(self, data: str, timeout: int) -> None:
super().__init__(data)
self._timer: Timer = Timer(timeout, self._destruct)
self._timer.start()
def _update(self, data: str, timeout: int) -> None:
try:
self._timer.cancel()
except:
pass
self.data = data
self._timer: Timer = Timer(timeout, self._destruct)
self._timer.start()
def _destruct(self) -> None:
self.data = ''
def destroy(self) -> None:
self._destruct()
self._timer.cancel()
class Browser: # Inspired from https://github.com/NoahCardoza/CaptchaHarvester
def __init__(self, browser: Union[BrowserEnum, str] = None) -> None:
if isinstance(browser, (BrowserEnum, type(None))):
try:
browser = self.get_browser(browser)
except Exception:
raise RuntimeError('Could not find browser. Please pass browser path manually.')
if browser is None:
raise RuntimeError('Could not find browser. Please pass browser path manually.')
self.browser: str = browser
self.proc: subprocess.Popen = MISSING
def get_mac_browser(pkg: str, binary: str) -> Optional[os.PathLike]:
import plistlib as plist
pfile: str = f'{os.environ["HOME"]}/Library/Preferences/{pkg}.plist'
if os.path.exists(pfile):
with open(pfile, 'rb') as f:
binary_path: Optional[str] = plist.load(f).get('LastRunAppBundlePath')
if binary_path is not None:
return os.path.join(binary_path, 'Contents', 'MacOS', binary)
def get_windows_browser(browser: str) -> Optional[str]:
import winreg as reg
reg_path: str = f'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\{browser}.exe'
exe_path: Optional[str] = None
for install_type in reg.HKEY_CURRENT_USER, reg.HKEY_LOCAL_MACHINE:
try:
reg_key: str = reg.OpenKey(install_type, reg_path, 0, reg.KEY_READ)
exe_path: Optional[str] = reg.QueryValue(reg_key, None)
reg_key.Close()
if not os.path.isfile(exe_path):
continue
except reg.WindowsError:
pass
else:
break
return exe_path
def get_linux_browser(browser: str) -> Optional[str]:
from shutil import which as exists
possibilities: List[str] = [browser + channel for channel in ('', '-beta', '-dev', '-developer', '-canary')]
for browser in possibilities:
if exists(browser):
return browser
registry: Dict[str, Dict[str, functools.partial]] = {
'Windows': {
'chrome': functools.partial(get_windows_browser, 'chrome'),
'chromium': functools.partial(get_windows_browser, 'chromium'),
'microsoft-edge': functools.partial(get_windows_browser, 'msedge'),
'opera': functools.partial(get_windows_browser, 'opera'),
},
'Darwin': {
'chrome': functools.partial(get_mac_browser, 'com.google.Chrome', 'Google Chrome'),
'chromium': functools.partial(get_mac_browser, 'org.chromium.Chromium', 'Chromium'),
'microsoft-edge': functools.partial(get_mac_browser, 'com.microsoft.Edge', 'Microsoft Edge'),
'opera': functools.partial(get_mac_browser, 'com.operasoftware.Opera', 'Opera'),
},
'Linux': {
'chrome': functools.partial(get_linux_browser, 'chrome'),
'chromium': functools.partial(get_linux_browser, 'chromium'),
'microsoft-edge': functools.partial(get_linux_browser, 'microsoft-edge'),
'opera': functools.partial(get_linux_browser, 'opera'),
}
}
def get_browser(self, browser: Optional[BrowserEnum] = None) -> Optional[str]:
if browser is not None:
return self.registry.get(platform.system())[browser.value]()
for browser in self.registry.get(platform.system()).values():
browser = browser()
if browser is not None:
return browser
@property
def running(self) -> bool:
try:
return self.proc.poll() is None
except:
return False
def launch(
self,
domain: Optional[str] = None,
server: Optional[Tuple[Optional[str], Optional[int]]] = (None, None),
width: int = 400,
height: int = 500,
browser_args: List[str] = [],
extensions: Optional[str] = None
) -> None:
browser_command: List[str] = [self.browser, *browser_args]
if extensions:
browser_command.append(f'--load-extension={extensions}')
browser_command.extend((
'--disable-default-apps',
'--no-default-browser-check',
'--no-check-default-browser',
'--no-first-run',
'--ignore-certificate-errors',
'--disable-background-networking',
'--disable-component-update',
'--disable-domain-reliability',
f'--user-data-dir={os.path.join(tempfile.TemporaryDirectory().name, "Profiles")}',
f'--host-rules=MAP {domain} {server[0]}:{server[1]}',
f'--window-size={width},{height}',
f'--app=https://{domain}'
))
self.proc = subprocess.Popen(browser_command, stdout=-1, stderr=-1)
def stop(self) -> None:
try:
self.proc.terminate()
except:
pass
async def _get_client_version(session):
try:
request = await session.get('https://discord.com/api/downloads/distributions/app/installers/latest?arch=x86&channel=stable&platform=win', headers={'Accept-Encoding': 'gzip, deflate'}, timeout=7)
url = request.headers['location']
return url.split('/')[-2]
except (asyncio.TimeoutError, RuntimeError):
_log.warning('Could not fetch client version.')
return '1.0.9003'
async def _get_build_number(session: ClientSession) -> int: # Thank you Discord-S.C.U.M
"""Fetches client build number"""
try:
login_page_request = await session.get('https://discord.com/login', headers={'Accept-Encoding': 'gzip, deflate'}, timeout=7)
login_page = await login_page_request.text()
build_url = 'https://discord.com/assets/' + re.compile(r'assets/+([a-z0-9]+)\.js').findall(login_page)[-2] + '.js'
build_request = await session.get(build_url, headers={'Accept-Encoding': 'gzip, deflate'}, timeout=7)
build_file = await build_request.text()
build_index = build_file.find('buildNumber') + 14
return int(build_file[build_index:build_index + 6])
except asyncio.TimeoutError:
_log.warning('Could not fetch client build number.')
return 103016
async def _get_user_agent(session: ClientSession) -> str:
"""Fetches the latest Windows 10/Chrome user-agent."""
try:
request = await session.request('GET', 'https://jnrbsn.github.io/user-agents/user-agents.json', timeout=7)
response = json.loads(await request.text())
return response[0]
except asyncio.TimeoutError:
_log.warning('Could not fetch user-agent.')
return 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36'
async def _get_browser_version(session: ClientSession) -> str:
"""Fetches the latest Windows 10/Chrome version."""
try:
request = await session.request('GET', 'https://omahaproxy.appspot.com/all.json', timeout=7)
response = json.loads(await request.text())
if response[0]['versions'][4]['channel'] == 'stable':
return response[0]['versions'][4]['version']
raise RuntimeError
except (asyncio.TimeoutError, RuntimeError):
_log.warning('Could not fetch browser version.')
return '91.0.4472.77'

Loading…
Cancel
Save