from __future__ import annotations
import logging
import sys
from datetime import datetime
from functools import lru_cache
from time import sleep
from typing import Callable, List, Optional, Tuple
from errbot.backends.base import (
AWAY,
DND,
OFFLINE,
ONLINE,
Identifier,
Message,
Person,
Presence,
Room,
RoomNotJoinedError,
RoomOccupant,
)
from errbot.core import ErrBot
from errbot.rendering import text, xhtml, xhtmlim
log = logging.getLogger(__name__)
try:
from slixmpp import JID, ClientXMPP
from slixmpp.exceptions import IqError
from slixmpp.xmlstream import cert, resolver
except ImportError:
log.exception("Could not start the XMPP backend")
log.fatal(
"""
If you intend to use the XMPP backend please install the support for XMPP with:
pip install errbot[XMPP]
"""
)
sys.exit(-1)
# LRU to cache the JID queries.
IDENTIFIERS_LRU = 1024
[docs]
class XMPPIdentifier(Identifier):
"""
This class is the parent and the basic contract of all the ways the backends
are identifying a person on their system.
"""
[docs]
def __init__(self, node, domain, resource):
if not node:
raise Exception("An XMPPIdentifier needs to have a node.")
if not domain:
raise Exception("An XMPPIdentifier needs to have a domain.")
self._node = node
self._domain = domain
self._resource = resource
self._email = ""
@property
def node(self) -> str:
return self._node
@property
def domain(self) -> str:
return self._domain
@property
def resource(self) -> str:
return self._resource
@property
def person(self) -> str:
return self._node + "@" + self._domain
@property
def nick(self) -> str:
return self._node
@property
def fullname(self) -> None:
return None # Not supported by default on XMPP.
@property
def email(self):
return self._email
@property
def client(self):
return self._resource
def __str__(self):
answer = self._node + "@" + self._domain # don't call .person: see below
if self._resource:
answer += "/" + self._resource
return answer
def __unicode__(self):
return str(self.__str__())
def __eq__(self, other):
if not isinstance(other, XMPPIdentifier):
log.debug("Weird, you are comparing an XMPPIdentifier to a %s", type(other))
return False
return (
self._domain == other._domain
and self._node == other._node
and self._resource == other._resource
)
[docs]
class XMPPPerson(XMPPIdentifier, Person):
aclattr = XMPPIdentifier.person
def __eq__(self, other):
if not isinstance(other, XMPPPerson):
log.debug("Weird, you are comparing an XMPPPerson to a %s", type(other))
return False
return self._domain == other._domain and self._node == other._node
[docs]
class XMPPRoom(XMPPIdentifier, Room):
[docs]
def __init__(self, room_jid, bot: ErrBot):
self._bot = bot
self.xep0045 = self._bot.conn.client.plugin["xep_0045"]
node, domain, resource = split_identifier(room_jid)
super().__init__(node, domain, resource)
[docs]
def join(
self, username: Optional[str] = None, password: Optional[str] = None
) -> None:
"""
Join the room.
If the room does not exist yet, this will automatically call
:meth:`create` on it first.
"""
room = str(self)
self.xep0045.join_muc(room, username, password=password)
self._bot.conn.add_event_handler(
f"muc::{room}::got_online", self._bot.user_joined_chat
)
self._bot.conn.add_event_handler(
f"muc::{room}::got_offline", self._bot.user_left_chat
)
self.configure()
self._bot.callback_room_joined(self, self._bot.bot_identifier)
log.info("Joined room %s.", room)
[docs]
def leave(self, reason: Optional[str] = None) -> None:
"""
Leave the room.
:param reason:
An optional string explaining the reason for leaving the room
"""
if reason is None:
reason = ""
room = str(self)
try:
self.xep0045.leave_muc(
room=room, nick=self.xep0045.ourNicks[room], msg=reason
)
self._bot.conn.del_event_handler(
f"muc::{room}::got_online", self._bot.user_joined_chat
)
self._bot.conn.del_event_handler(
f"muc::{room}::got_offline", self._bot.user_left_chat
)
log.info("Left room %s.", room)
self._bot.callback_room_left(self, self._bot.bot_identifier)
except KeyError:
log.debug("Trying to leave %s while not in this room.", room)
[docs]
def create(self) -> None:
"""
Not supported on this back-end (Slixmpp doesn't support it).
Will join the room to ensure it exists, instead.
"""
logging.warning(
"XMPP back-end does not support explicit creation, joining room "
"instead to ensure it exists."
)
self.join(username=str(self))
[docs]
def destroy(self) -> None:
"""
Destroy the room.
Calling this on a non-existing room is a no-op.
"""
self.xep0045.destroy(str(self))
log.info("Destroyed room %s.", self)
@property
def exists(self) -> bool:
"""
Boolean indicating whether this room already exists or not.
:getter:
Returns `True` if the room exists, `False` otherwise.
"""
logging.warning(
"XMPP back-end does not support determining if a room exists. Returning the result of joined instead."
)
return self.joined
@property
def joined(self) -> bool:
"""
Boolean indicating whether this room has already been joined.
:getter:
Returns `True` if the room has been joined, `False` otherwise.
"""
return str(self) in self.xep0045.get_joined_rooms()
@property
def topic(self) -> Optional[str]:
"""
The room topic.
:getter:
Returns the topic (a string) if one is set, `None` if no
topic has been set at all.
:raises:
:class:`~RoomNotJoinedError` if the room has not yet been joined.
"""
if not self.joined:
raise RoomNotJoinedError("Must be in a room in order to see the topic.")
try:
return self._bot._room_topics[str(self)]
except KeyError:
return None
@topic.setter
def topic(self, topic: str) -> None:
"""
Set the room's topic.
:param topic:
The topic to set.
"""
# Not supported by Slixmpp at the moment :(
raise NotImplementedError(
"Setting the topic is not supported on this back-end."
)
@property
def occupants(self) -> List[XMPPRoomOccupant]:
"""
The room's occupants.
:getter:
Returns a list of :class:`~errbot.backends.base.MUCOccupant` instances.
:raises:
:class:`~MUCNotJoinedError` if the room has not yet been joined.
"""
occupants = []
try:
for occupant in self.xep0045.rooms[str(self)].values():
room_node, room_domain, _ = split_identifier(occupant["room"])
nick = occupant["nick"]
occupants.append(XMPPRoomOccupant(room_node, room_domain, nick, self))
except KeyError:
raise RoomNotJoinedError("Must be in a room in order to see occupants.")
return occupants
[docs]
def invite(self, *args) -> None:
"""
Invite one or more people into the room.
:*args:
One or more JID's to invite into the room.
"""
room = str(self)
for jid in args:
self.xep0045.invite(room, jid)
log.info("Invited %s to %s.", jid, room)
[docs]
class XMPPRoomOccupant(XMPPPerson, RoomOccupant):
[docs]
def __init__(self, node, domain, resource, room):
super().__init__(node, domain, resource)
self._room = room
@property
def person(self):
return str(self) # this is the full identifier.
@property
def real_jid(self) -> str:
"""
The JID of the room occupant, they used to login.
Will only work if the errbot is moderator in the MUC or it is not anonymous.
"""
room_jid = self._node + "@" + self._domain
jid = JID(self._room.xep0045.get_jid_property(room_jid, self.resource, "jid"))
return jid.bare
@property
def room(self) -> XMPPRoom:
return self._room
nick = XMPPPerson.resource
[docs]
class XMPPConnection:
[docs]
def __init__(
self,
jid,
password,
feature=None,
keepalive=None,
ca_cert=None,
server=None,
use_ipv6=None,
bot=None,
ssl_version=None,
):
if feature is None:
feature = {}
self._bot = bot
self.connected = False
self.server = server
self.client = ClientXMPP(
jid, password, plugin_config={"feature_mechanisms": feature}
)
self.client.register_plugin("xep_0030") # Service Discovery
self.client.register_plugin("xep_0045") # Multi-User Chat
self.client.register_plugin("xep_0199") # XMPP Ping
self.client.register_plugin("xep_0203") # XMPP Delayed messages
self.client.register_plugin("xep_0249") # XMPP direct MUC invites
if keepalive is not None:
self.client.whitespace_keepalive = (
True # Just in case Slixmpp's default changes to False in the future
)
self.client.whitespace_keepalive_interval = keepalive
if use_ipv6 is not None:
self.client.use_ipv6 = use_ipv6
if ssl_version:
self.client.ssl_version = ssl_version
self.client.ca_certs = ca_cert # Used for TLS certificate validation
self.client.add_event_handler("session_start", self.session_start)
[docs]
def session_start(self, _):
self.client.send_presence()
self.client.get_roster()
[docs]
def connect(self) -> XMPPConnection:
if not self.connected:
if self.server is not None:
self.client.connect(self.server)
else:
self.client.connect()
self.connected = True
return self
[docs]
def disconnect(self) -> None:
self.client.disconnect(wait=True)
self.connected = False
[docs]
def serve_forever(self) -> None:
self.client.process()
[docs]
def add_event_handler(self, name: str, cb: Callable) -> None:
self.client.add_event_handler(name, cb)
[docs]
def del_event_handler(self, name: str, cb: Callable) -> None:
self.client.del_event_handler(name, cb)
XMPP_TO_ERR_STATUS = {
"available": ONLINE,
"away": AWAY,
"dnd": DND,
"unavailable": OFFLINE,
}
[docs]
def split_identifier(txtrep: str) -> Tuple[str, str, str]:
split_jid = txtrep.split("@", 1)
node, domain = "@".join(split_jid[:-1]), split_jid[-1]
if domain.find("/") != -1:
domain, resource = domain.split("/", 1)
else:
resource = None
return node, domain, resource
[docs]
class XMPPBackend(ErrBot):
room_factory = XMPPRoom
roomoccupant_factory = XMPPRoomOccupant
[docs]
def __init__(self, config):
super().__init__(config)
identity = config.BOT_IDENTITY
self.jid = identity["username"] # backward compatibility
self.password = identity["password"]
self.server = identity.get("server", None)
self.feature = config.__dict__.get("XMPP_FEATURE_MECHANISMS", {})
self.keepalive = config.__dict__.get("XMPP_KEEPALIVE_INTERVAL", None)
self.ca_cert = config.__dict__.get(
"XMPP_CA_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt"
)
self.xhtmlim = config.__dict__.get("XMPP_XHTML_IM", False)
self.use_ipv6 = config.__dict__.get("XMPP_USE_IPV6", None)
self.ssl_version = config.__dict__.get("XMPP_SSL_VERSION", None)
# generic backend compatibility
self.bot_identifier = self._build_person(self.jid)
self.conn = self.create_connection()
self.conn.add_event_handler("message", self.incoming_message)
self.conn.add_event_handler("session_start", self.connected)
self.conn.add_event_handler("disconnected", self.disconnected)
# presence related handlers
self.conn.add_event_handler("got_online", self.contact_online)
self.conn.add_event_handler("got_offline", self.contact_offline)
self.conn.add_event_handler("changed_status", self.user_changed_status)
# MUC subject events
self.conn.add_event_handler("groupchat_subject", self.chat_topic)
self._room_topics = {}
self.md_xhtml = xhtml()
self.md_text = text()
[docs]
def create_connection(self) -> XMPPConnection:
return XMPPConnection(
jid=self.jid, # textual and original representation
password=self.password,
feature=self.feature,
keepalive=self.keepalive,
ca_cert=self.ca_cert,
server=self.server,
use_ipv6=self.use_ipv6,
bot=self,
ssl_version=self.ssl_version,
)
def _build_room_occupant(self, txtrep: str) -> XMPPRoomOccupant:
node, domain, resource = split_identifier(txtrep)
return self.roomoccupant_factory(
node, domain, resource, self.query_room(node + "@" + domain)
)
def _build_person(self, txtrep: str) -> XMPPPerson:
return XMPPPerson(*split_identifier(txtrep))
[docs]
def incoming_message(self, xmppmsg: dict) -> None:
"""Callback for message events"""
if xmppmsg["type"] == "error":
log.warning("Received error message: %s", xmppmsg)
return
msg = Message(xmppmsg["body"])
if "html" in xmppmsg.keys():
msg.html = xmppmsg["html"]
log.debug("incoming_message from: %s", msg.frm)
if xmppmsg["type"] == "groupchat":
msg.frm = self._build_room_occupant(xmppmsg["from"].full)
msg.to = msg.frm.room
else:
msg.frm = self._build_person(xmppmsg["from"].full)
msg.to = self._build_person(xmppmsg["to"].full)
msg.nick = xmppmsg["mucnick"]
now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
delay = xmppmsg["delay"]._get_attr(
"stamp"
) # this is a bug in sleekxmpp it should be ['from']
msg.delayed = bool(delay and delay != now)
self.callback_message(msg)
def _idd_from_event(self, event) -> Union[XMPPRoomOccupant, XMPPPerson]:
txtrep = event["from"].full
return (
self._build_room_occupant(txtrep)
if "muc" in event
else self._build_person(txtrep)
)
[docs]
def user_joined_chat(self, event) -> None:
log.debug("user_join_chat %s", event)
self.callback_presence(
Presence(identifier=self._idd_from_event(event), status=ONLINE)
)
[docs]
def user_left_chat(self, event) -> None:
log.debug("user_left_chat %s", event)
self.callback_presence(
Presence(identifier=self._idd_from_event(event), status=OFFLINE)
)
[docs]
def chat_topic(self, event) -> None:
log.debug("chat_topic %s.", event)
room = event.values["mucroom"]
topic = event.values["subject"]
if topic == "":
topic = None
self._room_topics[room] = topic
room = XMPPRoom(event.values["mucroom"], self)
self.callback_room_topic(room)
[docs]
def user_changed_status(self, event) -> None:
log.debug("user_changed_status %s.", event)
errstatus = XMPP_TO_ERR_STATUS.get(event["type"], None)
message = event["status"]
if not errstatus:
errstatus = event["type"]
self.callback_presence(
Presence(
identifier=self._idd_from_event(event),
status=errstatus,
message=message,
)
)
[docs]
def connected(self, data) -> None:
"""Callback for connection events"""
self.connect_callback()
[docs]
def disconnected(self, data) -> None:
"""Callback for disconnection events"""
self.disconnect_callback()
[docs]
def send_message(self, msg: Message) -> None:
super().send_message(msg)
log.debug("send_message to %s", msg.to)
# We need to unescape the unicode characters (not the markup incompatible ones)
mhtml = (
xhtmlim.unescape(self.md_xhtml.convert(msg.body)) if self.xhtmlim else None
)
self.conn.client.send_message(
mto=str(msg.to),
mbody=self.md_text.convert(msg.body),
mhtml=mhtml,
mtype="chat" if msg.is_direct else "groupchat",
)
[docs]
def change_presence(self, status: str = ONLINE, message: str = "") -> None:
log.debug("Change bot status to %s, message %s.", status, message)
self.conn.client.send_presence(pshow=status, pstatus=message)
[docs]
def serve_forever(self) -> None:
self.conn.connect()
try:
self.conn.serve_forever()
finally:
log.debug("Trigger disconnect callback")
self.disconnect_callback()
log.debug("Trigger shutdown")
self.shutdown()
[docs]
@lru_cache(IDENTIFIERS_LRU)
def build_identifier(
self, txtrep: str
) -> Union[XMPPRoomOccupant, XMPPRoom, XMPPPerson]:
log.debug("build identifier for %s", txtrep)
try:
xep0030 = self.conn.client.plugin["xep_0030"]
info = xep0030.get_info(jid=txtrep)
disco_info = info["disco_info"]
if disco_info:
for category, typ, _, name in disco_info["identities"]:
if category == "conference":
log.debug("This is a room ! %s", txtrep)
return self.query_room(txtrep)
if (
category == "client"
and "http://jabber.org/protocol/muc"
in info["disco_info"]["features"]
):
log.debug("This is room occupant ! %s", txtrep)
return self._build_room_occupant(txtrep)
except IqError as iq:
log.debug("xep_0030 is probably not implemented on this server. %s.", iq)
log.debug("This is a person ! %s", txtrep)
return self._build_person(txtrep)
[docs]
def build_reply(
self,
msg: Message,
text: str = None,
private: bool = False,
threaded: bool = False,
) -> Message:
response = self.build_message(text)
response.frm = self.bot_identifier
if msg.is_group and not private:
# stripped returns the full bot@conference.domain.tld/chat_username
# but in case of a groupchat, we should only try to send to the MUC address
# itself (bot@conference.domain.tld)
response.to = XMPPRoom(msg.frm.node + "@" + msg.frm.domain, self)
elif msg.is_direct:
# preserve from in case of a simple chat message.
# it is either a user to user or user_in_chatroom to user case.
# so we need resource.
response.to = msg.frm
elif (
hasattr(msg.to, "person")
and msg.to.person == self.bot_config.BOT_IDENTITY["username"]
):
# This is a direct private message, not initiated through a MUC. Use
# stripped to remove the resource so that the response goes to the
# client with the highest priority
response.to = XMPPPerson(msg.frm.node, msg.frm.domain, None)
else:
# This is a private message that was initiated through a MUC. Don't use
# stripped here to retain the resource, else the XMPP server doesn't
# know which user we're actually responding to.
response.to = msg.frm
return response
@property
def mode(self):
return "xmpp"
[docs]
def rooms(self) -> List[XMPPRoom]:
"""
Return a list of rooms the bot is currently in.
:returns:
A list of :class:`~errbot.backends.base.XMPPMUCRoom` instances.
"""
xep0045 = self.conn.client.plugin["xep_0045"]
return [XMPPRoom(room, self) for room in xep0045.get_joined_rooms()]
[docs]
def query_room(self, room) -> XMPPRoom:
"""
Query a room for information.
:param room:
The JID/identifier of the room to query for.
:returns:
An instance of :class:`~XMPPMUCRoom`.
"""
return XMPPRoom(room, self)
[docs]
def prefix_groupchat_reply(self, message: Message, identifier: Identifier):
super().prefix_groupchat_reply(message, identifier)
message.body = f"@{identifier.nick} {message.body}"
def __hash__(self):
return 0