Source code for errbot.backends.text

import copyreg
import logging
import re
import sys
from time import sleep
from typing import BinaryIO, List, Optional, Union

from ansi.color import fg, fx
from markdown import Markdown
from markdown.extensions.extra import ExtraExtension
from pygments import highlight
from pygments.formatters import Terminal256Formatter
from pygments.lexers import get_lexer_by_name

from errbot.backends.base import (
    OFFLINE,
    ONLINE,
    Identifier,
    Message,
    Person,
    Presence,
    Room,
    RoomOccupant,
    Stream,
)
from errbot.core import ErrBot
from errbot.logs import console_hdlr
from errbot.rendering import ansi, imtext, text, xhtml
from errbot.rendering.ansiext import ANSI_CHRS, AnsiExtension, enable_format

log = logging.getLogger(__name__)

ENCODING_INPUT = sys.stdin.encoding
ANSI = hasattr(sys.stderr, "isatty") and sys.stderr.isatty()

enable_format("borderless", ANSI_CHRS, borders=False)


[docs] def borderless_ansi() -> Markdown: """This makes a converter from markdown to ansi (console) format. It can be called like this: from errbot.rendering import ansi md_converter = ansi() # you need to cache the converter ansi_txt = md_converter.convert(md_txt) """ md = Markdown( output_format="borderless", extensions=[ExtraExtension(), AnsiExtension()] ) md.stripTopLevelTags = False return md
[docs] class TextPerson(Person): """ Simple Person implementation which represents users as simple text strings. """
[docs] def __init__(self, person, client=None, nick=None, fullname=None): self._person = person self._client = client self._nick = nick self._fullname = fullname self._email = ""
@property def person(self) -> Person: return self._person @property def client(self) -> str: return self._client @property def nick(self) -> str: return self._nick @property def fullname(self) -> str: return self._fullname @property def email(self) -> str: return self._email @property def aclattr(self) -> str: return str(self) def __str__(self): return "@" + self._person def __eq__(self, other): if not isinstance(other, Person): return False return self.person == other.person def __hash__(self): return self.person.__hash__()
[docs] class TextRoom(Room):
[docs] def __init__(self, name: str, bot: ErrBot): self._topic = "" self._joined = False self.name = name self._bot = bot # get the bot username bot_config = self._bot.bot_config default_bot_name = "@errbot" if hasattr(bot_config, "BOT_IDENTITY"): bot_name = bot_config.BOT_IDENTITY.get("username", default_bot_name) else: bot_name = default_bot_name # fill up the room with a coherent set of identities. self._occupants = [ TextOccupant("somebody", self), TextOccupant(TextPerson(bot.bot_config.BOT_ADMINS[0]), self), TextOccupant(bot_name, self), ]
[docs] def join( self, username: Optional[str] = None, password: Optional[str] = None ) -> None: self._joined = True
[docs] def leave(self, reason: Optional[str] = None) -> None: self._joined = False
[docs] def create(self) -> None: self._joined = True
[docs] def destroy(self) -> None: self._joined = False
@property def exists(self) -> bool: return True @property def joined(self) -> bool: return self._joined @property def topic(self) -> str: return self._topic @topic.setter def topic(self, topic: str) -> None: self._topic = topic @property def occupants(self) -> List["TextOccupant"]: return self._occupants
[docs] def invite(self, *args): pass
def __str__(self): return "#" + self.name def __eq__(self, other): return self.name == other.name def __hash__(self): return self.name.__hash__()
[docs] class TextOccupant(TextPerson, RoomOccupant):
[docs] def __init__(self, person, room): super().__init__(person) self._room = room
@property def room(self) -> TextRoom: return self._room def __str__(self): return f"#{self._room.name}/{self._person.person}" def __eq__(self, other): return self.person == other.person and self.room == other.room def __hash__(self): return self.person.__hash__() + self.room.__hash__()
INTRO = """ --- You start as a **bot admin in a one-on-one conversation** with the bot. ### Context of the chat - Use `!inroom`{:color='blue'} to switch to a room conversation. - Use `!inperson`{:color='blue'} to switch back to a one-on-one conversation. - Use `!asuser`{:color='green'} to talk as a normal user. - Use `!asadmin`{:color='red'} to switch back as a bot admin. ### Preferences - Use `!ml`{:color='yellow'} to flip on/off the multiline mode (Enter twice at the end to send). --- """
[docs] class TextBackend(ErrBot):
[docs] def __init__(self, config): super().__init__(config) log.debug("Text Backend Init.") if ( hasattr(self.bot_config, "BOT_IDENTITY") and "username" in self.bot_config.BOT_IDENTITY ): self.bot_identifier = self.build_identifier( self.bot_config.BOT_IDENTITY["username"] ) else: # Just a default identity for the bot if nothing has been specified. self.bot_identifier = self.build_identifier("@errbot") log.debug("Bot username set at %s.", self.bot_identifier) self._inroom = False self._rooms = [] self._multiline = False self.demo_mode = ( self.bot_config.TEXT_DEMO_MODE if hasattr(self.bot_config, "TEXT_DEMO_MODE") else False ) if not self.demo_mode: self.md_html = xhtml() # for more debug feedback on md self.md_text = text() # for more debug feedback on md self.md_borderless_ansi = borderless_ansi() self.md_im = imtext() self.md_lexer = get_lexer_by_name("md", stripall=True) self.md_ansi = ansi() self.html_lexer = get_lexer_by_name("html", stripall=True) self.terminal_formatter = Terminal256Formatter(style="paraiso-dark") self.user = self.build_identifier(self.bot_config.BOT_ADMINS[0]) self._register_identifiers_pickling()
@staticmethod def _unpickle_identifier(identifier_str): return TextBackend.__build_identifier(identifier_str) @staticmethod def _pickle_identifier(identifier): return TextBackend._unpickle_identifier, (str(identifier),) def _register_identifiers_pickling(self): """ Register identifiers pickling. """ TextBackend.__build_identifier = self.build_identifier for cls in (TextPerson, TextOccupant, TextRoom): copyreg.pickle( cls, TextBackend._pickle_identifier, TextBackend._unpickle_identifier )
[docs] def serve_forever(self) -> None: self.readline_support() if not self._rooms: # artificially join a room if None were specified. self.query_room("#testroom").join() if self.demo_mode: # disable the console logging once it is serving in demo mode. root = logging.getLogger() root.removeHandler(console_hdlr) root.addHandler(logging.NullHandler()) self.connect_callback() # notify that the connection occured self.callback_presence(Presence(identifier=self.user, status=ONLINE)) self.send_message(Message(INTRO)) try: while True: if self._inroom: frm = TextOccupant(self.user, self.rooms()[0]) to = self.rooms()[0] else: frm = self.user to = self.bot_identifier print() full_msg = "" while True: prompt = "[␍] " if full_msg else ">>> " if ANSI or self.demo_mode: color = ( fg.red if self.user.person in self.bot_config.BOT_ADMINS[0] else fg.green ) prompt = f"{color}[{frm}{to}] {fg.cyan}{prompt}{fx.reset}" entry = input(prompt) else: entry = input(f"[{frm}{to}] {prompt}") if not self._multiline: full_msg = entry break if not entry: break full_msg += entry + "\n" msg = Message(full_msg) msg.frm = frm msg.to = to self.callback_message(msg) mentioned = [ self.build_identifier(word) for word in re.findall(r"(?<=\s)@[\w]+", entry) ] if mentioned: self.callback_mention(msg, mentioned) sleep(0.5) except EOFError: pass except KeyboardInterrupt: pass finally: # simulate some real presence self.callback_presence(Presence(identifier=self.user, status=OFFLINE)) log.debug("Trigger disconnect callback") self.disconnect_callback() log.debug("Trigger shutdown") self.shutdown()
[docs] def readline_support(self) -> None: try: # Load readline for better editing/history behaviour import readline # Implement a simple completer for commands def completer(txt, state): options = ( [i for i in self.all_commands if i.startswith(txt)] if txt else list(self.all_commands.keys()) ) if state < len(options): return options[state] readline.parse_and_bind("tab: complete") readline.set_completer(completer) except ImportError: # Readline is Unix-only log.debug("Python readline module is not available")
[docs] def send_message(self, msg: Message) -> None: if self.demo_mode: print(self.md_ansi.convert(msg.body)) else: bar = "\n╌╌[{mode}]" + ("╌" * 60) super().send_message(msg) print(bar.format(mode="MD ")) if ANSI: print(highlight(msg.body, self.md_lexer, self.terminal_formatter)) else: print(msg.body) print(bar.format(mode="HTML")) html = self.md_html.convert(msg.body) if ANSI: print(highlight(html, self.html_lexer, self.terminal_formatter)) else: print(html) print(bar.format(mode="TEXT")) print(self.md_text.convert(msg.body)) print(bar.format(mode="IM ")) print(self.md_im.convert(msg.body)) if ANSI: print(bar.format(mode="ANSI")) print(self.md_ansi.convert(msg.body)) print(bar.format(mode="BORDERLESS")) print(self.md_borderless_ansi.convert(msg.body)) print("\n\n")
[docs] def add_reaction(self, msg: Message, reaction: str) -> None: # this is like the Slack backend's add_reaction self._react("+", msg, reaction)
[docs] def remove_reaction(self, msg: Message, reaction: str) -> None: self._react("-", msg, reaction)
def _react(self, sign: str, msg: Message, reaction: str) -> None: self.send(msg.frm, f"reaction {sign}:{reaction}:", in_reply_to=msg)
[docs] def change_presence(self, status: str = ONLINE, message: str = "") -> None: log.debug("*** Changed presence to [%s] %s", status, message)
[docs] def build_identifier( self, text_representation: str ) -> Union[TextOccupant, TextRoom, TextPerson]: if text_representation.startswith("#"): rem = text_representation[1:] if "/" in text_representation: room, person = rem.split("/") return TextOccupant(TextPerson(person), TextRoom(room, self)) return self.query_room("#" + rem) if not text_representation.startswith("@"): raise ValueError( "An identifier for the Text backend needs to start with # for a room or @ for a person." ) return TextPerson(text_representation[1:])
[docs] def build_reply( self, msg: Message, text: str = None, private: bool = False, threaded: bool = False, ) -> Message: response = self.build_message(text) response.frm = self.bot_identifier if private: response.to = msg.frm else: response.to = msg.frm.room if isinstance(msg.frm, RoomOccupant) else msg.frm return response
@property def mode(self): return "text"
[docs] def query_room(self, room: str) -> TextRoom: if not room.startswith("#"): raise ValueError("A Room name must start by #.") text_room = TextRoom(room[1:], self) if text_room not in self._rooms: self._rooms.insert(0, text_room) else: self._rooms.insert(0, self._rooms.pop(self._rooms.index(text_room))) return text_room
[docs] def rooms(self) -> List[TextRoom]: return self._rooms
[docs] def prefix_groupchat_reply(self, message: Message, identifier: Identifier): message.body = f"{identifier.person} {message.body}"
[docs] def send_stream_request( self, user: Identifier, fsource: BinaryIO, name: str = None, size: int = None, stream_type: str = None, ) -> Stream: """ Starts a file transfer. For Slack, the size and stream_type are unsupported :param user: is the identifier of the person you want to send it to. :param fsource: is a file object you want to send. :param name: is an optional filename for it. :param size: not supported in Slack backend :param stream_type: not supported in Slack backend :return Stream: object on which you can monitor the progress of it. """ stream = Stream(user, fsource, name, size, stream_type) log.debug( "Requesting upload of %s to %s (size hint: %d, stream type: %s).", stream.name, stream.identifier, size, stream_type, ) return stream