Source code for miru.abc.item_handler

from __future__ import annotations

import abc
import asyncio
import datetime
import itertools
import sys
import typing as t
from collections.abc import Sequence

import hikari

from ..exceptions import BootstrapFailureError, HandlerFullError, ItemAlreadyAttachedError, RowFullError
from .item import Item

if t.TYPE_CHECKING:
    import typing_extensions as te

    from ..context import Context
    from ..events import EventHandler
    from ..traits import MiruAware

__all__ = ("ItemHandler", "ItemArranger")


BuilderT = t.TypeVar("BuilderT", bound=hikari.api.ComponentBuilder)
ContextT = t.TypeVar("ContextT", bound="Context[t.Any]")
ItemT = t.TypeVar("ItemT", bound="Item[t.Any]")


[docs] class ItemArranger(t.Generic[ItemT]): """Calculate the position of an item based on it's width, and automatically arrange items if no explicit row is specified. Used internally by ItemHandler. """ __slots__ = ("_weights",) def __init__(self) -> None: self._weights = [0, 0, 0, 0, 0]
[docs] def add_item(self, item: ItemT) -> None: """Add an item to the weights. Parameters ---------- item : ItemT The item to add. Raises ------ RowFullError The item does not fit on the row specified. This error is only raised if a row is specified explicitly. HandlerFullError The item does not fit on any row. """ if item.row is not None: if item.width + self._weights[item.row] > 5: raise RowFullError(f"Item does not fit on row {item.row}!") self._weights[item.row] += item.width item._rendered_row = item.row else: for row, weight in enumerate(self._weights): if weight + item.width <= 5: self._weights[row] += item.width item._rendered_row = row return raise HandlerFullError("Item does not fit on this item handler.")
[docs] def remove_item(self, item: ItemT) -> None: """Remove an item from the weights. Parameters ---------- item : ItemT The item to remove. """ if item._rendered_row is not None: self._weights[item._rendered_row] -= item.width item._rendered_row = None
[docs] def clear(self) -> None: """Clear the weights, remove all items.""" self._weights = [0, 0, 0, 0, 0]
# TODO Type ignore: Python 3.8 doesn't support type-arg in abc.ABC, add when 3.9 is released
[docs] class ItemHandler(Sequence, abc.ABC, t.Generic[BuilderT, ContextT, ItemT]): # type: ignore[type-arg] """Abstract base class all item-handlers (e.g. views, modals) inherit from. Parameters ---------- timeout : Optional[Union[float, int, datetime.timedelta]], optional The duration after which the item handler times out, in seconds, by default 120.0 Raises ------ HandlerFullError Raised if the item handler has more than 25 components attached. BootstrapFailureError Raised if miru.install() was never called before instantiation. """ _app: t.ClassVar[t.Optional[MiruAware]] = None _events: t.ClassVar[t.Optional[EventHandler]] = None def __init__(self, *, timeout: t.Optional[t.Union[float, int, datetime.timedelta]] = 120.0) -> None: if isinstance(timeout, datetime.timedelta): timeout = timeout.total_seconds() self._timeout: t.Optional[float] = float(timeout) if timeout else None self._children: t.List[ItemT] = [] self._arranger: ItemArranger[ItemT] = ItemArranger() self._stopped: asyncio.Event = asyncio.Event() self._timeout_task: t.Optional[asyncio.Task[None]] = None self._running_tasks: t.MutableSequence[asyncio.Task[t.Any]] = [] self._last_context: t.Optional[ContextT] = None if len(self.children) > 25: raise HandlerFullError(f"{type(self).__name__} cannot have more than 25 components attached.") if self.app is None or self._events is None: raise BootstrapFailureError(f"miru.install() was not called before instantiation of {type(self).__name__}.") @t.overload def __getitem__(self, value: int) -> BuilderT: ... @t.overload def __getitem__(self, value: slice) -> t.Sequence[BuilderT]: ... def __getitem__(self, value: t.Union[slice, int]) -> t.Union[BuilderT, t.Sequence[BuilderT]]: return self.build()[value] def __iter__(self) -> t.Iterator[BuilderT]: for action_row in self.build(): yield action_row def __contains__(self, value: object) -> bool: return value in self.build() def __len__(self) -> int: return len(self.build()) def __reversed__(self) -> t.Iterator[BuilderT]: return self.build().__reversed__() @property def children(self) -> t.Sequence[ItemT]: """A list of all items attached to the item handler.""" return self._children @property def timeout(self) -> t.Optional[float]: """The amount of time the item handler is allowed to idle for, in seconds. Must be None for persistent views.""" return self._timeout @property def app(self) -> MiruAware: """The application that loaded the miru extension.""" if not self._app: raise AttributeError(f"miru was not loaded, {type(self).__name__} has no attribute app.") return self._app @property def bot(self) -> MiruAware: """The application that loaded the miru extension.""" return self.app @property def last_context(self) -> t.Optional[ContextT]: """The last context that was received by the item handler.""" return self._last_context @property @abc.abstractmethod def _builder(self) -> t.Type[BuilderT]: ...
[docs] def add_item(self, item: ItemT) -> te.Self: """Adds a new item to the item handler. Parameters ---------- item : Item[Any] The item to be added. Raises ------ ValueError ItemHandler already has 25 components attached. TypeError Parameter item is not an instance of Item. ItemAlreadyAttachedError The item is already attached to this item handler. ItemAlreadyAttachedError The item is already attached to another item handler. Returns ------- ItemHandler The item handler the item was added to. """ if len(self.children) > 25: raise HandlerFullError("Item Handler cannot have more than 25 components attached.") if not isinstance(item, Item): raise TypeError(f"Expected Item not {type(item).__name__} for parameter item.") if item in self.children: raise ItemAlreadyAttachedError(f"Item {type(item).__name__} is already attached to this item handler.") if item._handler is not None: raise ItemAlreadyAttachedError( f"Item {type(item).__name__} is already attached to another item handler: {type(item._handler).__name__}." ) self._arranger.add_item(item) item._handler = self self._children.append(item) return self
[docs] def remove_item(self, item: ItemT) -> te.Self: """Removes the specified item from the item handler. Parameters ---------- item : Item[Any] The item to be removed. Returns ------- ItemHandler The item handler the item was removed from. """ try: self._children.remove(item) except ValueError: pass else: self._arranger.remove_item(item) item._handler = None return self
[docs] def clear_items(self) -> te.Self: """Removes all items from this item handler. Returns ------- ItemHandler The item handler items were cleared from. """ for item in self.children: item._handler = None item._rendered_row = None self._children.clear() self._arranger.clear() return self
[docs] def get_item_by(self, predicate: t.Callable[[ItemT], bool]) -> t.Optional[ItemT]: """Get the first item that matches the given predicate. Parameters ---------- predicate : Callable[[Item[Any]], bool] A predicate to match the item. Returns ------- Optional[Item[Any]] The item that matched the predicate or None. """ for item in self.children: if predicate(item): return item return None
[docs] def get_item_by_id(self, custom_id: str) -> t.Optional[ItemT]: """Get the first item that matches the given custom ID. Parameters ---------- custom_id : str The custom_id of the component. Returns ------- Optional[Item[Any]] The item that matched the custom ID or None. """ return self.get_item_by(lambda item: item.custom_id == custom_id)
[docs] def build(self) -> t.Sequence[BuilderT]: """Creates the action rows the item handler represents. Returns ------- List[hikari.impl.MessageActionRowBuilder] A list of action rows containing all items attached to this item handler, converted to hikari component objects. If the item handler has no items attached, this returns an empty list. """ if not self.children: return [] self._children.sort(key=lambda i: i._rendered_row if i._rendered_row is not None else sys.maxsize) action_rows = [] for _, items in itertools.groupby(self.children, lambda i: i._rendered_row): s_items = sorted(items, key=lambda i: i.position if i.position is not None else sys.maxsize) action_row = self._builder() for item in s_items: item._build(action_row) action_rows.append(action_row) return action_rows
[docs] async def on_timeout(self) -> None: """Called when the item handler times out. Override for custom timeout logic.""" pass
[docs] def stop(self) -> None: """Stop listening for interactions.""" self._stopped.set() if self._timeout_task: self._timeout_task.cancel() if not self._events: return self._events.remove_handler(self)
@abc.abstractmethod async def _process_interactions(self, event: hikari.InteractionCreateEvent) -> None: """Process incoming interactions.""" def _reset_timeout(self) -> None: """Reset the timeout counter.""" if self.timeout is not None and self._timeout_task: self._timeout_task.cancel() self._timeout_task = self._create_task(self._handle_timeout()) async def _handle_timeout(self) -> None: """Handle the timing out of the item handler.""" if not self.timeout: return await asyncio.sleep(self.timeout) try: await self.on_timeout() except Exception as error: if on_error := getattr(self, "on_error", None): await on_error(error) self.stop() def _create_task(self, coro: t.Awaitable[t.Any], *, name: t.Optional[str] = None) -> asyncio.Task[t.Any]: """Run tasks inside the item handler internally while keeping a reference to the provided task.""" task = asyncio.create_task(coro, name=name) # type: ignore self._running_tasks.append(task) task.add_done_callback(lambda t: self._running_tasks.remove(t)) return task
[docs] async def wait(self, timeout: t.Optional[float] = None) -> None: """Wait until the item handler has stopped receiving interactions. Parameters ---------- timeout : Optional[float], optional The amount of time to wait, in seconds, by default None """ await asyncio.wait_for(self._stopped.wait(), timeout=timeout)
# MIT License # # Copyright (c) 2022-present hypergonial # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE.