Source code for errbot.backends.telegram_messenger

import logging
import sys
from typing import Any, BinaryIO, List, Optional, Union

from errbot.backends.base import (
    ONLINE,
    Identifier,
    Message,
    Person,
    Room,
    RoomError,
    RoomOccupant,
    Stream,
)
from errbot.core import ErrBot
from errbot.rendering import text
from errbot.rendering.ansiext import TEXT_CHRS, enable_format

# Module-level logger used by every class and function in this backend.
log = logging.getLogger(__name__)

# Key under which the backend stores the offset to pass to the next
# getUpdates() call (read and written through ``self[...]`` in serve_once).
UPDATES_OFFSET_KEY = "_telegram_updates_offset"

# The ``telegram`` package is required by this backend. If it cannot be
# imported, log the traceback plus installation instructions and terminate
# the process with exit status 1.
try:
    import telegram
except ImportError:
    log.exception("Could not start the Telegram back-end")
    log.fatal(
        "You need to install the telegram support in order "
        "to use the Telegram backend.\n"
        "You should be able to install this package using:\n"
        "pip install errbot[telegram]"
    )
    sys.exit(1)


class RoomsNotSupportedError(RoomError):
    """Raised for every room operation, none of which this backend supports."""

    def __init__(self, message: Optional[str] = None):
        """
        Build the error, falling back to a standard explanation.

        :param message:
            Custom error text. When ``None``, a default text explaining why
            room operations are unavailable on Telegram is used instead.
        """
        default_text = (
            "Room operations are not supported on Telegram. "
            "While Telegram itself has groupchat functionality, it does not "
            "expose any APIs to bots to get group membership or otherwise "
            "interact with groupchats."
        )
        super().__init__(default_text if message is None else message)
class TelegramBotFilter:
    """
    This is a filter for the logging library that filters the
    "No new updates found." log message generated by telegram.bot.

    This is an INFO-level log message that gets logged for every
    getUpdates() call where there are no new messages, so is way
    too verbose.
    """

    @staticmethod
    def filter(record):
        """
        Decide whether a log record should be emitted.

        :param record: The :class:`logging.LogRecord` being processed.
        :return:
            ``False`` for the "No new updates found." message (drop it),
            ``True`` for every other record (keep it).
        """
        # The logging machinery drops a record whenever a filter returns a
        # falsy value, so an explicit True is required for records that
        # must pass; falling through with an implicit None would suppress
        # every message on the logger, not only the noisy one.
        return record.getMessage() != "No new updates found."
class TelegramIdentifier(Identifier):
    """Base identifier for Telegram entities, keyed by a stringified id."""

    def __init__(self, id):
        """
        :param id: The Telegram id; it is stored as ``str(id)``.
        """
        self._id = str(id)

    @property
    def id(self) -> str:
        """The identifier as a string."""
        return self._id

    def __unicode__(self):
        return str(self._id)

    def __eq__(self, other):
        """Two identifiers are equal when their ``id`` values are equal."""
        # Objects without an ``id`` attribute cannot be compared; returning
        # NotImplemented lets Python fall back to its default handling
        # instead of raising AttributeError.
        if not hasattr(other, "id"):
            return NotImplemented
        return self._id == other.id

    def __hash__(self):
        # A class that defines __eq__ without __hash__ becomes unhashable.
        # Hash on the same value __eq__ compares so identifiers can be used
        # in sets and as dict keys.
        return hash(self._id)

    __str__ = __unicode__

    # Alias: ``aclattr`` is the same property object as ``id``.
    aclattr = id
class TelegramPerson(TelegramIdentifier, Person):
    """A Telegram user, identified by id with optional name details."""

    def __init__(self, id, first_name=None, last_name=None, username=None):
        """
        :param id: The Telegram id of the user.
        :param first_name: The user's first name, if known.
        :param last_name: The user's last name, if known.
        :param username: The user's Telegram username, if known.
        """
        super().__init__(id)
        self._first_name = first_name
        self._last_name = last_name
        self._username = username

    @property
    def id(self) -> str:
        return self._id

    @property
    def first_name(self) -> Optional[str]:
        return self._first_name

    @property
    def last_name(self) -> Optional[str]:
        return self._last_name

    @property
    def fullname(self) -> Optional[str]:
        """
        The first and last name joined by a single space.

        Parts that are ``None`` are skipped; ``None`` is returned when
        neither part is known.
        """
        # Join only the parts that are present: concatenating onto a None
        # first name would raise TypeError when only the last name is known.
        known_parts = [
            part for part in (self.first_name, self.last_name) if part is not None
        ]
        if not known_parts:
            return None
        return " ".join(known_parts)

    @property
    def username(self) -> Optional[str]:
        return self._username

    @property
    def client(self) -> None:
        return None

    # Aliases: ``person`` mirrors ``id`` and ``nick`` mirrors ``username``.
    person = id
    nick = username
