from __future__ import annotations
import asyncio
import copy
import inspect
import logging
import sys
import traceback
import typing as t
import hikari
from miru.exceptions import BootstrapFailureError, HandlerFullError
from .abc.item import DecoratedItem, ViewItem
from .abc.item_handler import ItemHandler
from .button import Button
from .context.view import ViewContext
from .select import ChannelSelect, MentionableSelect, RoleSelect, TextSelect, UserSelect
if t.TYPE_CHECKING:
import datetime
import typing_extensions as te
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable used for the context parameters of View.view_check / View.on_error;
# constrained to ViewContext and its subclasses.
ViewContextT = t.TypeVar("ViewContextT", bound=ViewContext)
# Type variable bound to View; given as a string because View is defined further down.
ViewT = t.TypeVar("ViewT", bound="View")

# Names exported by this module.
__all__ = ("View", "get_view")

# Lookup table used by View.from_message to choose the item class that
# represents each component found on a message.
COMPONENT_VIEW_ITEM_MAPPING: t.Mapping[hikari.ComponentType, t.Type[ViewItem]] = {
    hikari.ComponentType.BUTTON: Button,
    hikari.ComponentType.TEXT_SELECT_MENU: TextSelect,
    hikari.ComponentType.CHANNEL_SELECT_MENU: ChannelSelect,
    hikari.ComponentType.ROLE_SELECT_MENU: RoleSelect,
    hikari.ComponentType.USER_SELECT_MENU: UserSelect,
    hikari.ComponentType.MENTIONABLE_SELECT_MENU: MentionableSelect,
}
"""A mapping of all message component types to their respective item classes."""
[docs]
class View(ItemHandler[hikari.impl.MessageActionRowBuilder, ViewContext, ViewItem]):
    """Represents a set of Discord UI components attached to a message.

    Parameters
    ----------
    timeout : Optional[Union[float, int, datetime.timedelta]], optional
        The duration after which the view times out, in seconds, by default 120.0
    autodefer : bool, optional
        If enabled, interactions will be automatically deferred if not responded to within 2 seconds, by default True

    Raises
    ------
    HandlerFullError
        Raised at class-definition time if a subclass declares more than 25 decorated components.
    ValueError
        Raised if a view has more than 25 components attached.
    RuntimeError
        Raised if miru.install() was never called before instantiation.
    """

    _view_children: t.ClassVar[
        t.MutableSequence[DecoratedItem[View, ViewItem, ViewContext]]
    ] = []  # Decorated callbacks that need to be turned into items

    def __init_subclass__(cls) -> None:
        """Collect the decorated callbacks declared on this class and all of its bases.

        Raises
        ------
        HandlerFullError
            More than 25 decorated components were found.
        """
        # Chain up so that hooks defined further along the MRO still run for View subclasses.
        super().__init_subclass__()

        # Walk the MRO from the most generic base to the subclass itself, so inherited
        # items come before the ones the subclass declares.
        children: t.MutableSequence[DecoratedItem[View, ViewItem, ViewContext]] = [
            value
            for base_cls in reversed(cls.mro())
            for value in base_cls.__dict__.values()
            if isinstance(value, DecoratedItem)
        ]

        if len(children) > 25:
            raise HandlerFullError("View cannot have more than 25 components attached.")

        cls._view_children = children

    def __init__(
        self, *, timeout: t.Optional[t.Union[float, int, datetime.timedelta]] = 120.0, autodefer: bool = True
    ) -> None:
        super().__init__(timeout=timeout)
        self._autodefer: bool = autodefer
        self._message: t.Optional[hikari.Message] = None
        self._message_id: t.Optional[hikari.Snowflake] = None
        self._input_event: asyncio.Event = asyncio.Event()

        for decorated_item in self._view_children:
            # Must deepcopy, otherwise multiple views will have the same item reference
            decorated_item = copy.deepcopy(decorated_item)
            item = decorated_item.build(self)
            self.add_item(item)
            # Expose the built item on the instance under the decorated attribute's name.
            setattr(self, decorated_item.name, item)

    @property
    def is_persistent(self) -> bool:
        """Determines if this view is persistent or not.

        A view counts as persistent when it has no timeout and every child is a
        persistent ViewItem.
        """
        return self.timeout is None and all(
            isinstance(item, ViewItem) and item._is_persistent for item in self.children
        )

    @property
    def message(self) -> t.Optional[hikari.Message]:
        """The message this view is bound to. Will be None if the view is not bound, or if a message_id was passed to view.start()."""
        return self._message

    @property
    def message_id(self) -> t.Optional[hikari.Snowflake]:
        """The message ID this view is bound to. Will be None if the view is an unbound persistent view."""
        return self._message_id

    @property
    def is_bound(self) -> bool:
        """Determines if the view is bound to a message or not. If this is False, message edits will not be supported."""
        return self._message_id is not None

    @property
    def autodefer(self) -> bool:
        """A boolean indicating if the received interaction should automatically be deferred if not responded to or not."""
        return self._autodefer

    @property
    def _builder(self) -> t.Type[hikari.impl.MessageActionRowBuilder]:
        """The action row builder class used for this handler's components."""
        return hikari.impl.MessageActionRowBuilder

    @classmethod
    def from_message(
        cls,
        message: hikari.Message,
        *,
        timeout: t.Optional[t.Union[float, int, datetime.timedelta]] = 120,
        autodefer: bool = True,
    ) -> te.Self:
        """Create a new view from the components included in the passed message.
        Returns an empty view if the message has no components attached.

        Parameters
        ----------
        message : hikari.Message
            The message to read components from
        timeout : Optional[Union[float, int, datetime.timedelta]], optional
            The timeout of the created view, by default 120
        autodefer : bool, optional
            If enabled, interactions will be automatically deferred if not responded to within 2 seconds, by default True

        Returns
        -------
        View
            The view that represents the components attached to this message.

        .. warning::
            This function constructs a completely new view based on the information available in the message object.
            Any custom behaviour (such as callbacks) will not be re-created,
            if you want to access an already running view that is bound to a message, use :obj:`miru.view.get_view` instead.
        """
        view = cls(timeout=timeout, autodefer=autodefer)

        if not message.components:
            return view

        for row, action_row in enumerate(message.components):
            assert isinstance(action_row, hikari.ActionRowComponent)

            for component in action_row.components:
                if not isinstance(component.type, hikari.ComponentType):
                    continue  # Unrecognized component types are ignored

                # A recognized component type may still have no item class mapped to it;
                # skip it instead of failing the whole conversion with a KeyError.
                comp_cls = COMPONENT_VIEW_ITEM_MAPPING.get(component.type)
                if comp_cls is None:
                    continue

                view.add_item(comp_cls._from_component(component, row))

        return view

    def add_item(self, item: ViewItem) -> te.Self:
        """Adds a new item to the view.

        Parameters
        ----------
        item : ViewItem
            The item to be added.

        Raises
        ------
        ValueError
            A view already has 25 components attached.
        TypeError
            Parameter item is not an instance of ViewItem.
        RuntimeError
            The item is already attached to this view.
        RuntimeError
            The item is already attached to another view.

        Returns
        -------
        View
            The view the item was added to.
        """
        if not isinstance(item, ViewItem):
            raise TypeError(f"Expected type ViewItem for parameter item, not {type(item).__name__}.")

        return super().add_item(item)

    async def view_check(self, context: ViewContextT) -> bool:
        """Called before any callback in the view is called. Must evaluate to a truthy value to pass.
        Override for custom check logic.

        Parameters
        ----------
        context : ViewContextT
            The context for this check.

        Returns
        -------
        bool
            A boolean indicating if the check passed or not.
        """
        return True

    async def on_error(
        self, error: Exception, item: t.Optional[ViewItem] = None, context: t.Optional[ViewContextT] = None
    ) -> None:
        """Called when an error occurs in a callback function or the built-in timeout function.
        Override for custom error-handling logic.

        The default implementation prints a short header and the traceback to stderr.

        Parameters
        ----------
        error : Exception
            The exception encountered.
        item : Optional[ViewItem], optional
            The item this exception originates from, if any.
        context : Optional[ViewContextT], optional
            The context associated with this exception, if any.
        """
        if item:
            print(f"Ignoring exception in view {self} for item {item}:", file=sys.stderr)
        else:
            print(f"Ignoring exception in view {self}:", file=sys.stderr)

        traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)

    def get_context(
        self, interaction: hikari.ComponentInteraction, *, cls: t.Type[ViewContext] = ViewContext
    ) -> ViewContext:
        """Get the context for this view. Override this function to provide a custom context object.

        Parameters
        ----------
        interaction : hikari.ComponentInteraction
            The interaction to construct the context from.
        cls : Type[ViewContext], optional
            The class to use for the context, by default ViewContext.

        Returns
        -------
        ViewContext
            The context for this interaction.
        """
        return cls(self, interaction)

    async def _handle_callback(self, item: ViewItem, context: ViewContextT) -> None:
        """Handle the callback of a view item. Separate task in case the view is stopped in the callback."""
        try:
            # Keep the cached message object up to date when the interaction came from the bound message.
            if self._message_id == context.message.id:
                self._message = context.message

            # Pulse the event: anything currently waiting on it is woken up, then it is re-armed.
            self._input_event.set()
            self._input_event.clear()

            await item._refresh_state(context)

            if self.autodefer:
                context._start_autodefer()

            await item.callback(context)

        except Exception as error:
            # Any failure is routed to the overridable error hook.
            await self.on_error(error, item, context)

    async def _process_interactions(self, event: hikari.InteractionCreateEvent) -> None:
        """Process incoming interactions."""
        if not isinstance(event.interaction, hikari.ComponentInteraction):
            return

        items = [item for item in self.children if item.custom_id == event.interaction.custom_id]
        if not items:
            # The interaction does not belong to any item of this view.
            return

        self._reset_timeout()

        context = self.get_context(event.interaction)
        self._last_context = context

        passed = await self.view_check(context)
        if not passed:
            return

        for item in items:
            assert isinstance(item, ViewItem)
            # Create task here to ensure autodefer works even if callback stops view
            self._create_task(self._handle_callback(item, context))

    def stop(self) -> None:
        """Stop the view.

        Sets the internal input event and then delegates to the parent class' ``stop()``.
        """
        self._input_event.set()
        return super().stop()

    async def start(
        self,
        message: t.Optional[
            t.Union[
                hikari.SnowflakeishOr[hikari.PartialMessage], t.Awaitable[hikari.SnowflakeishOr[hikari.PartialMessage]]
            ]
        ] = None,
    ) -> None:
        """Start up the view and begin listening for interactions.

        Parameters
        ----------
        message : Optional[Union[hikari.SnowflakeishOr[hikari.PartialMessage], Awaitable[hikari.SnowflakeishOr[hikari.PartialMessage]]]]
            If provided, the view will be bound to this message, and if the
            message is edited with a new view, this view will be stopped.
            Unbound views do not support message editing with additional views.
            May only be omitted for persistent views.

        Raises
        ------
        ValueError
            Parameter 'message' was not provided and the view is not persistent.
        TypeError
            Parameter 'message' cannot be resolved to an instance of 'hikari.Message'.
        BootstrapFailureError
            miru.install() was not called before starting a view.
        """
        if self._events is None:
            raise BootstrapFailureError(f"Cannot start View {type(self).__name__} before calling miru.install() first.")

        # Optimize URL-button-only views by not adding to listener
        if all((isinstance(item, Button) and item.url is not None) for item in self.children):
            logger.warning(f"View {type(self).__name__} only contains link buttons. Ignoring 'View.start()' call.")
            return

        if message is None and not self.is_persistent:
            raise ValueError(f"View '{type(self).__name__}' is not persistent, parameter 'message' must be provided.")

        if message is None:
            # Unbound persistent view: register the handler without binding a message;
            # no timeout task is created on this path.
            self._events.add_handler(self)
            return

        result = (await message) if inspect.isawaitable(message) else message

        if not isinstance(result, (hikari.PartialMessage, hikari.Snowflake, int)):
            raise TypeError("Parameter 'message' cannot be resolved to an instance of 'hikari.Message'.")

        self._message_id = hikari.Snowflake(result)

        # Only a full message object is cached; a bare ID or partial message leaves `message` as None.
        if isinstance(result, hikari.Message):
            self._message = result

        self._events.add_handler(self)
        self._timeout_task = self._create_task(self._handle_timeout())
[docs]
def get_view(message: hikari.SnowflakeishOr[hikari.PartialMessage]) -> t.Optional[View]:
    """Look up the running view that is bound to the given message.

    Parameters
    ----------
    message : hikari.SnowflakeishOr[hikari.PartialMessage]
        The message (or its ID) to find the bound view for.

    Returns
    -------
    Optional[View]
        The view bound to this message, or None if no View is bound to it.

    Raises
    ------
    BootstrapFailureError
        miru.install() was not called before this operation.
    """
    event_handler = View._events
    if event_handler is None:
        raise BootstrapFailureError("miru is not yet initialized! Please call miru.install() first.")

    bound = event_handler._bound_handlers.get(hikari.Snowflake(message))
    # Handlers of other kinds may be bound to a message too; only a View is returned.
    return bound if bound and isinstance(bound, View) else None
# MIT License
#
# Copyright (c) 2022-present hypergonial
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.