You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

549 lines
18 KiB

"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, AsyncIterator, Dict, Optional, Union
from .asset import Asset
from .enums import EventStatus, EntityType, PrivacyLevel, try_enum
from .mixins import Hashable
from .object import Object, OLDEST_OBJECT
from .utils import parse_time, _get_as_snowflake, _bytes_to_base64_data, MISSING
if TYPE_CHECKING:
from .types.scheduled_event import (
GuildScheduledEvent as GuildScheduledEventPayload,
GuildScheduledEventWithUserCount as GuildScheduledEventWithUserCountPayload,
EntityMetadata,
)
from .abc import Snowflake
from .channel import VoiceChannel, StageChannel
from .state import ConnectionState
from .user import User
GuildScheduledEventPayload = Union[GuildScheduledEventPayload, GuildScheduledEventWithUserCountPayload]
# fmt: off
__all__ = (
"ScheduledEvent",
)
# fmt: on
class ScheduledEvent(Hashable):
"""Represents a scheduled event in a guild.
.. versionadded:: 2.0
.. container:: operations
.. describe:: x == y
Checks if two scheduled events are equal.
.. describe:: x != y
Checks if two scheduled events are not equal.
.. describe:: hash(x)
Returns the scheduled event's hash.
Attributes
----------
id: :class:`int`
The scheduled event's ID.
name: :class:`str`
The name of the scheduled event.
description: :class:`str`
The description of the scheduled event.
entity_type: :class:`EntityType`
The type of entity this event is for.
entity_id: :class:`int`
The ID of the entity this event is for.
start_time: :class:`datetime.datetime`
The time that the scheduled event will start in UTC.
end_time: :class:`datetime.datetime`
The time that the scheduled event will end in UTC.
privacy_level: :class:`PrivacyLevel`
The privacy level of the scheduled event.
status: :class:`EventStatus`
The status of the scheduled event.
user_count: :class:`int`
The number of users subscribed to the scheduled event.
creator: Optional[:class:`User`]
The user that created the scheduled event.
location: Optional[:class:`str`]
The location of the scheduled event.
"""
__slots__ = (
'_state',
'_users',
'id',
'guild_id',
'name',
'description',
'entity_type',
'entity_id',
'start_time',
'end_time',
'privacy_level',
'status',
'_cover_image',
'user_count',
'creator',
'channel_id',
'location',
)
def __init__(self, *, state: ConnectionState, data: GuildScheduledEventPayload) -> None:
self._state = state
self._users: Dict[int, User] = {}
self._update(data)
def _update(self, data: GuildScheduledEventPayload) -> None:
self.id: int = int(data['id'])
self.guild_id: int = int(data['guild_id'])
self.name: str = data['name']
self.description: str = data.get('description', '')
self.entity_type = try_enum(EntityType, data['entity_type'])
self.entity_id: int = int(data['id'])
self.start_time: datetime = parse_time(data['scheduled_start_time'])
self.privacy_level: PrivacyLevel = try_enum(PrivacyLevel, data['status'])
self.status: EventStatus = try_enum(EventStatus, data['status'])
self._cover_image: Optional[str] = data['image']
self.user_count: int = data.get('user_count', 0)
creator = data.get('creator')
self.creator: Optional[User] = self._state.store_user(creator) if creator else None
self.end_time: Optional[datetime] = parse_time(data.get('scheduled_end_time'))
self.channel_id: Optional[int] = _get_as_snowflake(data, 'channel_id')
metadata = data.get('metadata')
if metadata:
self._unroll_metadata(metadata)
def _unroll_metadata(self, data: EntityMetadata):
self.location: Optional[str] = data.get('location')
@classmethod
def from_creation(cls, *, state: ConnectionState, data: GuildScheduledEventPayload):
creator_id = data.get('creator_id')
self = cls(state=state, data=data)
if creator_id:
self.creator = self._state.get_user(int(creator_id))
def __repr__(self) -> str:
return f'<GuildScheduledEvent id={self.id} name={self.name!r} guild_id={self.guild_id!r} creator={self.creator!r}>'
@property
def cover_image(self) -> Optional[Asset]:
"""Optional[:class:`Asset`]: The scheduled event's cover image."""
if self._cover_image is None:
return None
return Asset._from_scheduled_event_cover_image(self._state, self.id, self._cover_image)
@property
def channel(self) -> Optional[Union[VoiceChannel, StageChannel]]:
"""Optional[Union[:class:`VoiceChannel`, :class:`StageChannel`]]: The channel this scheduled event is in."""
return self.guild.get_channel(self.channel_id) # type: ignore
async def start(self, *, reason: Optional[str] = None) -> ScheduledEvent:
"""|coro|
Starts the scheduled event.
Shorthand for:
.. code-block:: python3
await event.edit(status=EventStatus.active)
Parameters
-----------
reason: Optional[:class:`str`]
The reason for starting the scheduled event.
Raises
------
ValueError
The scheduled event has already started or has ended.
Forbidden
You do not have the proper permissions to start the scheduled event.
HTTPException
The scheduled event could not be started.
Returns
-------
:class:`ScheduledEvent`
The scheduled event that was started.
"""
if self.status is not EventStatus.scheduled:
raise ValueError('This scheduled event is already running.')
return await self.edit(status=EventStatus.active, reason=reason)
async def end(self, *, reason: Optional[str] = None) -> ScheduledEvent:
"""|coro|
Ends the scheduled event.
Shorthand for:
.. code-block:: python3
await event.edit(status=EventStatus.completed)
Parameters
-----------
reason: Optional[:class:`str`]
The reason for ending the scheduled event.
Raises
------
ValueError
The scheduled event is not active or has already ended.
Forbidden
You do not have the proper permissions to end the scheduled event.
HTTPException
The scheduled event could not be ended.
Returns
-------
:class:`ScheduledEvent`
The scheduled event that was ended.
"""
if self.status is not EventStatus.active:
raise ValueError('This scheduled event is not active.')
return await self.edit(status=EventStatus.ended, reason=reason)
async def cancel(self, *, reason: Optional[str] = None) -> ScheduledEvent:
"""|coro|
Cancels the scheduled event.
Shorthand for:
.. code-block:: python3
await event.edit(status=EventStatus.cancelled)
Parameters
-----------
reason: Optional[:class:`str`]
The reason for cancelling the scheduled event.
Raises
------
ValueError
The scheduled event is already running.
Forbidden
You do not have the proper permissions to cancel the scheduled event.
HTTPException
The scheduled event could not be cancelled.
Returns
-------
:class:`ScheduledEvent`
The scheduled event that was cancelled.
"""
if self.status is not EventStatus.scheduled:
raise ValueError('This scheduled event is already running.')
return await self.edit(status=EventStatus.cancelled, reason=reason)
async def edit(
self,
*,
name: str = MISSING,
description: str = MISSING,
channel: Optional[Snowflake] = MISSING,
start_time: datetime = MISSING,
end_time: datetime = MISSING,
privacy_level: PrivacyLevel = MISSING,
entity_type: EntityType = MISSING,
status: EventStatus = MISSING,
image: bytes = MISSING,
location: str = MISSING,
reason: Optional[str] = None,
) -> ScheduledEvent:
r"""|coro|
Edits the scheduled event.
Requires :attr:`~Permissions.manage_events` permissions.
Parameters
-----------
name: :class:`str`
The name of the scheduled event.
description: :class:`str`
The description of the scheduled event.
channel: Optional[:class:`~discord.abc.Snowflake`]
The channel to put the scheduled event in.
Required if the entity type is either :attr:`EntityType.voice` or
:attr:`EntityType.stage_instance`.
start_time: :class:`datetime.datetime`
The time that the scheduled event will start. This must be a timezone-aware
datetime object. Consider using :func:`utils.utcnow`.
end_time: :class:`datetime.datetime`
The time that the scheduled event will end. This must be a timezone-aware
datetime object. Consider using :func:`utils.utcnow`.
Required if the entity type is :attr:`EntityType.external`.
privacy_level: :class:`PrivacyLevel`
The privacy level of the scheduled event.
entity_type: :class:`EntityType`
The new entity type.
status: :class:`EventStatus`
The new status of the scheduled event.
image: :class:`bytes`
The new image of the scheduled event.
location: :class:`str`
The new location of the scheduled event.
Required if the entity type is :attr:`EntityType.external`.
reason: Optional[:class:`str`]
The reason for editing the scheduled event. Shows up on the audit log.
Raises
-------
TypeError
`image` was not a :term:`py:bytes-like object`, or ``privacy_level``
was not a :class:`PrivacyLevel`, or ``entity_type`` was not an
:class:`EntityType`, ``status`` was not an :class:`EventStatus`, or
an argument was provided that was incompatible with the scheduled event's
entity type.
ValueError
``start_time`` or ``end_time`` was not a timezone-aware datetime object.
Forbidden
You do not have permissions to edit the scheduled event.
HTTPException
Editing the scheduled event failed.
Returns
--------
:class:`ScheduledEvent`
The edited scheduled event.
"""
payload = {}
metadata = {}
if name is not MISSING:
payload['name'] = name
if start_time is not MISSING:
if start_time.tzinfo is None:
raise ValueError(
'start_time must be an aware datetime. Consider using discord.utils.utcnow() or datetime.datetime.now().astimezone() for local time.'
)
payload['scheduled_start_time'] = start_time.isoformat()
if description is not MISSING:
payload['description'] = description
if privacy_level is not MISSING:
if not isinstance(privacy_level, PrivacyLevel):
raise TypeError('privacy_level must be of type PrivacyLevel.')
payload['privacy_level'] = privacy_level.value
if status is not MISSING:
if not isinstance(status, EventStatus):
raise TypeError('status must be of type EventStatus')
payload['status'] = status.value
if image is not MISSING:
image_as_str: str = _bytes_to_base64_data(image)
payload['image'] = image_as_str
if entity_type is not MISSING:
if not isinstance(entity_type, EntityType):
raise TypeError('entity_type must be of type EntityType')
payload['entity_type'] = entity_type.value
_entity_type = entity_type or self.entity_type
if _entity_type in (EntityType.stage_instance, EntityType.voice):
if channel is MISSING or channel is None:
raise TypeError('channel must be set when entity_type is voice or stage_instance')
payload['channel_id'] = channel.id
if location is not MISSING:
raise TypeError('location cannot be set when entity_type is voice or stage_instance')
else:
if channel is not MISSING:
raise TypeError('channel cannot be set when entity_type is external')
if location is MISSING or location is None:
raise TypeError('location must be set when entity_type is external')
metadata['location'] = location
if end_time is MISSING:
raise TypeError('end_time must be set when entity_type is external')
if end_time.tzinfo is None:
raise ValueError(
'end_time must be an aware datetime. Consider using discord.utils.utcnow() or datetime.datetime.now().astimezone() for local time.'
)
payload['scheduled_end_time'] = end_time.isoformat()
if metadata:
payload['entity_metadata'] = metadata
data = await self._state.http.edit_scheduled_event(self.guild_id, self.id, **payload, reason=reason)
s = ScheduledEvent(state=self._state, data=data)
s._users = self._users
return s
async def delete(self, *, reason: Optional[str] = None) -> None:
"""|coro|
Deletes the scheduled event.
Requires :attr:`~Permissions.manage_events` permissions.
Parameters
-----------
reason: Optional[:class:`str`]
The reason for deleting the scheduled event. Shows up on the audit log.
Raises
------
Forbidden
You do not have permissions to delete the scheduled event.
HTTPException
Deleting the scheduled event failed.
"""
await self._state.http.delete_scheduled_event(self.guild_id, self.id, reason=reason)
async def users(
self,
*,
limit: Optional[int] = None,
before: Optional[Snowflake] = None,
after: Optional[Snowflake] = None,
oldest_first: bool = MISSING,
) -> AsyncIterator[User]:
"""|coro|
Retrieves all :class:`User` that are in this thread.
This requires :attr:`Intents.members` to get information about members
other than yourself.
Raises
-------
HTTPException
Retrieving the members failed.
Returns
--------
List[:class:`User`]
All thread members in the thread.
"""
async def _before_strategy(retrieve, before, limit):
before_id = before.id if before else None
users = await self._state.http.get_scheduled_event_users(
self.guild_id, self.id, limit=retrieve, with_member=False, before=before_id
)
if users:
if limit is not None:
limit -= len(users)
before = Object(id=users[-1]['user']['id'])
return users, before, limit
async def _after_strategy(retrieve, after, limit):
after_id = after.id if after else None
users = await self._state.http.get_scheduled_event_users(
self.guild_id, self.id, limit=retrieve, with_member=False, after=after_id
)
if users:
if limit is not None:
limit -= len(users)
after = Object(id=users[0]['user']['id'])
return users, after, limit
if limit is None:
limit = self.user_count or None
if oldest_first is MISSING:
reverse = after is not None
else:
reverse = oldest_first
predicate = None
if reverse:
strategy, state = _after_strategy, after
if before:
predicate = lambda u: u['user']['id'] < before.id
else:
strategy, state = _before_strategy, before
if after and after != OLDEST_OBJECT:
predicate = lambda u: u['user']['id'] > after.id
while True:
retrieve = min(100 if limit is None else limit, 100)
if retrieve < 1:
return
data, state, limit = await strategy(retrieve, state, limit)
if len(data) < 100:
limit = 0
if reverse:
data = reversed(data)
if predicate:
data = filter(predicate, data)
users = (self._state.store_user(raw_user['user']) for raw_user in data)
for user in users:
yield user
def _add_user(self, user: User) -> None:
self._users[user.id] = user
def _pop_user(self, user_id: int) -> None:
self._users.pop(user_id)