kombu.pidbox¶
Generic process mailbox.
Introduction¶
Creating the applications Mailbox¶
>>> mailbox = pidbox.Mailbox("celerybeat", type="direct")
>>> @mailbox.handler
>>> def reload_schedule(state, **kwargs):
... state["beat"].reload_schedule()
>>> @mailbox.handler
>>> def connection_info(state, **kwargs):
... return {"connection": state["connection"].info()}
Example Node¶
>>> connection = kombu.Connection()
>>> state = {"beat": beat,
"connection": connection}
>>> consumer = mailbox(connection).Node(hostname).listen()
>>> try:
... while True:
... connection.drain_events(timeout=1)
... finally:
... consumer.cancel()
Example Client¶
>>> mailbox.cast("reload_schedule") # cast is async.
>>> info = celerybeat.call("connection_info", timeout=1)
Mailbox¶
-
class
kombu.pidbox.
Mailbox
(namespace, type='direct', connection=None, clock=None, accept=None, serializer=None)[source]¶ -
namespace
= None¶ Name of application.
-
connection
= None¶ Connection (if bound).
-
type
= 'direct'¶ Exchange type (usually direct, or fanout for broadcast).
-
exchange
= None¶ mailbox exchange (init by constructor).
-
reply_exchange
= None¶ exchange to send replies to.
-
Node¶
-
class
kombu.pidbox.
Node
(hostname, state=None, channel=None, handlers=None, mailbox=None)[source]¶ -
hostname
= None¶ hostname of the node.
-
handlers
= None¶ map of method name/handlers.
-
state
= None¶ current context (passed on to handlers)
-
channel
= None¶ current channel.
-
dispatch_from_message
(body, message=None)¶
-