class TelegramRoom(TelegramIdentifier, Room):
    """
    Identifier for a Telegram groupchat.

    It only carries an id and an optional title; every room operation
    raises :class:`RoomsNotSupportedError`.
    """

    def __init__(self, id, title=None):
        """
        :param id: The Telegram id of the chat.
        :param title: The groupchat title, if known.
        """
        self._title = title
        super().__init__(id)

    @property
    def id(self) -> str:
        """The chat id as a string."""
        return self._id

    @property
    def title(self):
        """Return the groupchat title (only applies to groupchats)"""
        return self._title

    # -- Unsupported operations: each one raises RoomsNotSupportedError. --

    def join(self, username: str = None, password: str = None) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()

    def create(self) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()

    def leave(self, reason: str = None) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()

    def destroy(self) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()

    @property
    def joined(self) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()

    @property
    def exists(self) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()

    @property
    def topic(self) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()

    @property
    def occupants(self) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()

    def invite(self, *args) -> None:
        """Not supported on Telegram."""
        raise RoomsNotSupportedError()
class TelegramMUCOccupant(TelegramPerson, RoomOccupant):
    """
    This class represents a person inside a MUC.
    """

    def __init__(
        self, id, room: TelegramRoom, first_name=None, last_name=None, username=None
    ):
        """
        :param id: The Telegram id of the user.
        :param room: The room this occupant belongs to.
        :param first_name: The user's first name, if known.
        :param last_name: The user's last name, if known.
        :param username: The user's Telegram username, if known.
        """
        self._room = room
        super().__init__(id, first_name, last_name, username)

    @property
    def room(self) -> TelegramRoom:
        """The room the occupant was seen in."""
        return self._room

    @property
    def username(self) -> str:
        """The occupant's Telegram username."""
        return self._username
class TelegramBackend(ErrBot):
    """Errbot backend that exchanges messages with Telegram via ``telegram.Bot``."""

    def __init__(self, config):
        """
        Read the bot token from ``config.BOT_IDENTITY`` and set up rendering.

        Exits the process with status 1 when no token is configured.
        """
        super().__init__(config)

        # Silence the per-poll "No new updates found." message of telegram.bot.
        logging.getLogger("telegram.bot").addFilter(TelegramBotFilter())

        identity = config.BOT_IDENTITY
        self.token = identity.get("token", None)

        if not self.token:
            log.fatal(
                "You need to supply a token for me to use. You can obtain "
                "a token by registering your bot with the Bot Father (@BotFather)"
            )
            sys.exit(1)

        self.telegram = None  # Will be initialized in serve_once
        self.bot_instance = None  # Will be set in serve_once

        compact = getattr(config, "COMPACT_OUTPUT", False)
        enable_format("text", TEXT_CHRS, borders=not compact)
        self.md_converter = text()

    def set_message_size_limit(self, limit: int = 1024, hard_limit: int = 1024) -> None:
        """
        Telegram message size limit
        """
        super().set_message_size_limit(limit, hard_limit)

    def serve_once(self) -> Optional[bool]:
        """
        Connect to Telegram and poll for updates until interrupted or failing.

        :return:
            ``False`` when the initial connection fails, ``True`` after a
            keyboard interrupt, ``None`` when polling stopped on any other
            exception (which is logged).
        """
        log.info("Initializing connection")
        try:
            self.telegram = telegram.Bot(token=self.token)
            me = self.telegram.getMe()
        except telegram.TelegramError as e:
            log.error("Connection failure: %s", e.message)
            return False

        self.bot_identifier = TelegramPerson(
            id=me.id,
            first_name=me.first_name,
            last_name=me.last_name,
            username=me.username,
        )

        log.info("Connected")
        self.reset_reconnection_count()
        self.connect_callback()

        # Resume from the stored offset; start from 0 when none was stored.
        try:
            offset = self[UPDATES_OFFSET_KEY]
        except KeyError:
            offset = 0

        try:
            while True:
                log.debug("Getting updates with offset %s", offset)
                for update in self.telegram.getUpdates(offset=offset, timeout=60):
                    # Store the offset before handling, so an update whose
                    # handling fails is not requested again.
                    offset = update.update_id + 1
                    self[UPDATES_OFFSET_KEY] = offset
                    log.debug("Processing update: %s", update)
                    # An update may carry the attribute with a None value;
                    # testing for None (rather than mere attribute presence)
                    # avoids handing None to _handle_message.
                    if getattr(update, "message", None) is None:
                        log.warning("Unknown update type (no message present)")
                        continue
                    try:
                        self._handle_message(update.message)
                    except Exception:
                        log.exception("An exception occurred while processing update")
                log.debug("All updates processed, new offset is %s", offset)
        except KeyboardInterrupt:
            log.info("Interrupt received, shutting down..")
            return True
        except Exception:
            log.exception("Error reading from Telegram updates stream:")
        finally:
            log.debug("Triggering disconnect callback")
            self.disconnect_callback()

    def _handle_message(self, message: telegram.Message) -> None:
        """
        Handle a received message.

        :param message:
            A message with a structure as defined at
            https://core.telegram.org/bots/api#message
        """
        if message.text is None:
            log.warning("Unhandled message type (not a text message) ignored")
            return

        message_instance = self.build_message(message.text)
        sender = message.from_user
        if message.chat.type == "private":
            message_instance.frm = TelegramPerson(
                id=sender.id,
                first_name=sender.first_name,
                last_name=sender.last_name,
                username=sender.username,
            )
            message_instance.to = self.bot_identifier
        else:
            room = TelegramRoom(id=message.chat.id, title=message.chat.title)
            message_instance.frm = TelegramMUCOccupant(
                id=sender.id,
                room=room,
                first_name=sender.first_name,
                last_name=sender.last_name,
                username=sender.username,
            )
            message_instance.to = room
        message_instance.extras["message_id"] = message.message_id
        self.callback_message(message_instance)

    def send_message(self, msg: Message) -> None:
        """
        Render the message body and send it to ``msg.to``.

        Any exception raised while sending is logged and re-raised.
        """
        super().send_message(msg)
        body = self.md_converter.convert(msg.body)
        try:
            self.telegram.sendMessage(msg.to.id, body)
        except Exception:
            log.exception(
                f"An exception occurred while trying to send the following message to {msg.to.id}: {msg.body}"
            )
            raise

    def change_presence(self, status: str = ONLINE, message: str = "") -> None:
        # It looks like telegram doesn't supports online presence for privacy reason.
        pass

    def build_identifier(self, txtrep: str) -> Union[TelegramPerson, TelegramRoom]:
        """
        Convert a textual representation into a :class:`~TelegramPerson` or :class:`~TelegramRoom`.

        Positive ids become a person, all other ids become a room.

        :raises ValueError: When ``txtrep`` is not numeric.
        """
        log.debug("building an identifier from %s.", txtrep)
        if not self._is_numeric(txtrep):
            raise ValueError("Telegram identifiers must be numeric.")
        id_ = int(txtrep)
        if id_ > 0:
            return TelegramPerson(id=id_)
        else:
            return TelegramRoom(id=id_)

    def build_reply(
        self,
        msg: Message,
        text: Optional[str] = None,
        private: bool = False,
        threaded: bool = False,
    ) -> Message:
        """
        Build a reply to ``msg`` sent from the bot.

        The reply goes to the sender when ``private`` is set or the original
        message was direct; otherwise it goes to the original recipient.
        ``threaded`` is accepted but not used by this backend.
        """
        response = self.build_message(text)
        response.frm = self.bot_identifier
        if private:
            response.to = msg.frm
        else:
            response.to = msg.frm if msg.is_direct else msg.to
        return response

    @property
    def mode(self) -> str:
        """The backend name, ``"telegram"``."""
        return "telegram"

    def query_room(self, room: TelegramRoom) -> None:
        """
        Not supported on Telegram.

        :raises: :class:`~RoomsNotSupportedError`
        """
        raise RoomsNotSupportedError()

    def rooms(self) -> None:
        """
        Not supported on Telegram.

        :raises: :class:`~RoomsNotSupportedError`
        """
        raise RoomsNotSupportedError()

    def prefix_groupchat_reply(self, message: Message, identifier: Identifier):
        """Prefix the message body with ``@<nick>: `` of the addressed identifier."""
        super().prefix_groupchat_reply(message, identifier)
        message.body = f"@{identifier.nick}: {message.body}"

    def _telegram_special_message(
        self, chat_id: Any, content: Any, msg_type: str, **kwargs
    ) -> telegram.Message:
        """
        Send special message.

        :param chat_id: Id of the chat to send to.
        :param content: Payload handed to the matching ``send*`` call
            (ignored for ``'location'``).
        :param msg_type: One of 'document', 'photo', 'audio', 'video',
            'sticker', 'location'.
        :param kwargs: Extra keyword arguments forwarded to the ``send*`` call.
            For 'location', ``latitude`` and ``longitude`` are taken from here.
        :raises ValueError: When ``msg_type`` is not one of the choices above.
        """
        if msg_type == "document":
            msg = self.telegram.sendDocument(
                chat_id=chat_id, document=content, **kwargs
            )
        elif msg_type == "photo":
            msg = self.telegram.sendPhoto(chat_id=chat_id, photo=content, **kwargs)
        elif msg_type == "audio":
            msg = self.telegram.sendAudio(chat_id=chat_id, audio=content, **kwargs)
        elif msg_type == "video":
            msg = self.telegram.sendVideo(chat_id=chat_id, video=content, **kwargs)
        elif msg_type == "sticker":
            msg = self.telegram.sendSticker(chat_id=chat_id, sticker=content, **kwargs)
        elif msg_type == "location":
            # The coordinates are popped first so they are not passed a
            # second time through **kwargs.
            msg = self.telegram.sendLocation(
                chat_id=chat_id,
                latitude=kwargs.pop("latitude", ""),
                longitude=kwargs.pop("longitude", ""),
                **kwargs,
            )
        else:
            raise ValueError(
                f"Expected a valid choice for `msg_type`, got: {msg_type}."
            )
        return msg

    def _telegram_upload_stream(self, stream: Stream, **kwargs) -> None:
        """
        Perform upload defined in a stream.

        The stream is accepted, sent through
        :meth:`_telegram_special_message` and then marked as succeeded or
        errored depending on the outcome.
        """
        msg = None
        try:
            stream.accept()
            msg = self._telegram_special_message(
                chat_id=stream.identifier.id,
                content=stream.raw,
                msg_type=stream.stream_type,
                **kwargs,
            )
        except Exception:
            log.exception(f"Upload of {stream.name} to {stream.identifier} failed.")
            # Mark the failure on the stream as well, so it does not stay
            # in its in-progress state after a failed upload.
            stream.error()
        else:
            if msg is None:
                stream.error()
            else:
                stream.success()

    def send_stream_request(
        self,
        identifier: Union[TelegramPerson, TelegramMUCOccupant],
        fsource: Union[str, dict, BinaryIO],
        name: Optional[str] = "file",
        size: Optional[int] = None,
        stream_type: Optional[str] = None,
    ) -> Union[str, Stream]:
        """Starts a file transfer.

        :param identifier: TelegramPerson or TelegramMUCOccupant
            Identifier of the Person or Room to send the stream to.

        :param fsource: str, dict or binary data
            File URL or binary content from a local file.
            Optionally a dict with binary content plus metadata can be given.
            See `stream_type` for more details. The dict is not modified.

        :param name: str, optional
            Name of the file. Not sure if this works always.

        :param size: int, optional
            Size of the file obtained with os.path.getsize.
            This is only used for debug logging purposes.

        :param stream_type: str, optional
            Type of the stream. Choices: 'document', 'photo', 'audio', 'video', 'sticker', 'location'.

            If 'video', a dict is optional as {'content': fsource, 'duration': str}.
            If 'audio', a dict is optional as {'content': fsource, 'duration': str, 'performer': str, 'title': str}.

            For 'location' a dict is mandatory as {'latitude': str, 'longitude': str}.

        :return stream: str or Stream
            If `fsource` is str will return str, else return Stream.

        :raises ValueError: When a string source is not a URL with a scheme.
        """

        def _telegram_metadata(source):
            """Split ``source`` into ``(content, metadata dict)``."""
            if isinstance(source, dict):
                # Work on a copy so the caller's dict is left untouched, and
                # tolerate a missing "content" key (e.g. for locations).
                metadata = dict(source)
                return metadata.pop("content", None), metadata
            # Always return a mapping: the metadata is expanded with **.
            return source, {}

        def _is_valid_url(url) -> bool:
            """Return whether ``url`` parses with a non-empty scheme."""
            from urllib.parse import urlparse

            return bool(urlparse(url).scheme)

        content, meta = _telegram_metadata(fsource)
        if isinstance(content, str):
            if not _is_valid_url(content):
                raise ValueError(f"Not valid URL: {content}")

            self._telegram_special_message(
                chat_id=identifier.id, content=content, msg_type=stream_type, **meta
            )
            # %s for the size hint: size defaults to None, which %d rejects.
            log.debug(
                "Requesting upload of %s to %s (size hint: %s, stream type: %s).",
                name,
                identifier,
                size,
                stream_type,
            )

            stream = content
        else:
            stream = Stream(identifier, content, name, size, stream_type)
            log.debug(
                "Requesting upload of %s to %s (size hint: %s, stream type: %s)",
                name,
                identifier,
                size,
                stream_type,
            )
            # Forward the metadata so it reaches the send* call of the upload.
            self.thread_pool.apply_async(
                self._telegram_upload_stream, (stream,), kwds=meta
            )

        return stream

    @staticmethod
    def _is_numeric(input_) -> bool:
        """Return true if input is a number"""
        try:
            int(input_)
            return True
        except (TypeError, ValueError):
            # TypeError covers non-string, non-number inputs such as None.
            return False