From 0489ad0f46c6cd99fad7006c4cf5158b80d4a484 Mon Sep 17 00:00:00 2001 From: dolfies Date: Mon, 8 Nov 2021 22:31:40 -0500 Subject: [PATCH] Migrate utils.py --- discord/utils.py | 245 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 241 insertions(+), 4 deletions(-) diff --git a/discord/utils.py b/discord/utils.py index 4360b77a1..88f51a404 100644 --- a/discord/utils.py +++ b/discord/utils.py @@ -56,11 +56,18 @@ import functools from inspect import isawaitable as _isawaitable, signature as _signature from operator import attrgetter import json +import logging +import os +import platform import re +import subprocess import sys +import tempfile +from threading import Timer import types import warnings +from .enums import BrowserEnum from .errors import InvalidArgument try: @@ -88,6 +95,8 @@ __all__ = ( DISCORD_EPOCH = 1420070400000 +_log = logging.getLogger(__name__) + class _MissingSentinel: def __eq__(self, other): @@ -119,6 +128,7 @@ class _cached_property: if TYPE_CHECKING: + from aiohttp import ClientSession from functools import cached_property as cached_property from typing_extensions import ParamSpec @@ -273,13 +283,13 @@ def oauth_url( scopes: Iterable[str] = MISSING, disable_guild_select: bool = False, ) -> str: - """A helper function that returns the OAuth2 URL for inviting the bot + """A helper function that returns the OAuth2 URL for inviting a bot into guilds. Parameters ----------- client_id: Union[:class:`int`, :class:`str`] - The client ID for your bot. + The client ID for the bot. permissions: :class:`~discord.Permissions` The permissions you're requesting. If not given then you won't be requesting any permissions. @@ -288,7 +298,7 @@ def oauth_url( redirect_uri: :class:`str` An optional valid redirect URI. scopes: Iterable[:class:`str`] - An optional valid list of scopes. Defaults to ``('bot',)``. + An optional valid list of scopes. Defaults to ``('bot', 'application.commands')``. .. versionadded:: 1.7 disable_guild_select: :class:`bool` @@ -302,7 +312,7 @@ def oauth_url( The OAuth2 URL for inviting the bot into guilds. """ url = f'https://discord.com/oauth2/authorize?client_id={client_id}' - url += '&scope=' + '+'.join(scopes or ('bot',)) + url += '&scope=' + '+'.join(scopes or ('bot', 'application.commands')) if permissions is not MISSING: url += f'&permissions={permissions.value}' if guild is not MISSING: @@ -1017,3 +1027,230 @@ def format_dt(dt: datetime.datetime, /, style: Optional[TimestampStyle] = None) if style is None: return f'' return f'' + + +class ExpiringQueue(asyncio.Queue): # Inspired from https://github.com/NoahCardoza/CaptchaHarvester + def __init__(self, timeout: int, maxsize: int = 0) -> None: + super().__init__(maxsize) + self.timeout = timeout + self.timers: asyncio.Queue = asyncio.Queue() + + async def put(self, item: str) -> None: + thread: Timer = Timer(self.timeout, self.expire) + thread.start() + await self.timers.put(thread) + await super().put(item) + + async def get(self, block: bool = True) -> str: + if block: + thread = await self.timers.get() + else: + thread = self.timers.get_nowait() + thread.cancel() + if block: + return await super().get() + else: + return self.get_nowait() + + def expire(self) -> None: + try: + self._queue.popleft() + except: + pass + + def to_list(self) -> List[str]: + return list(self._queue) + + +class ExpiringString(collections.UserString): + def __init__(self, data: str, timeout: int) -> None: + super().__init__(data) + self._timer: Timer = Timer(timeout, self._destruct) + self._timer.start() + + def _update(self, data: str, timeout: int) -> None: + try: + self._timer.cancel() + except: + pass + self.data = data + self._timer: Timer = Timer(timeout, self._destruct) + self._timer.start() + + def _destruct(self) -> None: + self.data = '' + + def destroy(self) -> None: + self._destruct() + self._timer.cancel() + + +class Browser: # Inspired from https://github.com/NoahCardoza/CaptchaHarvester + def __init__(self, browser: Union[BrowserEnum, str] = None) -> None: + if isinstance(browser, (BrowserEnum, type(None))): + try: + browser = self.get_browser(browser) + except Exception: + raise RuntimeError('Could not find browser. Please pass browser path manually.') + + if browser is None: + raise RuntimeError('Could not find browser. Please pass browser path manually.') + + self.browser: str = browser + self.proc: subprocess.Popen = MISSING + + def get_mac_browser(pkg: str, binary: str) -> Optional[os.PathLike]: + import plistlib as plist + pfile: str = f'{os.environ["HOME"]}/Library/Preferences/{pkg}.plist' + if os.path.exists(pfile): + with open(pfile, 'rb') as f: + binary_path: Optional[str] = plist.load(f).get('LastRunAppBundlePath') + if binary_path is not None: + return os.path.join(binary_path, 'Contents', 'MacOS', binary) + + def get_windows_browser(browser: str) -> Optional[str]: + import winreg as reg + reg_path: str = f'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\{browser}.exe' + exe_path: Optional[str] = None + for install_type in reg.HKEY_CURRENT_USER, reg.HKEY_LOCAL_MACHINE: + try: + reg_key: str = reg.OpenKey(install_type, reg_path, 0, reg.KEY_READ) + exe_path: Optional[str] = reg.QueryValue(reg_key, None) + reg_key.Close() + if not os.path.isfile(exe_path): + continue + except reg.WindowsError: + pass + else: + break + return exe_path + + def get_linux_browser(browser: str) -> Optional[str]: + from shutil import which as exists + possibilities: List[str] = [browser + channel for channel in ('', '-beta', '-dev', '-developer', '-canary')] + for browser in possibilities: + if exists(browser): + return browser + + registry: Dict[str, Dict[str, functools.partial]] = { + 'Windows': { + 'chrome': functools.partial(get_windows_browser, 'chrome'), + 'chromium': functools.partial(get_windows_browser, 'chromium'), + 'microsoft-edge': functools.partial(get_windows_browser, 'msedge'), + 'opera': functools.partial(get_windows_browser, 'opera'), + }, + 'Darwin': { + 'chrome': functools.partial(get_mac_browser, 'com.google.Chrome', 'Google Chrome'), + 'chromium': functools.partial(get_mac_browser, 'org.chromium.Chromium', 'Chromium'), + 'microsoft-edge': functools.partial(get_mac_browser, 'com.microsoft.Edge', 'Microsoft Edge'), + 'opera': functools.partial(get_mac_browser, 'com.operasoftware.Opera', 'Opera'), + }, + 'Linux': { + 'chrome': functools.partial(get_linux_browser, 'chrome'), + 'chromium': functools.partial(get_linux_browser, 'chromium'), + 'microsoft-edge': functools.partial(get_linux_browser, 'microsoft-edge'), + 'opera': functools.partial(get_linux_browser, 'opera'), + } + } + + def get_browser(self, browser: Optional[BrowserEnum] = None) -> Optional[str]: + if browser is not None: + return self.registry.get(platform.system())[browser.value]() + + for browser in self.registry.get(platform.system()).values(): + browser = browser() + if browser is not None: + return browser + + @property + def running(self) -> bool: + try: + return self.proc.poll() is None + except: + return False + + def launch( + self, + domain: Optional[str] = None, + server: Optional[Tuple[Optional[str], Optional[int]]] = (None, None), + width: int = 400, + height: int = 500, + browser_args: List[str] = [], + extensions: Optional[str] = None + ) -> None: + browser_command: List[str] = [self.browser, *browser_args] + + if extensions: + browser_command.append(f'--load-extension={extensions}') + + browser_command.extend(( + '--disable-default-apps', + '--no-default-browser-check', + '--no-check-default-browser', + '--no-first-run', + '--ignore-certificate-errors', + '--disable-background-networking', + '--disable-component-update', + '--disable-domain-reliability', + f'--user-data-dir={os.path.join(tempfile.TemporaryDirectory().name, "Profiles")}', + f'--host-rules=MAP {domain} {server[0]}:{server[1]}', + f'--window-size={width},{height}', + f'--app=https://{domain}' + )) + + self.proc = subprocess.Popen(browser_command, stdout=-1, stderr=-1) + + def stop(self) -> None: + try: + self.proc.terminate() + except: + pass + + +async def _get_client_version(session): + try: + request = await session.get('https://discord.com/api/downloads/distributions/app/installers/latest?arch=x86&channel=stable&platform=win', headers={'Accept-Encoding': 'gzip, deflate'}, timeout=7) + url = request.headers['location'] + return url.split('/')[-2] + except (asyncio.TimeoutError, RuntimeError): + _log.warning('Could not fetch client version.') + return '1.0.9003' + + +async def _get_build_number(session: ClientSession) -> int: # Thank you Discord-S.C.U.M + """Fetches client build number""" + try: + login_page_request = await session.get('https://discord.com/login', headers={'Accept-Encoding': 'gzip, deflate'}, timeout=7) + login_page = await login_page_request.text() + build_url = 'https://discord.com/assets/' + re.compile(r'assets/+([a-z0-9]+)\.js').findall(login_page)[-2] + '.js' + build_request = await session.get(build_url, headers={'Accept-Encoding': 'gzip, deflate'}, timeout=7) + build_file = await build_request.text() + build_index = build_file.find('buildNumber') + 14 + return int(build_file[build_index:build_index + 6]) + except asyncio.TimeoutError: + _log.warning('Could not fetch client build number.') + return 103016 + + +async def _get_user_agent(session: ClientSession) -> str: + """Fetches the latest Windows 10/Chrome user-agent.""" + try: + request = await session.request('GET', 'https://jnrbsn.github.io/user-agents/user-agents.json', timeout=7) + response = json.loads(await request.text()) + return response[0] + except asyncio.TimeoutError: + _log.warning('Could not fetch user-agent.') + return 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36' + + +async def _get_browser_version(session: ClientSession) -> str: + """Fetches the latest Windows 10/Chrome version.""" + try: + request = await session.request('GET', 'https://omahaproxy.appspot.com/all.json', timeout=7) + response = json.loads(await request.text()) + if response[0]['versions'][4]['channel'] == 'stable': + return response[0]['versions'][4]['version'] + raise RuntimeError + except (asyncio.TimeoutError, RuntimeError): + _log.warning('Could not fetch browser version.') + return '91.0.4472.77'