| | """Contains the base class :class:`.BaseSimplePrompt`.""" |
| | import os |
| | import re |
| | from abc import ABC, abstractmethod |
| | from typing import ( |
| | TYPE_CHECKING, |
| | Any, |
| | Callable, |
| | Dict, |
| | List, |
| | Optional, |
| | Tuple, |
| | Union, |
| | cast, |
| | ) |
| |
|
| | from prompt_toolkit.enums import EditingMode |
| | from prompt_toolkit.filters.base import Condition, FilterOrBool |
| | from prompt_toolkit.key_binding.key_bindings import KeyBindings, KeyHandlerCallable |
| | from prompt_toolkit.keys import Keys |
| | from prompt_toolkit.styles.style import Style |
| | from prompt_toolkit.validation import Validator |
| |
|
| | from InquirerPy.enum import INQUIRERPY_KEYBOARD_INTERRUPT |
| | from InquirerPy.exceptions import RequiredKeyNotFound |
| | from InquirerPy.utils import ( |
| | InquirerPyMessage, |
| | InquirerPySessionResult, |
| | InquirerPyStyle, |
| | InquirerPyValidate, |
| | get_style, |
| | ) |
| |
|
| | if TYPE_CHECKING: |
| | from prompt_toolkit.key_binding.key_processor import KeyPressEvent |
| |
|
| |
|
| | class BaseSimplePrompt(ABC): |
| | """The base class to create a simple terminal input prompt. |
| | |
| | Note: |
| | No actual :class:`~prompt_toolkit.application.Application` is created by this class. |
| | This class only creates some common interface and attributes that can be easily used |
| | by `prompt_toolkit`. |
| | |
| | To have a functional prompt, you'll at least have to implement the :meth:`.BaseSimplePrompt._run` |
| | and :meth:`.BaseSimplePrompt._get_prompt_message`. |
| | |
| | See Also: |
| | :class:`~InquirerPy.prompts.input.InputPrompt` |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | message: InquirerPyMessage, |
| | style: Optional[InquirerPyStyle] = None, |
| | vi_mode: bool = False, |
| | qmark: str = "?", |
| | amark: str = "?", |
| | instruction: str = "", |
| | validate: Optional[InquirerPyValidate] = None, |
| | invalid_message: str = "Invalid input", |
| | transformer: Optional[Callable[[Any], Any]] = None, |
| | filter: Optional[Callable[[Any], Any]] = None, |
| | default: Any = "", |
| | wrap_lines: bool = True, |
| | raise_keyboard_interrupt: bool = True, |
| | mandatory: bool = True, |
| | mandatory_message: str = "Mandatory prompt", |
| | session_result: Optional[InquirerPySessionResult] = None, |
| | ) -> None: |
| | self._mandatory = mandatory |
| | self._mandatory_message = mandatory_message |
| | self._result = session_result or {} |
| | self._message = ( |
| | message |
| | if not isinstance(message, Callable) |
| | else cast(Callable, message)(self._result) |
| | ) |
| | self._instruction = instruction |
| | self._default = ( |
| | default if not isinstance(default, Callable) else default(self._result) |
| | ) |
| | self._style = Style.from_dict(style.dict if style else get_style().dict) |
| | self._qmark = qmark |
| | self._amark = amark |
| | self._status = {"answered": False, "result": None, "skipped": False} |
| | self._kb = KeyBindings() |
| | self._lexer = "class:input" |
| | self._transformer = transformer |
| | self._filter = filter |
| | self._wrap_lines = wrap_lines |
| | self._editing_mode = ( |
| | EditingMode.VI |
| | if vi_mode or bool(os.getenv("INQUIRERPY_VI_MODE", False)) |
| | else EditingMode.EMACS |
| | ) |
| | if isinstance(validate, Validator): |
| | self._validator = validate |
| | else: |
| | self._validator = Validator.from_callable( |
| | validate if validate else lambda _: True, |
| | invalid_message, |
| | move_cursor_to_end=True, |
| | ) |
| | self._raise_kbi = not os.getenv( |
| | "INQUIRERPY_NO_RAISE_KBI", not raise_keyboard_interrupt |
| | ) |
| | self._is_rasing_kbi = Condition(lambda: self._raise_kbi) |
| |
|
| | self._kb_maps = { |
| | "answer": [{"key": Keys.Enter}], |
| | "interrupt": [ |
| | {"key": "c-c", "filter": self._is_rasing_kbi}, |
| | {"key": "c-d", "filter": ~self._is_rasing_kbi}, |
| | ], |
| | "skip": [{"key": "c-z"}, {"key": "c-c", "filter": ~self._is_rasing_kbi}], |
| | } |
| | self._kb_func_lookup = { |
| | "answer": [{"func": self._handle_enter}], |
| | "interrupt": [{"func": self._handle_interrupt}], |
| | "skip": [{"func": self._handle_skip}], |
| | } |
| |
|
| | def _keybinding_factory(self): |
| | """Register all keybindings in `self._kb_maps`. |
| | |
| | It's required to call this function at the end of prompt constructor if |
| | it inherits from :class:`~InquirerPy.base.simple.BaseSimplePrompt` or |
| | :class:`~InquirerPy.base.complex.BaseComplexPrompt`. |
| | """ |
| |
|
| | def _factory(keys, filter, action): |
| | if action not in self.kb_func_lookup: |
| | raise RequiredKeyNotFound(f"keybinding action {action} not found") |
| | if not isinstance(keys, list): |
| | keys = [keys] |
| |
|
| | @self.register_kb(*keys, filter=filter) |
| | def _(event): |
| | for method in self.kb_func_lookup[action]: |
| | method["func"](event, *method.get("args", [])) |
| |
|
| | for key, item in self.kb_maps.items(): |
| | if not isinstance(item, list): |
| | item = [item] |
| | for kb in item: |
| | _factory(kb["key"], kb.get("filter", Condition(lambda: True)), key) |
| |
|
| | @abstractmethod |
| | def _set_error(self, message: str) -> None: |
| | """Set the error message for the prompt. |
| | |
| | Args: |
| | message: Error message to set. |
| | """ |
| | pass |
| |
|
| | def _handle_skip(self, event: Optional["KeyPressEvent"]) -> None: |
| | """Handle the event when attempting to skip a prompt. |
| | |
| | Skip the prompt if the `_mandatory` field is False, otherwise |
| | show an error message that the prompt cannot be skipped. |
| | """ |
| | if not self._mandatory: |
| | self.status["answered"] = True |
| | self.status["skipped"] = True |
| | self.status["result"] = None |
| | if event: |
| | event.app.exit(result=None) |
| | else: |
| | self._set_error(message=self._mandatory_message) |
| |
|
| | def _handle_interrupt(self, event: Optional["KeyPressEvent"]) -> None: |
| | """Handle the event when a KeyboardInterrupt signal is sent.""" |
| | self.status["answered"] = True |
| | self.status["result"] = INQUIRERPY_KEYBOARD_INTERRUPT |
| | self.status["skipped"] = True |
| | if event: |
| | event.app.exit(result=INQUIRERPY_KEYBOARD_INTERRUPT) |
| |
|
| | @abstractmethod |
| | def _handle_enter(self, event: Optional["KeyPressEvent"]) -> None: |
| | """Handle the event when user attempt to answer the question.""" |
| | pass |
| |
|
| | @property |
| | def status(self) -> Dict[str, Any]: |
| | """Dict[str, Any]: Get current prompt status. |
| | |
| | The status contains 3 keys: "answered" and "result". |
| | answered: If the current prompt is answered. |
| | result: The result of the user answer. |
| | skipped: If the prompt is skipped. |
| | """ |
| | return self._status |
| |
|
| | @status.setter |
| | def status(self, value) -> None: |
| | self._status = value |
| |
|
| | def register_kb( |
| | self, *keys: Union[Keys, str], filter: FilterOrBool = True, **kwargs |
| | ) -> Callable[[KeyHandlerCallable], KeyHandlerCallable]: |
| | """Keybinding registration decorator. |
| | |
| | This decorator wraps around the :meth:`prompt_toolkit.key_binding.KeyBindings.add` with |
| | added feature to process `alt` realted keybindings. |
| | |
| | By default, `prompt_toolkit` doesn't process `alt` related keybindings, |
| | it requires `alt-ANY` to `escape` + `ANY`. |
| | |
| | Args: |
| | keys: The keys to bind that can trigger the function. |
| | filter: :class:`~prompt_toolkit.filter.Condition` to indicate if this keybinding should be active. |
| | |
| | Returns: |
| | A decorator that should be applied to the function thats intended to be active when the keys |
| | are pressed. |
| | |
| | Examples: |
| | >>> @self.register_kb("alt-j") |
| | ... def test(event): |
| | ... pass |
| | """ |
| | alt_pattern = re.compile(r"^alt-(.*)") |
| |
|
| | def decorator(func: KeyHandlerCallable) -> KeyHandlerCallable: |
| | formatted_keys = [] |
| | for key in keys: |
| | match = alt_pattern.match(key) |
| | if match: |
| | formatted_keys.append("escape") |
| | formatted_keys.append(match.group(1)) |
| | else: |
| | formatted_keys.append(key) |
| |
|
| | @self._kb.add(*formatted_keys, filter=filter, **kwargs) |
| | def executable(event) -> None: |
| | func(event) |
| |
|
| | return executable |
| |
|
| | return decorator |
| |
|
| | @abstractmethod |
| | def _get_prompt_message( |
| | self, pre_answer: Tuple[str, str], post_answer: Tuple[str, str] |
| | ) -> List[Tuple[str, str]]: |
| | """Get the question message in formatted text form to display in the prompt. |
| | |
| | This function is mainly used to render the question message dynamically based |
| | on the current status (answered or not answered) of the prompt. |
| | |
| | Note: |
| | The function requires implementation when inheriting :class:`.BaseSimplePrompt`. |
| | You should call `super()._get_prompt_message(pre_answer, post_answer)` in |
| | the implemented `_get_prompt_message`. |
| | |
| | Args: |
| | pre_answer: The message to display before the question is answered. |
| | post_answer: The information to display after the question is answered. |
| | |
| | Returns: |
| | Formatted text in list of tuple format. |
| | """ |
| | display_message = [] |
| | if self.status["skipped"]: |
| | display_message.append(("class:skipped", self._qmark)) |
| | display_message.append( |
| | ("class:skipped", "%s%s " % (" " if self._qmark else "", self._message)) |
| | ) |
| | elif self.status["answered"]: |
| | display_message.append(("class:answermark", self._amark)) |
| | display_message.append( |
| | ( |
| | "class:answered_question", |
| | "%s%s" % (" " if self._amark else "", self._message), |
| | ) |
| | ) |
| | display_message.append( |
| | post_answer |
| | if not self._transformer |
| | else ( |
| | "class:answer", |
| | " %s" % self._transformer(self.status["result"]), |
| | ) |
| | ) |
| | else: |
| | display_message.append(("class:questionmark", self._qmark)) |
| | display_message.append( |
| | ( |
| | "class:question", |
| | "%s%s" % (" " if self._qmark else "", self._message), |
| | ) |
| | ) |
| | display_message.append(pre_answer) |
| | return display_message |
| |
|
| | @abstractmethod |
| | def _run(self) -> Any: |
| | """Abstractmethod to enforce a run function is implemented. |
| | |
| | All prompt instance requires a `_run` call to initialise and run an instance of |
| | `PromptSession` or `Application`. |
| | """ |
| | pass |
| |
|
| | @abstractmethod |
| | async def _run_async(self) -> Any: |
| | """Abstractmethod to enforce a run function is implemented. |
| | |
| | All prompt instance requires a `_run_async` call to initialise and run an instance of |
| | `PromptSession` or `Application`. |
| | """ |
| | pass |
| |
|
| | def execute(self, raise_keyboard_interrupt: Optional[bool] = None) -> Any: |
| | """Run the prompt and get the result. |
| | |
| | Args: |
| | raise_keyboard_interrupt: **Deprecated**. Set this parameter on the prompt initialisation instead. |
| | |
| | Returns: |
| | Value of the user answer. Types varies depending on the prompt. |
| | |
| | Raises: |
| | KeyboardInterrupt: When `ctrl-c` is pressed and `raise_keyboard_interrupt` is True. |
| | """ |
| | result = self._run() |
| | if raise_keyboard_interrupt is not None: |
| | self._raise_kbi = not os.getenv( |
| | "INQUIRERPY_NO_RAISE_KBI", not raise_keyboard_interrupt |
| | ) |
| | if result == INQUIRERPY_KEYBOARD_INTERRUPT: |
| | raise KeyboardInterrupt |
| | if not self._filter: |
| | return result |
| | return self._filter(result) |
| |
|
| | async def execute_async(self) -> None: |
| | """Run the prompt asynchronously and get the result. |
| | |
| | Returns: |
| | Value of the user answer. Types varies depending on the prompt. |
| | |
| | Raises: |
| | KeyboardInterrupt: When `ctrl-c` is pressed and `raise_keyboard_interrupt` is True. |
| | """ |
| | result = await self._run_async() |
| | if result == INQUIRERPY_KEYBOARD_INTERRUPT: |
| | raise KeyboardInterrupt |
| | if not self._filter: |
| | return result |
| | return self._filter(result) |
| |
|
| | @property |
| | def instruction(self) -> str: |
| | """str: Instruction to display next to question.""" |
| | return self._instruction |
| |
|
| | @property |
| | def kb_maps(self) -> Dict[str, Any]: |
| | """Dict[str, Any]: Keybinding mappings.""" |
| | return self._kb_maps |
| |
|
| | @kb_maps.setter |
| | def kb_maps(self, value: Dict[str, Any]) -> None: |
| | self._kb_maps = {**self._kb_maps, **value} |
| |
|
| | @property |
| | def kb_func_lookup(self) -> Dict[str, Any]: |
| | """Dict[str, Any]: Keybinding function lookup mappings..""" |
| | return self._kb_func_lookup |
| |
|
| | @kb_func_lookup.setter |
| | def kb_func_lookup(self, value: Dict[str, Any]) -> None: |
| | self._kb_func_lookup = {**self._kb_func_lookup, **value} |
| |
|