Source code for fedflow.core.message

"""
Process communication
=====================

classes in this source file are used for communication among processes, user should not use them directly. the start and
stop action of listener should only be called in fedflow framework.
"""

__all__ = [
    "Handler",
    "Message",
    "MessageListener"
]

import abc
import logging
import threading
import uuid
import multiprocessing
from collections import namedtuple


Message = namedtuple("Message", ["source", "cmd", "data"])
Message.__doc__ = "The message data structure communication among processes."
Message.source.__doc__ = "where message from"
Message.cmd.__doc__ = "the command of this message"
Message.data.__doc__ = "the payload data of this message"


[docs]class Handler(object): """ The basic class of message handler """
[docs] @abc.abstractmethod def handle(self, source: str, cmd: str, data: dict) -> None: """ handle message from other process. :param source: where message from, generally, it is a uuid string. :param cmd: command, it represents the action to be performed. :param data: the payload data of message. :return: """ pass
class SystemHandler(Handler): """ The self-message handler used by MessageListener """ def handle(self, source: str, cmd: str, data: dict): # Nothing to do pass class DefaultHandler(Handler): """ The default handler, it is only used to avoid null pointer exception. """ def handle(self, source: str, cmd: str, data: dict) -> None: # Nothing to do logging.getLogger("fedflow.msglistener").warning("No default handler.")
[docs]class MessageListener(object): logger = logging.getLogger("fedflow.msglistener") # uuid source __source = uuid.uuid4() # the handler for self-message __system_handler = SystemHandler() # the default handler for message which has no specify handler __default_handler = DefaultHandler() # handlers for specify source __handlers = {} # the message queue for all processes __mq = multiprocessing.Queue()
[docs] @classmethod def start(cls) -> None: """ start listen message :return: """ t = threading.Thread(target=cls.run) t.start()
[docs] @classmethod def run(cls) -> None: while True: msg: Message = cls.__mq.get() cls.logger.debug("receive message{source: %s, cmd: %s}", msg.source, msg.cmd) if msg.source == cls.__source: # the message from MessageListener if msg.cmd == "STOP": # stop listen cls.logger.info("receive STOP signal.") break else: # handle message from MessageListener(main process) cls.__system_handler.handle(msg.source, msg.cmd, msg.data) continue handler = cls.__handlers.get(msg.source) if handler is None: handler = cls.__default_handler try: handler.handle(msg.source, msg.cmd, msg.data) except Exception as e: cls.logger.error("An error occurred while handling message.", exc_info=True, stack_info=True)
[docs] @classmethod def register_handler(cls, source: str, handler: Handler, overwrite=False) -> None: """ register handler for specify source. :param source: every handler need a source, and the source cannot be same to the source of MessageListener :param handler: an instance of subclass of Handler :param overwrite: whether overwrite handler if it exists :return: """ if source == cls.__source: cls.logger.error("cannot register system handler which source is %s", source) return if overwrite or source not in cls.__handlers: cls.logger.info("register handler for %s", source) cls.__handlers[source] = handler else: cls.logger.warning("handler for %s exists.", source)
[docs] @classmethod def register_default_handler(cls, default_handler): """ register default handler and the action will overwrite previous default handler. :param default_handler: an instance of subclass of Handler, it will handle all message which has no specify handler. In init, the default handler will do nothing. :return: """ if default_handler is None: cls.logger.warning("default handler cannot be None.") return cls.logger.info("update default handler.") cls.__default_handler = default_handler
[docs] @classmethod def stop(cls): """ stop listening. :return: """ cls.logger.info("attempt stop.") # send stop message to self msg = Message(cmd="STOP", source=cls.__source, data={}) cls.__mq.put(msg)
[docs] @classmethod def mq(cls) -> multiprocessing.Queue: """ Get message queue :return: """ return cls.__mq