Merge "Sync RPC module from Oslo"

commit e3986285d2
cinder/openstack/common/excutils.py

@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation.
 # Copyright 2012, Red Hat, Inc.
 #
@@ -19,16 +17,17 @@
 Exception related utilities.
 """

-import contextlib
 import logging
 import sys
+import time
 import traceback

+import six
+
 from cinder.openstack.common.gettextutils import _


-@contextlib.contextmanager
-def save_and_reraise_exception():
+class save_and_reraise_exception(object):
     """Save current exception, run some code and then re-raise.

     In some cases the exception context can be cleared, resulting in None
@@ -40,12 +39,61 @@ def save_and_reraise_exception():
     To work around this, we save the exception state, run handler code, and
     then re-raise the original exception. If another exception occurs, the
     saved exception is logged and the new exception is re-raised.
+
+    In some cases the caller may not want to re-raise the exception, and
+    for those circumstances this context provides a reraise flag that
+    can be used to suppress the exception. For example::
+
+        except Exception:
+            with save_and_reraise_exception() as ctxt:
+                decide_if_need_reraise()
+                if not should_be_reraised:
+                    ctxt.reraise = False
     """
-    type_, value, tb = sys.exc_info()
-    try:
-        yield
-    except Exception:
-        logging.error(_('Original exception being dropped: %s'),
-                      traceback.format_exception(type_, value, tb))
-        raise
-    raise type_, value, tb
+    def __init__(self):
+        self.reraise = True
+
+    def __enter__(self):
+        self.type_, self.value, self.tb, = sys.exc_info()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is not None:
+            logging.error(_('Original exception being dropped: %s'),
+                          traceback.format_exception(self.type_,
+                                                     self.value,
+                                                     self.tb))
+            return False
+        if self.reraise:
+            six.reraise(self.type_, self.value, self.tb)
+
+
+def forever_retry_uncaught_exceptions(infunc):
+    def inner_func(*args, **kwargs):
+        last_log_time = 0
+        last_exc_message = None
+        exc_count = 0
+        while True:
+            try:
+                return infunc(*args, **kwargs)
+            except Exception as exc:
+                this_exc_message = six.u(str(exc))
+                if this_exc_message == last_exc_message:
+                    exc_count += 1
+                else:
+                    exc_count = 1
+                # Do not log any more frequently than once a minute unless
+                # the exception message changes
+                cur_time = int(time.time())
+                if (cur_time - last_log_time > 60 or
+                        this_exc_message != last_exc_message):
+                    logging.exception(
+                        _('Unexpected exception occurred %d time(s)... '
+                          'retrying.') % exc_count)
+                    last_log_time = cur_time
+                    last_exc_message = this_exc_message
+                    exc_count = 0
+                # This should be a very rare event. In case it isn't, do
+                # a sleep.
+                time.sleep(1)
+    return inner_func
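The excutils rewrite above (generator-based context manager to a class) is what introduces the reraise flag. A minimal usage sketch, assuming a hypothetical resource object and methods:

    from cinder.openstack.common import excutils

    def activate(resource):
        try:
            resource.activate()
        except Exception:
            with excutils.save_and_reraise_exception() as ctxt:
                resource.cleanup()            # runs with the original exception saved
                if resource.error_is_benign:  # hypothetical condition
                    ctxt.reraise = False      # suppress instead of re-raising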
cinder/openstack/common/rpc/__init__.py

@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -26,13 +24,13 @@ For some wrappers that add message versioning to rpc, see:
 """

 import inspect
-import logging

 from oslo.config import cfg

 from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common import importutils
 from cinder.openstack.common import local
+from cinder.openstack.common import log as logging


 LOG = logging.getLogger(__name__)
@@ -61,7 +59,7 @@ rpc_opts = [
                          'exceptions',
                          ],
                 help='Modules of exceptions that are permitted to be recreated'
-                     'upon receiving exception data from an rpc call.'),
+                     ' upon receiving exception data from an rpc call.'),
     cfg.BoolOpt('fake_rabbit',
                 default=False,
                 help='If passed, use a fake RabbitMQ provider'),
@@ -227,7 +225,7 @@ def notify(context, topic, msg, envelope=False):


 def cleanup():
-    """Clean up resoruces in use by implementation.
+    """Clean up resources in use by implementation.

     Clean up any resources that have been allocated by the RPC implementation.
     This is typically open connections to a messaging service. This function
@@ -286,7 +284,7 @@ def queue_get_for(context, topic, host):
     Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
     <host>.
     """
-    return '%s:%s' % (topic, host) if host else topic
+    return '%s.%s' % (topic, host) if host else topic


 _RPCIMPL = None
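The queue_get_for() fix makes the returned queue name match the 'foo.<host>' form promised by its docstring. With made-up arguments:

    queue_get_for(ctxt, 'cinder-volume', 'node1')  # now 'cinder-volume.node1'
                                                   # previously 'cinder-volume:node1'
    queue_get_for(ctxt, 'cinder-volume', None)     # still just 'cinder-volume'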
cinder/openstack/common/rpc/amqp.py

@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -20,9 +18,9 @@
 """
 Shared code between AMQP based openstack.common.rpc implementations.

-The code in this module is shared between the rpc implemenations based on AMQP.
-Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
-AMQP, but is deprecated and predates this code.
+The code in this module is shared between the rpc implementations based on
+AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
+uses AMQP, but is deprecated and predates this code.
 """

 import collections
@@ -34,10 +32,9 @@ from eventlet import greenpool
 from eventlet import pools
 from eventlet import queue
 from eventlet import semaphore
-# TODO(pekowsk): Remove import cfg and below comment in Havana.
-# This import should no longer be needed when the amqp_rpc_single_reply_queue
-# option is removed.
 from oslo.config import cfg
+import six

 from cinder.openstack.common import excutils
 from cinder.openstack.common.gettextutils import _
@@ -47,11 +44,6 @@ from cinder.openstack.common.rpc import common as rpc_common


 amqp_opts = [
-    # TODO(pekowski): Remove this option in Havana.
-    cfg.BoolOpt('amqp_rpc_single_reply_queue',
-                default=False,
-                help='Enable a fast single reply queue if using AMQP based '
-                     'RPC like RabbitMQ or Qpid.'),
     cfg.BoolOpt('amqp_durable_queues',
                 default=False,
                 deprecated_name='rabbit_durable_queues',
@@ -91,7 +83,7 @@ class Pool(pools.Pool):
        # is the above "while loop" gets all the cached connections from the
        # pool and closes them, but never returns them to the pool, a pool
        # leak. The unit tests hang waiting for an item to be returned to the
-       # pool. The unit tests get here via the teatDown() method. In the run
+       # pool. The unit tests get here via the tearDown() method. In the run
        # time code, it gets here via cleanup() and only appears in service.py
        # just before doing a sys.exit(), so cleanup() only happens once and
        # the leakage is not a problem.
@@ -110,19 +102,19 @@ def get_connection_pool(conf, connection_cls):


 class ConnectionContext(rpc_common.Connection):
-    """The class that is actually returned to the caller of
-    create_connection(). This is essentially a wrapper around
-    Connection that supports 'with'. It can also return a new
-    Connection, or one from a pool. The function will also catch
-    when an instance of this class is to be deleted. With that
-    we can return Connections to the pool on exceptions and so
-    forth without making the caller be responsible for catching
-    them. If possible the function makes sure to return a
-    connection to the pool.
+    """The class that is actually returned to the create_connection() caller.
+
+    This is essentially a wrapper around Connection that supports 'with'.
+    It can also return a new Connection, or one from a pool.
+
+    The function will also catch when an instance of this class is to be
+    deleted. With that we can return Connections to the pool on exceptions
+    and so forth without making the caller be responsible for catching them.
+    If possible the function makes sure to return a connection to the pool.
     """

     def __init__(self, conf, connection_pool, pooled=True, server_params=None):
-        """Create a new connection, or get one from the pool"""
+        """Create a new connection, or get one from the pool."""
         self.connection = None
         self.conf = conf
         self.connection_pool = connection_pool
@@ -135,7 +127,7 @@ class ConnectionContext(rpc_common.Connection):
         self.pooled = pooled

     def __enter__(self):
-        """When with ConnectionContext() is used, return self"""
+        """When with ConnectionContext() is used, return self."""
         return self

     def _done(self):
@@ -173,17 +165,19 @@ class ConnectionContext(rpc_common.Connection):
     def create_worker(self, topic, proxy, pool_name):
         self.connection.create_worker(topic, proxy, pool_name)

-    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+                           ack_on_error=True):
         self.connection.join_consumer_pool(callback,
                                            pool_name,
                                            topic,
-                                           exchange_name)
+                                           exchange_name,
+                                           ack_on_error)

     def consume_in_thread(self):
         self.connection.consume_in_thread()

     def __getattr__(self, key):
-        """Proxy all other calls to the Connection instance"""
+        """Proxy all other calls to the Connection instance."""
         if self.connection:
             return getattr(self.connection, key)
         else:
@@ -191,11 +185,11 @@ class ConnectionContext(rpc_common.Connection):


 class ReplyProxy(ConnectionContext):
-    """ Connection class for RPC replies / callbacks """
+    """Connection class for RPC replies / callbacks."""
     def __init__(self, conf, connection_pool):
         self._call_waiters = {}
         self._num_call_waiters = 0
-        self._num_call_waiters_wrn_threshhold = 10
+        self._num_call_waiters_wrn_threshold = 10
         self._reply_q = 'reply_' + uuid.uuid4().hex
         super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
         self.declare_direct_consumer(self._reply_q, self._process_data)
@@ -205,18 +199,20 @@ class ReplyProxy(ConnectionContext):
         msg_id = message_data.pop('_msg_id', None)
         waiter = self._call_waiters.get(msg_id)
         if not waiter:
-            LOG.warn(_('no calling threads waiting for msg_id : %s'
-                       ', message : %s') % (msg_id, message_data))
+            LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
+                       ', message : %(data)s'), {'msg_id': msg_id,
+                                                 'data': message_data})
+            LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
         else:
             waiter.put(message_data)

     def add_call_waiter(self, waiter, msg_id):
         self._num_call_waiters += 1
-        if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
             LOG.warn(_('Number of call waiters is greater than warning '
-                       'threshhold: %d. There could be a MulticallProxyWaiter '
-                       'leak.') % self._num_call_waiters_wrn_threshhold)
-            self._num_call_waiters_wrn_threshhold *= 2
+                       'threshold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshold)
+            self._num_call_waiters_wrn_threshold *= 2
         self._call_waiters[msg_id] = waiter

     def del_call_waiter(self, msg_id):
@@ -239,18 +235,13 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
             failure = rpc_common.serialize_remote_exception(failure,
                                                             log_failure)

-        try:
-            msg = {'result': reply, 'failure': failure}
-        except TypeError:
-            msg = {'result': dict((k, repr(v))
-                   for k, v in reply.__dict__.iteritems()),
-                   'failure': failure}
+        msg = {'result': reply, 'failure': failure}
         if ending:
             msg['ending'] = True
         _add_unique_id(msg)
         # If a reply_q exists, add the msg_id to the reply and pass the
         # reply_q to direct_send() to use it as the response queue.
-        # Otherwise use the msg_id for backward compatibilty.
+        # Otherwise use the msg_id for backward compatibility.
         if reply_q:
             msg['_msg_id'] = msg_id
             conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
@@ -259,7 +250,7 @@


 class RpcContext(rpc_common.CommonRpcContext):
-    """Context that supports replying to a rpc.call"""
+    """Context that supports replying to a rpc.call."""
     def __init__(self, **kwargs):
         self.msg_id = kwargs.pop('msg_id', None)
         self.reply_q = kwargs.pop('reply_q', None)
@@ -309,8 +300,14 @@ def pack_context(msg, context):
     for args at some point.

     """
-    context_d = dict([('_context_%s' % key, value)
-                      for (key, value) in context.to_dict().iteritems()])
+    if isinstance(context, dict):
+        context_d = dict([('_context_%s' % key, value)
+                          for (key, value) in six.iteritems(context)])
+    else:
+        context_d = dict([('_context_%s' % key, value)
+                          for (key, value) in
+                          six.iteritems(context.to_dict())])
+
     msg.update(context_d)


@@ -346,8 +343,9 @@ def _add_unique_id(msg):


 class _ThreadPoolWithWait(object):
-    """Base class for a delayed invocation manager used by
-    the Connection class to start up green threads
+    """Base class for a delayed invocation manager.
+
+    Used by the Connection class to start up green threads
     to handle incoming messages.
     """

@@ -362,25 +360,48 @@ class _ThreadPoolWithWait(object):


 class CallbackWrapper(_ThreadPoolWithWait):
-    """Wraps a straight callback to allow it to be invoked in a green
-    thread.
+    """Wraps a straight callback.
+
+    Allows it to be invoked in a green thread.
     """

-    def __init__(self, conf, callback, connection_pool):
-        """
+    def __init__(self, conf, callback, connection_pool,
+                 wait_for_consumers=False):
+        """Initiates CallbackWrapper object.
+
         :param conf: cfg.CONF instance
         :param callback: a callable (probably a function)
         :param connection_pool: connection pool as returned by
                                 get_connection_pool()
+        :param wait_for_consumers: wait for all green threads to
+                                   complete and raise the last
+                                   caught exception, if any.
+
         """
         super(CallbackWrapper, self).__init__(
             conf=conf,
             connection_pool=connection_pool,
         )
         self.callback = callback
+        self.wait_for_consumers = wait_for_consumers
+        self.exc_info = None
+
+    def _wrap(self, message_data, **kwargs):
+        """Wrap the callback invocation to catch exceptions.
+        """
+        try:
+            self.callback(message_data, **kwargs)
+        except Exception:
+            self.exc_info = sys.exc_info()

     def __call__(self, message_data):
-        self.pool.spawn_n(self.callback, message_data)
+        self.exc_info = None
+        self.pool.spawn_n(self._wrap, message_data)
+
+        if self.wait_for_consumers:
+            self.pool.waitall()
+            if self.exc_info:
+                six.reraise(self.exc_info[1], None, self.exc_info[2])


 class ProxyCallback(_ThreadPoolWithWait):
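The reworked CallbackWrapper above is what lets join_consumer_pool() surface consumer errors: constructed with wait_for_consumers=True, __call__() blocks on the green thread pool and re-raises whatever _wrap() captured. A sketch with an assumed callback and pool:

    wrapper = CallbackWrapper(conf=cfg.CONF,
                              callback=handle_notification,   # hypothetical callable
                              connection_pool=connection_pool,
                              wait_for_consumers=True)
    try:
        wrapper(message_data)   # spawns _wrap(), waitall(), may re-raise
    except Exception:
        pass                    # caller can now decide to requeue the message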
@@ -498,7 +519,7 @@ class MulticallProxyWaiter(object):
         return result

     def __iter__(self):
-        """Return a result until we get a reply with an 'ending" flag"""
+        """Return a result until we get a reply with an 'ending' flag."""
         if self._done:
             raise StopIteration
         while True:
@@ -520,61 +541,8 @@ class MulticallProxyWaiter(object):
             yield result


-#TODO(pekowski): Remove MulticallWaiter() in Havana.
-class MulticallWaiter(object):
-    def __init__(self, conf, connection, timeout):
-        self._connection = connection
-        self._iterator = connection.iterconsume(timeout=timeout or
-                                                conf.rpc_response_timeout)
-        self._result = None
-        self._done = False
-        self._got_ending = False
-        self._conf = conf
-        self.msg_id_cache = _MsgIdCache()
-
-    def done(self):
-        if self._done:
-            return
-        self._done = True
-        self._iterator.close()
-        self._iterator = None
-        self._connection.close()
-
-    def __call__(self, data):
-        """The consume() callback will call this. Store the result."""
-        self.msg_id_cache.check_duplicate_message(data)
-        if data['failure']:
-            failure = data['failure']
-            self._result = rpc_common.deserialize_remote_exception(self._conf,
-                                                                   failure)
-
-        elif data.get('ending', False):
-            self._got_ending = True
-        else:
-            self._result = data['result']
-
-    def __iter__(self):
-        """Return a result until we get a 'None' response from consumer"""
-        if self._done:
-            raise StopIteration
-        while True:
-            try:
-                self._iterator.next()
-            except Exception:
-                with excutils.save_and_reraise_exception():
-                    self.done()
-            if self._got_ending:
-                self.done()
-                raise StopIteration
-            result = self._result
-            if isinstance(result, Exception):
-                self.done()
-                raise result
-            yield result
-
-
 def create_connection(conf, new, connection_pool):
-    """Create a connection"""
+    """Create a connection."""
     return ConnectionContext(conf, connection_pool, pooled=not new)


@@ -583,14 +551,6 @@ _reply_proxy_create_sem = semaphore.Semaphore()

 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
-    # TODO(pekowski): Remove all these comments in Havana.
-    # For amqp_rpc_single_reply_queue = False,
-    # Can't use 'with' for multicall, as it returns an iterator
-    # that will continue to use the connection. When it's done,
-    # connection.close() will get called which will put it back into
-    # the pool
-    # For amqp_rpc_single_reply_queue = True,
-    # The 'with' statement is mandatory for closing the connection
     LOG.debug(_('Making synchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
@@ -598,21 +558,13 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     _add_unique_id(msg)
     pack_context(msg, context)

-    # TODO(pekowski): Remove this flag and the code under the if clause
-    # in Havana.
-    if not conf.amqp_rpc_single_reply_queue:
-        conn = ConnectionContext(conf, connection_pool)
-        wait_msg = MulticallWaiter(conf, conn, timeout)
-        conn.declare_direct_consumer(msg_id, wait_msg)
-        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
-    else:
-        with _reply_proxy_create_sem:
-            if not connection_pool.reply_proxy:
-                connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
-        msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
-        wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
-        with ConnectionContext(conf, connection_pool) as conn:
-            conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    with _reply_proxy_create_sem:
+        if not connection_pool.reply_proxy:
+            connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+    msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+    wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+    with ConnectionContext(conf, connection_pool) as conn:
+        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
     return wait_msg
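With the single-reply-queue path now unconditional, every outgoing call carries both identifiers and ReplyProxy._process_data() routes the response by popping '_msg_id'. Roughly what a sent message looks like (field values invented for illustration):

    {'method': 'create_volume',
     'args': {...},
     '_msg_id': '5c591882...',             # per-call; keys the waiter in ReplyProxy
     '_reply_q': 'reply_9774145b...',      # shared direct queue for all replies
     '_unique_id': 'd3ee18ab...'}          # added by _add_unique_id() for duplicate detection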
cinder/openstack/common/rpc/common.py

@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2010 United States Government as represented by the
 # Administrator of the National Aeronautics and Space Administration.
 # All Rights Reserved.
@@ -22,18 +20,21 @@ import sys
 import traceback

 from oslo.config import cfg
+import six

 from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common import importutils
 from cinder.openstack.common import jsonutils
 from cinder.openstack.common import local
 from cinder.openstack.common import log as logging
+from cinder.openstack.common import versionutils


 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)


+_RPC_ENVELOPE_VERSION = '2.0'
 '''RPC Envelope Version.

 This version number applies to the top level structure of messages sent out.
@@ -46,7 +47,7 @@ This version number applies to the message envelope that is used in the
 serialization done inside the rpc layer. See serialize_msg() and
 deserialize_msg().

-The current message format (version 2.0) is very simple. It is:
+The current message format (version 2.0) is very simple. It is::

     {
         'oslo.version': <RPC Envelope Version as a String>,
@@ -64,30 +65,31 @@ We will JSON encode the application message payload. The message envelope,
 which includes the JSON encoded application message body, will be passed down
 to the messaging libraries as a dict.
 '''
-_RPC_ENVELOPE_VERSION = '2.0'

 _VERSION_KEY = 'oslo.version'
 _MESSAGE_KEY = 'oslo.message'

+_REMOTE_POSTFIX = '_Remote'
+

 class RPCException(Exception):
-    message = _("An unknown RPC related exception occurred.")
+    msg_fmt = _("An unknown RPC related exception occurred.")

     def __init__(self, message=None, **kwargs):
         self.kwargs = kwargs

         if not message:
             try:
-                message = self.message % kwargs
+                message = self.msg_fmt % kwargs

             except Exception:
                 # kwargs doesn't match a variable in the message
                 # log the issue and the kwargs
                 LOG.exception(_('Exception in string format operation'))
-                for name, value in kwargs.iteritems():
+                for name, value in six.iteritems(kwargs):
                     LOG.error("%s: %s" % (name, value))
                 # at least get the core message out if something happened
-                message = self.message
+                message = self.msg_fmt

         super(RPCException, self).__init__(message)

@@ -101,7 +103,7 @@ class RemoteError(RPCException):
     contains all of the relevant info.

     """
-    message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+    msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")

     def __init__(self, exc_type=None, value=None, traceback=None):
         self.exc_type = exc_type
@@ -118,12 +120,13 @@ class Timeout(RPCException):
     This exception is raised if the rpc_response_timeout is reached while
     waiting for a response from the remote side.
     """
-    message = _('Timeout while waiting on RPC response - '
+    msg_fmt = _('Timeout while waiting on RPC response - '
                 'topic: "%(topic)s", RPC method: "%(method)s" '
                 'info: "%(info)s"')

     def __init__(self, info=None, topic=None, method=None):
-        """
+        """Initiates Timeout object.
+
        :param info: Extra info to convey to the user
        :param topic: The topic that the rpc call was sent to
        :param rpc_method_name: The name of the rpc method being
@@ -140,23 +143,27 @@ class Timeout(RPCException):


 class DuplicateMessageError(RPCException):
-    message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+    msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")


 class InvalidRPCConnectionReuse(RPCException):
-    message = _("Invalid reuse of an RPC connection.")
+    msg_fmt = _("Invalid reuse of an RPC connection.")


 class UnsupportedRpcVersion(RPCException):
-    message = _("Specified RPC version, %(version)s, not supported by "
+    msg_fmt = _("Specified RPC version, %(version)s, not supported by "
                 "this endpoint.")


 class UnsupportedRpcEnvelopeVersion(RPCException):
-    message = _("Specified RPC envelope version, %(version)s, "
+    msg_fmt = _("Specified RPC envelope version, %(version)s, "
                 "not supported by this endpoint.")


+class RpcVersionCapError(RPCException):
+    msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
+
+
 class Connection(object):
     """A connection, returned by rpc.create_connection().

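The message -> msg_fmt rename above keeps the interpolation behavior while avoiding the deprecated Exception.message attribute. Defining and raising a subclass works as before (class name and kwargs here are hypothetical):

    class VolumeUnavailable(RPCException):
        msg_fmt = _("Volume %(volume_id)s is unavailable.")

    raise VolumeUnavailable(volume_id='vol-42')
    # -> "Volume vol-42 is unavailable." via RPCException.__init__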
@@ -216,9 +223,9 @@ class Connection(object):
         raise NotImplementedError()

     def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
-        """Register as a member of a group of consumers for a given topic from
-        the specified exchange.
+        """Register as a member of a group of consumers.
+
+        Uses given topic from the specified exchange.
         Exactly one member of a given pool will receive each message.

         A message will be delivered to multiple pools, if more than
@@ -253,41 +260,24 @@ class Connection(object):

 def _safe_log(log_func, msg, msg_data):
     """Sanitizes the msg_data field before logging."""
-    SANITIZE = {'set_admin_password': [('args', 'new_pass')],
-                'run_instance': [('args', 'admin_password')],
-                'route_message': [('args', 'message', 'args', 'method_info',
-                                   'method_kwargs', 'password'),
-                                  ('args', 'message', 'args', 'method_info',
-                                   'method_kwargs', 'admin_password')]}
+    SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']

-    has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
-    has_context_token = '_context_auth_token' in msg_data
-    has_token = 'auth_token' in msg_data
+    def _fix_passwords(d):
+        """Sanitizes the password fields in the dictionary."""
+        for k in six.iterkeys(d):
+            if k.lower().find('password') != -1:
+                d[k] = '<SANITIZED>'
+            elif k.lower() in SANITIZE:
+                d[k] = '<SANITIZED>'
+            elif isinstance(d[k], list):
+                for e in d[k]:
+                    if isinstance(e, dict):
+                        _fix_passwords(e)
+            elif isinstance(d[k], dict):
+                _fix_passwords(d[k])
+        return d

-    if not any([has_method, has_context_token, has_token]):
-        return log_func(msg, msg_data)
-
-    msg_data = copy.deepcopy(msg_data)
-
-    if has_method:
-        for arg in SANITIZE.get(msg_data['method'], []):
-            try:
-                d = msg_data
-                for elem in arg[:-1]:
-                    d = d[elem]
-                d[arg[-1]] = '<SANITIZED>'
-            except KeyError, e:
-                LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
-                         {'item': arg,
-                          'err': e})
-
-    if has_context_token:
-        msg_data['_context_auth_token'] = '<SANITIZED>'
-
-    if has_token:
-        msg_data['auth_token'] = '<SANITIZED>'
-
-    return log_func(msg, msg_data)
+    return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))


 def serialize_remote_exception(failure_info, log_failure=True):
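The rewritten _safe_log() replaces the per-method sanitize table with a recursive key scan, masking any key that contains 'password' or appears in SANITIZE, at any nesting depth. Conceptually (the helper is a closure inside _safe_log, shown here as if standalone):

    payload = {'auth_token': 'secret',
               'args': [{'admin_password': 'hunter2', 'size': 1}]}
    _fix_passwords(copy.deepcopy(payload))
    # -> {'auth_token': '<SANITIZED>',
    #     'args': [{'admin_password': '<SANITIZED>', 'size': 1}]}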
@@ -299,17 +289,27 @@ def serialize_remote_exception(failure_info, log_failure=True):
     tb = traceback.format_exception(*failure_info)
     failure = failure_info[1]
     if log_failure:
-        LOG.error(_("Returning exception %s to caller"), unicode(failure))
+        LOG.error(_("Returning exception %s to caller"),
+                  six.text_type(failure))
         LOG.error(tb)

     kwargs = {}
     if hasattr(failure, 'kwargs'):
         kwargs = failure.kwargs

+    # NOTE(matiu): With cells, it's possible to re-raise remote, remote
+    # exceptions. Lets turn it back into the original exception type.
+    cls_name = str(failure.__class__.__name__)
+    mod_name = str(failure.__class__.__module__)
+    if (cls_name.endswith(_REMOTE_POSTFIX) and
+            mod_name.endswith(_REMOTE_POSTFIX)):
+        cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
+        mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
+
     data = {
-        'class': str(failure.__class__.__name__),
-        'module': str(failure.__class__.__module__),
-        'message': unicode(failure),
+        'class': cls_name,
+        'module': mod_name,
+        'message': six.text_type(failure),
         'tb': tb,
         'args': failure.args,
         'kwargs': kwargs
@@ -345,8 +345,9 @@ def deserialize_remote_exception(conf, data):

     ex_type = type(failure)
     str_override = lambda self: message
-    new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
+    new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
                        {'__str__': str_override, '__unicode__': str_override})
+    new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
     try:
         # NOTE(ameade): Dynamically create a new exception type and swap it in
         # as the new type for the exception. This only works on user defined
@@ -408,10 +409,11 @@ class CommonRpcContext(object):


 class ClientException(Exception):
-    """This encapsulates some actual exception that is expected to be
-    hit by an RPC proxy object. Merely instantiating it records the
-    current exception information, which will be passed back to the
-    RPC client without exceptional logging."""
+    """Encapsulates actual exception expected to be hit by a RPC proxy object.
+
+    Merely instantiating it records the current exception information, which
+    will be passed back to the RPC client without exceptional logging.
+    """
     def __init__(self):
         self._exc_info = sys.exc_info()

@@ -419,7 +421,7 @@ class ClientException(Exception):
 def catch_client_exception(exceptions, func, *args, **kwargs):
     try:
         return func(*args, **kwargs)
-    except Exception, e:
+    except Exception as e:
         if type(e) in exceptions:
             raise ClientException()
         else:
@@ -428,11 +430,13 @@ def catch_client_exception(exceptions, func, *args, **kwargs):

 def client_exceptions(*exceptions):
     """Decorator for manager methods that raise expected exceptions.
+
     Marking a Manager method with this decorator allows the declaration
     of expected exceptions that the RPC layer should not consider fatal,
     and not log as if they were generated in a real error scenario. Note
     that this will cause listed exceptions to be wrapped in a
-    ClientException, which is used internally by the RPC layer."""
+    ClientException, which is used internally by the RPC layer.
+    """
     def outer(func):
         def inner(*args, **kwargs):
             return catch_client_exception(exceptions, func, *args, **kwargs)
@@ -440,19 +444,15 @@ def client_exceptions(*exceptions):
     return outer


+# TODO(sirp): we should deprecate this in favor of
+# using `versionutils.is_compatible` directly
 def version_is_compatible(imp_version, version):
     """Determine whether versions are compatible.

     :param imp_version: The version implemented
     :param version: The version requested by an incoming message.
     """
-    version_parts = version.split('.')
-    imp_version_parts = imp_version.split('.')
-    if int(version_parts[0]) != int(imp_version_parts[0]):  # Major
-        return False
-    if int(version_parts[1]) > int(imp_version_parts[1]):  # Minor
-        return False
-    return True
+    return versionutils.is_compatible(version, imp_version)


 def serialize_msg(raw_msg):
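version_is_compatible() now delegates to versionutils.is_compatible() while preserving the old major/minor rule:

    version_is_compatible('1.3', '1.2')  # True: same major, requested minor <= implemented
    version_is_compatible('1.3', '1.4')  # False: requested minor too new
    version_is_compatible('2.0', '1.9')  # False: major version mismatch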
cinder/openstack/common/rpc/dispatcher.py

@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2012 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -83,7 +81,10 @@ On the client side, the same changes should be made as in example 1. The
 minimum version that supports the new parameter should be specified.
 """

+import six
+
 from cinder.openstack.common.rpc import common as rpc_common
+from cinder.openstack.common.rpc import serializer as rpc_serializer


 class RpcDispatcher(object):
@@ -93,16 +94,38 @@ class RpcDispatcher(object):
     contains a list of underlying managers that have an API_VERSION attribute.
     """

-    def __init__(self, callbacks):
+    def __init__(self, callbacks, serializer=None):
         """Initialize the rpc dispatcher.

         :param callbacks: List of proxy objects that are an instance
                           of a class with rpc methods exposed. Each proxy
                           object should have an RPC_API_VERSION attribute.
+        :param serializer: The Serializer object that will be used to
+                           deserialize arguments before the method call and
+                           to serialize the result after it returns.
         """
         self.callbacks = callbacks
+        if serializer is None:
+            serializer = rpc_serializer.NoOpSerializer()
+        self.serializer = serializer
         super(RpcDispatcher, self).__init__()

+    def _deserialize_args(self, context, kwargs):
+        """Helper method called to deserialize args before dispatch.
+
+        This calls our serializer on each argument, returning a new set of
+        args that have been deserialized.
+
+        :param context: The request context
+        :param kwargs: The arguments to be deserialized
+        :returns: A new set of deserialized args
+        """
+        new_kwargs = dict()
+        for argname, arg in six.iteritems(kwargs):
+            new_kwargs[argname] = self.serializer.deserialize_entity(context,
+                                                                     arg)
+        return new_kwargs
+
     def dispatch(self, ctxt, version, method, namespace, **kwargs):
         """Dispatch a message based on a requested version.

@@ -145,7 +168,9 @@ class RpcDispatcher(object):
             if not hasattr(proxyobj, method):
                 continue
             if is_compatible:
-                return getattr(proxyobj, method)(ctxt, **kwargs)
+                kwargs = self._deserialize_args(ctxt, kwargs)
+                result = getattr(proxyobj, method)(ctxt, **kwargs)
+                return self.serializer.serialize_entity(ctxt, result)

         if had_compatible:
             raise AttributeError("No such RPC function '%s'" % method)
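A sketch of how a service wires in the new serializer hook (the manager class and serializer instance are assumptions, not part of this commit):

    from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher

    class VolumeManager(object):
        RPC_API_VERSION = '1.0'

        def create(self, ctxt, volume):
            return volume

    disp = rpc_dispatcher.RpcDispatcher([VolumeManager()],
                                        serializer=my_serializer)  # assumed Serializer impl
    # dispatch() now deserializes each kwarg and serializes the result:
    # disp.dispatch(ctxt, '1.0', 'create', None, volume=primitive)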
cinder/openstack/common/rpc/impl_fake.py

@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -26,6 +24,7 @@ import json
 import time

 import eventlet
+import six

 from cinder.openstack.common.rpc import common as rpc_common

@@ -69,7 +68,7 @@ class Consumer(object):
             # Caller might have called ctxt.reply() manually
             for (reply, failure) in ctxt._response:
                 if failure:
-                    raise failure[0], failure[1], failure[2]
+                    six.reraise(failure[0], failure[1], failure[2])
                 res.append(reply)
             # if ending not 'sent'...we might have more data to
             # return from the function itself
@@ -122,7 +121,7 @@ class Connection(object):


 def create_connection(conf, new=True):
-    """Create a connection"""
+    """Create a connection."""
     return Connection()


@@ -146,7 +145,7 @@ def multicall(conf, context, topic, msg, timeout=None):
     try:
         consumer = CONSUMERS[topic][0]
     except (KeyError, IndexError):
-        return iter([None])
+        raise rpc_common.Timeout("No consumers available")
     else:
         return consumer.call(context, version, method, namespace, args,
                              timeout)
@@ -179,7 +178,7 @@ def cleanup():


 def fanout_cast(conf, context, topic, msg):
-    """Cast to all consumers of a topic"""
+    """Cast to all consumers of a topic."""
     check_serialize(msg)
     method = msg.get('method')
     if not method:
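The impl_fake multicall() change turns a silent None into an error, matching the real drivers' behavior when nothing consumes a topic. In effect (ctxt is an assumed context object):

    try:
        multicall(cfg.CONF, ctxt, 'no-such-topic', {'method': 'ping', 'args': {}})
    except rpc_common.Timeout:
        pass   # previously this returned iter([None]) and the caller saw None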
cinder/openstack/common/rpc/impl_kombu.py

@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
 # Copyright 2011 OpenStack Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -18,7 +16,6 @@ import functools
 import itertools
 import socket
 import ssl
-import sys
 import time
 import uuid

@@ -29,16 +26,22 @@ import kombu.connection
 import kombu.entity
 import kombu.messaging
 from oslo.config import cfg
+import six

+from cinder.openstack.common import excutils
 from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common import network_utils
 from cinder.openstack.common.rpc import amqp as rpc_amqp
 from cinder.openstack.common.rpc import common as rpc_common
+from cinder.openstack.common import sslutils

 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
                default='',
-               help='SSL version to use (valid only if SSL enabled)'),
+               help='SSL version to use (valid only if SSL enabled). '
+                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
+                    'be available on some distributions'
+               ),
     cfg.StrOpt('kombu_ssl_keyfile',
                default='',
                help='SSL key file (valid only if SSL enabled)'),
@@ -126,15 +129,40 @@ class ConsumerBase(object):
         self.tag = str(tag)
         self.kwargs = kwargs
         self.queue = None
+        self.ack_on_error = kwargs.get('ack_on_error', True)
         self.reconnect(channel)

     def reconnect(self, channel):
-        """Re-declare the queue after a rabbit reconnect"""
+        """Re-declare the queue after a rabbit reconnect."""
         self.channel = channel
         self.kwargs['channel'] = channel
         self.queue = kombu.entity.Queue(**self.kwargs)
         self.queue.declare()

+    def _callback_handler(self, message, callback):
+        """Call callback with deserialized message.
+
+        Messages that are processed without exception are ack'ed.
+
+        If the message processing generates an exception, it will be
+        ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
+        """
+
+        try:
+            msg = rpc_common.deserialize_msg(message.payload)
+            callback(msg)
+        except Exception:
+            if self.ack_on_error:
+                LOG.exception(_("Failed to process message"
+                                " ... skipping it."))
+                message.ack()
+            else:
+                LOG.exception(_("Failed to process message"
+                                " ... will requeue."))
+                message.requeue()
+        else:
+            message.ack()
+
     def consume(self, *args, **kwargs):
         """Actually declare the consumer on the amqp channel. This will
         start the flow of messages from the queue. Using the
@@ -147,8 +175,6 @@ class ConsumerBase(object):
         If kwargs['nowait'] is True, then this call will block until
         a message is read.

-        Messages will automatically be acked if the callback doesn't
-        raise an exception
         """

         options = {'consumer_tag': self.tag}
@@ -159,21 +185,15 @@ class ConsumerBase(object):

         def _callback(raw_message):
             message = self.channel.message_to_python(raw_message)
-            try:
-                msg = rpc_common.deserialize_msg(message.payload)
-                callback(msg)
-            except Exception:
-                LOG.exception(_("Failed to process message... skipping it."))
-            finally:
-                message.ack()
+            self._callback_handler(message, callback)

         self.queue.consume(*args, callback=_callback, **options)

     def cancel(self):
-        """Cancel the consuming from the queue, if it has started"""
+        """Cancel the consuming from the queue, if it has started."""
         try:
             self.queue.cancel(self.tag)
-        except KeyError, e:
+        except KeyError as e:
             # NOTE(comstud): Kludge to get around a amqplib bug
             if str(e) != "u'%s'" % self.tag:
                 raise
@@ -181,7 +201,7 @@ class ConsumerBase(object):


 class DirectConsumer(ConsumerBase):
-    """Queue/consumer class for 'direct'"""
+    """Queue/consumer class for 'direct'."""

     def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
         """Init a 'direct' queue.
@@ -213,7 +233,7 @@ class DirectConsumer(ConsumerBase):


 class TopicConsumer(ConsumerBase):
-    """Consumer class for 'topic'"""
+    """Consumer class for 'topic'."""

     def __init__(self, conf, channel, topic, callback, tag, name=None,
                  exchange_name=None, **kwargs):
@@ -250,7 +270,7 @@ class TopicConsumer(ConsumerBase):


 class FanoutConsumer(ConsumerBase):
-    """Consumer class for 'fanout'"""
+    """Consumer class for 'fanout'."""

     def __init__(self, conf, channel, topic, callback, tag, **kwargs):
         """Init a 'fanout' queue.
@@ -283,7 +303,7 @@ class FanoutConsumer(ConsumerBase):


 class Publisher(object):
-    """Base Publisher class"""
+    """Base Publisher class."""

     def __init__(self, channel, exchange_name, routing_key, **kwargs):
         """Init the Publisher class with the exchange_name, routing_key,
@@ -295,7 +315,7 @@ class Publisher(object):
         self.reconnect(channel)

     def reconnect(self, channel):
-        """Re-establish the Producer after a rabbit reconnection"""
+        """Re-establish the Producer after a rabbit reconnection."""
         self.exchange = kombu.entity.Exchange(name=self.exchange_name,
                                               **self.kwargs)
         self.producer = kombu.messaging.Producer(exchange=self.exchange,
@@ -303,7 +323,7 @@ class Publisher(object):
                                                  routing_key=self.routing_key)

     def send(self, msg, timeout=None):
-        """Send a message"""
+        """Send a message."""
         if timeout:
             #
             # AMQP TTL is in milliseconds when set in the header.
@@ -314,7 +334,7 @@ class Publisher(object):


 class DirectPublisher(Publisher):
-    """Publisher class for 'direct'"""
+    """Publisher class for 'direct'."""
     def __init__(self, conf, channel, msg_id, **kwargs):
         """init a 'direct' publisher.

@@ -330,7 +350,7 @@ class DirectPublisher(Publisher):


 class TopicPublisher(Publisher):
-    """Publisher class for 'topic'"""
+    """Publisher class for 'topic'."""
     def __init__(self, conf, channel, topic, **kwargs):
         """init a 'topic' publisher.

@@ -349,7 +369,7 @@ class TopicPublisher(Publisher):


 class FanoutPublisher(Publisher):
-    """Publisher class for 'fanout'"""
+    """Publisher class for 'fanout'."""
    def __init__(self, conf, channel, topic, **kwargs):
        """init a 'fanout' publisher.

@@ -364,7 +384,7 @@ class FanoutPublisher(Publisher):


 class NotifyPublisher(TopicPublisher):
-    """Publisher class for 'notify'"""
+    """Publisher class for 'notify'."""

     def __init__(self, conf, channel, topic, **kwargs):
         self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
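The new _callback_handler()/ack_on_error plumbing gives kombu consumers a requeue path. A consumer opting in would look roughly like this (topic and callback are invented):

    conn.declare_topic_consumer(topic='cinder-volume',
                                callback=handle_message,
                                ack_on_error=False)
    # a callback exception now triggers message.requeue() instead of message.ack()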
@@ -425,7 +445,7 @@ class Connection(object):
             'virtual_host': self.conf.rabbit_virtual_host,
         }

-        for sp_key, value in server_params.iteritems():
+        for sp_key, value in six.iteritems(server_params):
             p_key = server_params_to_kombu_params.get(sp_key, sp_key)
             params[p_key] = value

@@ -444,13 +464,15 @@ class Connection(object):
         self.reconnect()

     def _fetch_ssl_params(self):
-        """Handles fetching what ssl params
-        should be used for the connection (if any)"""
+        """Handles fetching what ssl params should be used for the connection
+        (if any).
+        """
         ssl_params = dict()

         # http://docs.python.org/library/ssl.html - ssl.wrap_socket
         if self.conf.kombu_ssl_version:
-            ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+            ssl_params['ssl_version'] = sslutils.validate_ssl_version(
+                self.conf.kombu_ssl_version)
         if self.conf.kombu_ssl_keyfile:
             ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
         if self.conf.kombu_ssl_certfile:
@@ -461,12 +483,8 @@ class Connection(object):
             # future with this?
             ssl_params['cert_reqs'] = ssl.CERT_REQUIRED

-        if not ssl_params:
-            # Just have the default behavior
-            return True
-        else:
-            # Return the extended behavior
-            return ssl_params
+        # Return the extended behavior or just have the default behavior
+        return ssl_params or True

     def _connect(self, params):
         """Connect to rabbit. Re-establish any queues that may have
@@ -517,7 +535,7 @@ class Connection(object):
                 return
             except (IOError, self.connection_errors) as e:
                 pass
-            except Exception, e:
+            except Exception as e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
                 # to return an error not covered by its transport
                 # connection_errors in the case of a timeout waiting for
@@ -533,13 +551,11 @@ class Connection(object):
             log_info.update(params)

             if self.max_retries and attempt == self.max_retries:
-                LOG.error(_('Unable to connect to AMQP server on '
-                            '%(hostname)s:%(port)d after %(max_retries)d '
-                            'tries: %(err_str)s') % log_info)
-                # NOTE(comstud): Copied from original code. There's
-                # really no better recourse because if this was a queue we
-                # need to consume on, we have no way to consume anymore.
-                sys.exit(1)
+                msg = _('Unable to connect to AMQP server on '
+                        '%(hostname)s:%(port)d after %(max_retries)d '
+                        'tries: %(err_str)s') % log_info
+                LOG.error(msg)
+                raise rpc_common.RPCException(msg)

             if attempt == 1:
                 sleep_time = self.interval_start or 1
@@ -558,10 +574,10 @@ class Connection(object):
         while True:
             try:
                 return method(*args, **kwargs)
-            except (self.connection_errors, socket.timeout, IOError), e:
+            except (self.connection_errors, socket.timeout, IOError) as e:
                 if error_callback:
                     error_callback(e)
-            except Exception, e:
+            except Exception as e:
                 # NOTE(comstud): Unfortunately it's possible for amqplib
                 # to return an error not covered by its transport
                 # connection_errors in the case of a timeout waiting for
@@ -575,18 +591,18 @@ class Connection(object):
             self.reconnect()

     def get_channel(self):
-        """Convenience call for bin/clear_rabbit_queues"""
+        """Convenience call for bin/clear_rabbit_queues."""
         return self.channel

     def close(self):
-        """Close/release this connection"""
+        """Close/release this connection."""
         self.cancel_consumer_thread()
         self.wait_on_proxy_callbacks()
         self.connection.release()
         self.connection = None

     def reset(self):
-        """Reset a connection so it can be used again"""
+        """Reset a connection so it can be used again."""
         self.cancel_consumer_thread()
         self.wait_on_proxy_callbacks()
         self.channel.close()
@@ -608,14 +624,14 @@ class Connection(object):

         def _declare_consumer():
             consumer = consumer_cls(self.conf, self.channel, topic, callback,
-                                    self.consumer_num.next())
+                                    six.next(self.consumer_num))
             self.consumers.append(consumer)
             return consumer

         return self.ensure(_connect_error, _declare_consumer)

     def iterconsume(self, limit=None, timeout=None):
-        """Return an iterator that will consume from all queues/consumers"""
+        """Return an iterator that will consume from all queues/consumers."""

         info = {'do_consume': True}

@@ -631,8 +647,8 @@ class Connection(object):

         def _consume():
             if info['do_consume']:
-                queues_head = self.consumers[:-1]
-                queues_tail = self.consumers[-1]
+                queues_head = self.consumers[:-1]  # not fanout.
+                queues_tail = self.consumers[-1]  # fanout
                 for queue in queues_head:
                     queue.consume(nowait=True)
                 queues_tail.consume(nowait=False)
@@ -645,7 +661,7 @@ class Connection(object):
             yield self.ensure(_error_callback, _consume)

     def cancel_consumer_thread(self):
-        """Cancel a consumer thread"""
+        """Cancel a consumer thread."""
         if self.consumer_thread is not None:
             self.consumer_thread.kill()
             try:
@@ -660,7 +676,7 @@ class Connection(object):
             proxy_cb.wait()

     def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
-        """Send to a publisher based on the publisher class"""
+        """Send to a publisher based on the publisher class."""

         def _error_callback(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
@@ -681,45 +697,47 @@ class Connection(object):
         self.declare_consumer(DirectConsumer, topic, callback)

     def declare_topic_consumer(self, topic, callback=None, queue_name=None,
-                               exchange_name=None):
+                               exchange_name=None, ack_on_error=True):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
                                                 exchange_name=exchange_name,
+                                                ack_on_error=ack_on_error,
                                                 ),
                               topic, callback)

     def declare_fanout_consumer(self, topic, callback):
-        """Create a 'fanout' consumer"""
+        """Create a 'fanout' consumer."""
         self.declare_consumer(FanoutConsumer, topic, callback)

     def direct_send(self, msg_id, msg):
-        """Send a 'direct' message"""
+        """Send a 'direct' message."""
         self.publisher_send(DirectPublisher, msg_id, msg)

     def topic_send(self, topic, msg, timeout=None):
-        """Send a 'topic' message"""
+        """Send a 'topic' message."""
         self.publisher_send(TopicPublisher, topic, msg, timeout)

     def fanout_send(self, topic, msg):
-        """Send a 'fanout' message"""
+        """Send a 'fanout' message."""
         self.publisher_send(FanoutPublisher, topic, msg)

     def notify_send(self, topic, msg, **kwargs):
-        """Send a notify message on a topic"""
+        """Send a notify message on a topic."""
         self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)

     def consume(self, limit=None):
-        """Consume from all queues/consumers"""
+        """Consume from all queues/consumers."""
         it = self.iterconsume(limit=limit)
         while True:
             try:
-                it.next()
+                six.next(it)
             except StopIteration:
                 return

     def consume_in_thread(self):
-        """Consumer from all queues/consumers in a greenthread"""
+        """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
         def _consumer_thread():
             try:
                 self.consume()
@@ -730,7 +748,7 @@ class Connection(object):
         return self.consumer_thread

     def create_consumer(self, topic, proxy, fanout=False):
-        """Create a consumer that calls a method in a proxy object"""
+        """Create a consumer that calls a method in a proxy object."""
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -742,7 +760,7 @@ class Connection(object):
         self.declare_topic_consumer(topic, proxy_cb)

     def create_worker(self, topic, proxy, pool_name):
-        """Create a worker that calls a method in a proxy object"""
+        """Create a worker that calls a method in a proxy object."""
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -750,7 +768,7 @@ class Connection(object):
         self.declare_topic_consumer(topic, proxy_cb, pool_name)

     def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None):
+                           exchange_name=None, ack_on_error=True):
         """Register as a member of a group of consumers for a given topic from
         the specified exchange.

@@ -764,6 +782,7 @@ class Connection(object):
             callback=callback,
             connection_pool=rpc_amqp.get_connection_pool(self.conf,
                                                          Connection),
+            wait_for_consumers=not ack_on_error
         )
         self.proxy_callbacks.append(callback_wrapper)
         self.declare_topic_consumer(
@@ -771,11 +790,12 @@ class Connection(object):
             topic=topic,
             exchange_name=exchange_name,
             callback=callback_wrapper,
+            ack_on_error=ack_on_error,
         )


 def create_connection(conf, new=True):
-    """Create a connection"""
+    """Create a connection."""
     return rpc_amqp.create_connection(
         conf, new,
         rpc_amqp.get_connection_pool(conf, Connection))
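Because _connect() now raises instead of calling sys.exit(1), exhausting rabbit_max_retries is survivable. A caller can catch the failure (sketch):

    try:
        connection = create_connection(cfg.CONF)
    except rpc_common.RPCException:
        pass   # before this sync the whole process exited via sys.exit(1)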
@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc.
#
@ -22,7 +20,9 @@ import time
import eventlet
import greenlet
from oslo.config import cfg
import six

from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
@ -30,6 +30,7 @@ from cinder.openstack.common import log as logging
from cinder.openstack.common.rpc import amqp as rpc_amqp
from cinder.openstack.common.rpc import common as rpc_common

qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")

@ -79,6 +80,8 @@ qpid_opts = [

cfg.CONF.register_opts(qpid_opts)

JSON_CONTENT_TYPE = 'application/json; charset=utf8'


def raise_invalid_topology_version(conf):
    msg = (_("Invalid value for qpid_topology_version: %d") %
@ -146,31 +149,59 @@ class ConsumerBase(object):

        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))

        self.reconnect(session)
        self.connect(session)

    def connect(self, session):
        """Declare the receiver on connect."""
        self._declare_receiver(session)

    def reconnect(self, session):
        """Re-declare the receiver after a qpid reconnect"""
        """Re-declare the receiver after a qpid reconnect."""
        self._declare_receiver(session)

    def _declare_receiver(self, session):
        self.session = session
        self.receiver = session.receiver(self.address)
        self.receiver.capacity = 1

    def _unpack_json_msg(self, msg):
        """Load the JSON data in msg if msg.content_type indicates that it
        is necessary.  Put the loaded data back into msg.content and
        update msg.content_type appropriately.

        A Qpid Message containing a dict will have a content_type of
        'amqp/map', whereas one containing a string that needs to be converted
        back from JSON will have a content_type of JSON_CONTENT_TYPE.

        :param msg: a Qpid Message object
        :returns: None
        """
        if msg.content_type == JSON_CONTENT_TYPE:
            msg.content = jsonutils.loads(msg.content)
            msg.content_type = 'amqp/map'

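To make that round trip concrete, here is a standalone sketch (not part of the commit) of what `_unpack_json_msg` does to an incoming message; `FakeMessage` is a purely illustrative stand-in for `qpid.messaging.Message`:

import json


class FakeMessage(object):
    """Illustrative stand-in for qpid.messaging.Message."""
    def __init__(self, content, content_type):
        self.content = content
        self.content_type = content_type

JSON_CONTENT_TYPE = 'application/json; charset=utf8'

# A peer that had to fall back to JSON sends string content...
msg = FakeMessage(json.dumps({'method': 'create_volume'}), JSON_CONTENT_TYPE)

# ...and the consumer normalizes it back to a dict, as _unpack_json_msg does.
if msg.content_type == JSON_CONTENT_TYPE:
    msg.content = json.loads(msg.content)
    msg.content_type = 'amqp/map'

assert msg.content == {'method': 'create_volume'}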
    def consume(self):
        """Fetch the message and pass it to the callback object"""
        """Fetch the message and pass it to the callback object."""
        message = self.receiver.fetch()
        try:
            self._unpack_json_msg(message)
            msg = rpc_common.deserialize_msg(message.content)
            self.callback(msg)
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
        finally:
            # TODO(sandy): Need support for optional ack_on_error.
            self.session.acknowledge(message)

    def get_receiver(self):
        return self.receiver

    def get_node_name(self):
        return self.address.split(';')[0]


class DirectConsumer(ConsumerBase):
    """Queue/consumer class for 'direct'"""
    """Queue/consumer class for 'direct'."""

    def __init__(self, conf, session, msg_id, callback):
        """Init a 'direct' queue.
@ -203,7 +234,7 @@ class DirectConsumer(ConsumerBase):


class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'"""
    """Consumer class for 'topic'."""

    def __init__(self, conf, session, topic, callback, name=None,
                 exchange_name=None):
@ -234,7 +265,7 @@ class TopicConsumer(ConsumerBase):


class FanoutConsumer(ConsumerBase):
    """Consumer class for 'fanout'"""
    """Consumer class for 'fanout'."""

    def __init__(self, conf, session, topic, callback):
        """Init a 'fanout' queue.
@ -243,6 +274,7 @@ class FanoutConsumer(ConsumerBase):
        'topic' is the topic to listen on
        'callback' is the callback to call when messages are received
        """
        self.conf = conf

        link_opts = {"exclusive": True}

@ -261,7 +293,7 @@ class FanoutConsumer(ConsumerBase):


class Publisher(object):
    """Base Publisher class"""
    """Base Publisher class."""

    def __init__(self, conf, session, node_name, node_opts=None):
        """Init the Publisher class with the exchange_name, routing_key,
@ -295,18 +327,46 @@ class Publisher(object):
        self.reconnect(session)

    def reconnect(self, session):
        """Re-establish the Sender after a reconnection"""
        """Re-establish the Sender after a reconnection."""
        self.sender = session.sender(self.address)

    def _pack_json_msg(self, msg):
        """Qpid cannot serialize dicts containing strings longer than 65535
        characters.  This function dumps the message content to a JSON
        string, which Qpid is able to handle.

        :param msg: May be either a Qpid Message object or a bare dict.
        :returns: A Qpid Message with its content field JSON encoded.
        """
        try:
            msg.content = jsonutils.dumps(msg.content)
        except AttributeError:
            # Need to have a Qpid message so we can set the content_type.
            msg = qpid_messaging.Message(jsonutils.dumps(msg))
        msg.content_type = JSON_CONTENT_TYPE
        return msg

    def send(self, msg):
        """Send a message"""
        """Send a message."""
        try:
            # Check if Qpid can encode the message
            check_msg = msg
            if not hasattr(check_msg, 'content_type'):
                check_msg = qpid_messaging.Message(msg)
            content_type = check_msg.content_type
            enc, dec = qpid_messaging.message.get_codec(content_type)
            enc(check_msg.content)
        except qpid_codec.CodecException:
            # This means the message couldn't be serialized as a dict.
            msg = self._pack_json_msg(msg)
        self.sender.send(msg)

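The probe-encode-then-fallback pattern in `send()` generalizes beyond Qpid. A self-contained sketch of the same idea, with no Qpid dependency; the toy encoder and its ValueError are stand-ins for the real codec's `enc()`/`CodecException`:

import json


def encode_with_fallback(payload, native_encode):
    """Try a native encoder first; fall back to a JSON string."""
    try:
        return native_encode(payload), 'amqp/map'
    except ValueError:
        # Mirrors _pack_json_msg: serialize to JSON and retag content type.
        return json.dumps(payload), 'application/json; charset=utf8'


def toy_encoder(payload):
    # Pretend, like Qpid, that very long strings cannot be encoded natively.
    if any(isinstance(v, str) and len(v) > 65535 for v in payload.values()):
        raise ValueError('string too long')
    return payload

body, ctype = encode_with_fallback({'blob': 'x' * 70000}, toy_encoder)
assert ctype == 'application/json; charset=utf8'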
class DirectPublisher(Publisher):
    """Publisher class for 'direct'"""
    """Publisher class for 'direct'."""
    def __init__(self, conf, session, msg_id):
        """Init a 'direct' publisher."""

        if conf.qpid_topology_version == 1:
            node_name = msg_id
            node_opts = {"type": "direct"}
@ -321,9 +381,9 @@ class DirectPublisher(Publisher):


class TopicPublisher(Publisher):
    """Publisher class for 'topic'"""
    """Publisher class for 'topic'."""
    def __init__(self, conf, session, topic):
        """init a 'topic' publisher.
        """Init a 'topic' publisher.
        """
        exchange_name = rpc_amqp.get_control_exchange(conf)

@ -338,9 +398,9 @@ class TopicPublisher(Publisher):


class FanoutPublisher(Publisher):
    """Publisher class for 'fanout'"""
    """Publisher class for 'fanout'."""
    def __init__(self, conf, session, topic):
        """init a 'fanout' publisher.
        """Init a 'fanout' publisher.
        """

        if conf.qpid_topology_version == 1:
@ -357,9 +417,9 @@ class FanoutPublisher(Publisher):


class NotifyPublisher(Publisher):
    """Publisher class for notifications"""
    """Publisher class for notifications."""
    def __init__(self, conf, session, topic):
        """init a 'topic' publisher.
        """Init a 'topic' publisher.
        """
        exchange_name = rpc_amqp.get_control_exchange(conf)
        node_opts = {"durable": True}
@ -433,7 +493,7 @@ class Connection(object):
        return self.consumers[str(receiver)]

    def reconnect(self):
        """Handles reconnecting and re-establishing sessions and queues"""
        """Handles reconnecting and re-establishing sessions and queues."""
        attempt = 0
        delay = 1
        while True:
@ -450,7 +510,7 @@ class Connection(object):
            try:
                self.connection_create(broker)
                self.connection.open()
            except qpid_exceptions.ConnectionError, e:
            except qpid_exceptions.ConnectionError as e:
                msg_dict = dict(e=e, delay=delay)
                msg = _("Unable to connect to AMQP server: %(e)s. "
                        "Sleeping %(delay)s seconds") % msg_dict
@ -467,7 +527,7 @@ class Connection(object):
        consumers = self.consumers
        self.consumers = {}

        for consumer in consumers.itervalues():
        for consumer in six.itervalues(consumers):
            consumer.reconnect(self.session)
            self._register_consumer(consumer)

@ -478,20 +538,26 @@ class Connection(object):
        try:
            return method(*args, **kwargs)
        except (qpid_exceptions.Empty,
                qpid_exceptions.ConnectionError), e:
                qpid_exceptions.ConnectionError) as e:
            if error_callback:
                error_callback(e)
            self.reconnect()

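The `ensure()` idiom shown above (attempt an operation, reconnect and retry on transport errors) is the backbone of both the kombu and qpid drivers. A standalone sketch of the pattern, with hypothetical `TransportError` and `reconnect` names standing in for the driver-specific pieces:

class TransportError(Exception):
    """Hypothetical stand-in for qpid_exceptions.ConnectionError."""


def ensure(method, reconnect, error_callback=None):
    """Retry `method` until it succeeds, reconnecting between attempts."""
    while True:
        try:
            return method()
        except TransportError as e:
            if error_callback:
                error_callback(e)
            reconnect()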
    def close(self):
        """Close/release this connection"""
        """Close/release this connection."""
        self.cancel_consumer_thread()
        self.wait_on_proxy_callbacks()
        self.connection.close()
        try:
            self.connection.close()
        except Exception:
            # NOTE(dripton) Logging exceptions that happen during cleanup just
            # causes confusion; there's really nothing useful we can do with
            # them.
            pass
        self.connection = None

    def reset(self):
        """Reset a connection so it can be used again"""
        """Reset a connection so it can be used again."""
        self.cancel_consumer_thread()
        self.wait_on_proxy_callbacks()
        self.session.close()
@ -515,7 +581,7 @@ class Connection(object):
        return self.ensure(_connect_error, _declare_consumer)

    def iterconsume(self, limit=None, timeout=None):
        """Return an iterator that will consume from all queues/consumers"""
        """Return an iterator that will consume from all queues/consumers."""

        def _error_callback(exc):
            if isinstance(exc, qpid_exceptions.Empty):
@ -539,7 +605,7 @@ class Connection(object):
            yield self.ensure(_error_callback, _consume)

    def cancel_consumer_thread(self):
        """Cancel a consumer thread"""
        """Cancel a consumer thread."""
        if self.consumer_thread is not None:
            self.consumer_thread.kill()
            try:
@ -554,7 +620,7 @@ class Connection(object):
            proxy_cb.wait()

    def publisher_send(self, cls, topic, msg):
        """Send to a publisher based on the publisher class"""
        """Send to a publisher based on the publisher class."""

        def _connect_error(exc):
            log_info = {'topic': topic, 'err_str': str(exc)}
@ -584,15 +650,15 @@ class Connection(object):
                                  topic, callback)

    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer"""
        """Create a 'fanout' consumer."""
        self.declare_consumer(FanoutConsumer, topic, callback)

    def direct_send(self, msg_id, msg):
        """Send a 'direct' message"""
        """Send a 'direct' message."""
        self.publisher_send(DirectPublisher, msg_id, msg)

    def topic_send(self, topic, msg, timeout=None):
        """Send a 'topic' message"""
        """Send a 'topic' message."""
        #
        # We want to create a message with attributes, e.g. a TTL. We
        # don't really need to keep 'msg' in its JSON format any longer
@ -607,24 +673,25 @@ class Connection(object):
        self.publisher_send(TopicPublisher, topic, qpid_message)

    def fanout_send(self, topic, msg):
        """Send a 'fanout' message"""
        """Send a 'fanout' message."""
        self.publisher_send(FanoutPublisher, topic, msg)

    def notify_send(self, topic, msg, **kwargs):
        """Send a notify message on a topic"""
        """Send a notify message on a topic."""
        self.publisher_send(NotifyPublisher, topic, msg)

    def consume(self, limit=None):
        """Consume from all queues/consumers"""
        """Consume from all queues/consumers."""
        it = self.iterconsume(limit=limit)
        while True:
            try:
                it.next()
                six.next(it)
            except StopIteration:
                return

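Several hunks in this sync replace `it.next()` with `six.next(it)`: the bound `.next()` method exists only on Python 2 iterators, while `six.next()` (like the `next()` builtin) works on both major versions. A quick illustration:

import six

it = iter([1, 2, 3])

# Python 2-only style, removed by this sync:
#   it.next()
# Portable styles, used after the sync:
assert six.next(it) == 1
assert next(it) == 2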
    def consume_in_thread(self):
        """Consumer from all queues/consumers in a greenthread"""
        """Consumer from all queues/consumers in a greenthread."""
        @excutils.forever_retry_uncaught_exceptions
        def _consumer_thread():
            try:
                self.consume()
@ -635,7 +702,7 @@ class Connection(object):
        return self.consumer_thread

    def create_consumer(self, topic, proxy, fanout=False):
        """Create a consumer that calls a method in a proxy object"""
        """Create a consumer that calls a method in a proxy object."""
        proxy_cb = rpc_amqp.ProxyCallback(
            self.conf, proxy,
            rpc_amqp.get_connection_pool(self.conf, Connection))
@ -651,7 +718,7 @@ class Connection(object):
        return consumer

    def create_worker(self, topic, proxy, pool_name):
        """Create a worker that calls a method in a proxy object"""
        """Create a worker that calls a method in a proxy object."""
        proxy_cb = rpc_amqp.ProxyCallback(
            self.conf, proxy,
            rpc_amqp.get_connection_pool(self.conf, Connection))
@ -665,7 +732,7 @@ class Connection(object):
        return consumer

    def join_consumer_pool(self, callback, pool_name, topic,
                           exchange_name=None):
                           exchange_name=None, ack_on_error=True):
        """Register as a member of a group of consumers for a given topic from
        the specified exchange.

@ -679,6 +746,7 @@ class Connection(object):
            callback=callback,
            connection_pool=rpc_amqp.get_connection_pool(self.conf,
                                                         Connection),
            wait_for_consumers=not ack_on_error
        )
        self.proxy_callbacks.append(callback_wrapper)

@ -694,7 +762,7 @@ class Connection(object):


def create_connection(conf, new=True):
    """Create a connection"""
    """Create a connection."""
    return rpc_amqp.create_connection(
        conf, new,
        rpc_amqp.get_connection_pool(conf, Connection))

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -25,6 +23,8 @@ import uuid
import eventlet
import greenlet
from oslo.config import cfg
import six
from six import moves

from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
@ -192,7 +192,7 @@ class ZmqSocket(object):
            # it would be much worse if some of the code calling this
            # were to fail. For now, lets log, and later evaluate
            # if we can safely raise here.
            LOG.error("ZeroMQ socket could not be closed.")
            LOG.error(_("ZeroMQ socket could not be closed."))
        self.sock = None

    def recv(self, **kwargs):
@ -221,7 +221,7 @@ class ZmqClient(object):
            return

        rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
        zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
        zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
        self.outq.send(map(bytes,
                       (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))

@ -383,6 +383,7 @@ class ZmqBaseReactor(ConsumerBase):
        LOG.info(_("In reactor registered"))

    def consume_in_thread(self):
        @excutils.forever_retry_uncaught_exceptions
        def _consume(sock):
            LOG.info(_("Consuming socket"))
            while True:
@ -522,8 +523,8 @@ def unflatten_envelope(packenv):
    h = {}
    try:
        while True:
            k = i.next()
            h[k] = i.next()
            k = six.next(i)
            h[k] = six.next(i)
    except StopIteration:
        return h

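The `moves.reduce(...)` hunk in ZmqClient and `unflatten_envelope` above are two halves of one wire format: the envelope dict is flattened into an alternating key/value tuple for the multipart ZeroMQ message, then rebuilt on the receiving side. A self-contained round trip (illustrative; no ZeroMQ required):

import six
from six import moves


def flatten_envelope(envelope):
    # Same trick as ZmqClient.cast: ('k1', v1) + ('k2', v2) + ...
    return moves.reduce(lambda x, y: x + y, envelope.items())


def unflatten_envelope(packenv):
    # Pair up alternating keys and values, as impl_zmq does.
    i = iter(packenv)
    h = {}
    try:
        while True:
            k = six.next(i)
            h[k] = six.next(i)
    except StopIteration:
        return h

env = {'version': '2.0', 'serial': 'abc'}
assert unflatten_envelope(flatten_envelope(env)) == env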
@ -584,7 +585,7 @@ class Connection(rpc_common.Connection):
        else:
            sock_type = zmq.PULL
            subscribe = None
            topic = '.'.join((topic, CONF.rpc_zmq_host))
            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))

        if topic in self.topics:
            LOG.info(_("Skipping topic registration. Already registered."))
@ -768,9 +769,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
    """Send a message to all listening and expect no reply."""
    # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
    # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
    LOG.error(_('topic is %s.') % topic)
    if topic:
        _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
    _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)


def notify(conf, context, topic, msg, envelope):

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,8 +17,6 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
"""

import contextlib
import itertools
import json

import eventlet
from oslo.config import cfg
@ -30,10 +26,6 @@ from cinder.openstack.common import log as logging


matchmaker_opts = [
    # Matchmaker ring file
    cfg.StrOpt('matchmaker_ringfile',
               default='/etc/nova/matchmaker_ring.json',
               help='Matchmaker ring file (JSON)'),
    cfg.IntOpt('matchmaker_heartbeat_freq',
               default=300,
               help='Heartbeat frequency'),
@ -54,8 +46,8 @@ class MatchMakerException(Exception):


class Exchange(object):
    """
    Implements lookups.
    """Implements lookups.

    Subclass this to support hashtables, dns, etc.
    """
    def __init__(self):
@ -66,9 +58,7 @@ class Exchange(object):


class Binding(object):
    """
    A binding on which to perform a lookup.
    """
    """A binding on which to perform a lookup."""
    def __init__(self):
        pass

@ -77,10 +67,10 @@ class Binding(object):


class MatchMakerBase(object):
    """
    Match Maker Base Class.
    Build off HeartbeatMatchMakerBase if building a
    heartbeat-capable MatchMaker.
    """Match Maker Base Class.

    Build off HeartbeatMatchMakerBase if building a heartbeat-capable
    MatchMaker.
    """
    def __init__(self):
        # Array of tuples. Index [2] toggles negation, [3] is last-if-true
@ -90,58 +80,47 @@ class MatchMakerBase(object):
                            'registration or heartbeat.')

    def register(self, key, host):
        """
        Register a host on a backend.
        """Register a host on a backend.

        Heartbeats, if applicable, may keepalive registration.
        """
        pass

    def ack_alive(self, key, host):
        """
        Acknowledge that a key.host is alive.
        Used internally for updating heartbeats,
        but may also be used publically to acknowledge
        a system is alive (i.e. rpc message successfully
        sent to host)
        """Acknowledge that a key.host is alive.

        Used internally for updating heartbeats, but may also be used
        publicly to acknowledge a system is alive (i.e. rpc message
        successfully sent to host)
        """
        pass

    def is_alive(self, topic, host):
        """
        Checks if a host is alive.
        """
        """Checks if a host is alive."""
        pass

    def expire(self, topic, host):
        """
        Explicitly expire a host's registration.
        """
        """Explicitly expire a host's registration."""
        pass

    def send_heartbeats(self):
        """
        Send all heartbeats.
        """Send all heartbeats.

        Use start_heartbeat to spawn a heartbeat greenthread,
        which loops this method.
        """
        pass

    def unregister(self, key, host):
        """
        Unregister a topic.
        """
        """Unregister a topic."""
        pass

    def start_heartbeat(self):
        """
        Spawn heartbeat greenthread.
        """
        """Spawn heartbeat greenthread."""
        pass

    def stop_heartbeat(self):
        """
        Destroys the heartbeat greenthread.
        """
        """Destroys the heartbeat greenthread."""
        pass

    def add_binding(self, binding, rule, last=True):
@ -168,10 +147,10 @@ class MatchMakerBase(object):


class HeartbeatMatchMakerBase(MatchMakerBase):
    """
    Base for a heart-beat capable MatchMaker.
    Provides common methods for registering,
    unregistering, and maintaining heartbeats.
    """Base for a heart-beat capable MatchMaker.

    Provides common methods for registering, unregistering, and maintaining
    heartbeats.
    """
    def __init__(self):
        self.hosts = set()
@ -181,8 +160,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
        super(HeartbeatMatchMakerBase, self).__init__()

    def send_heartbeats(self):
        """
        Send all heartbeats.
        """Send all heartbeats.

        Use start_heartbeat to spawn a heartbeat greenthread,
        which loops this method.
        """
@ -190,32 +169,31 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
            self.ack_alive(key, host)

    def ack_alive(self, key, host):
        """
        Acknowledge that a host.topic is alive.
        Used internally for updating heartbeats,
        but may also be used publically to acknowledge
        a system is alive (i.e. rpc message successfully
        sent to host)
        """Acknowledge that a host.topic is alive.

        Used internally for updating heartbeats, but may also be used
        publicly to acknowledge a system is alive (i.e. rpc message
        successfully sent to host)
        """
        raise NotImplementedError("Must implement ack_alive")

    def backend_register(self, key, host):
        """
        Implements registration logic.
        """Implements registration logic.

        Called by register(self,key,host)
        """
        raise NotImplementedError("Must implement backend_register")

    def backend_unregister(self, key, key_host):
        """
        Implements de-registration logic.
        """Implements de-registration logic.

        Called by unregister(self,key,host)
        """
        raise NotImplementedError("Must implement backend_unregister")

    def register(self, key, host):
        """
        Register a host on a backend.
        """Register a host on a backend.

        Heartbeats, if applicable, may keepalive registration.
        """
        self.hosts.add(host)
@ -227,25 +205,24 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
        self.ack_alive(key, host)

    def unregister(self, key, host):
        """
        Unregister a topic.
        """
        """Unregister a topic."""
        if (key, host) in self.host_topic:
            del self.host_topic[(key, host)]

        self.hosts.discard(host)
        self.backend_unregister(key, '.'.join((key, host)))

        LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
        LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
                 {'key': key, 'host': host})

    def start_heartbeat(self):
        """
        Implementation of MatchMakerBase.start_heartbeat
        """Implementation of MatchMakerBase.start_heartbeat.

        Launches greenthread looping send_heartbeats(),
        yielding for CONF.matchmaker_heartbeat_freq seconds
        between iterations.
        """
        if len(self.hosts) == 0:
        if not self.hosts:
            raise MatchMakerException(
                _("Register before starting heartbeat."))

@ -257,45 +234,37 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
        self._heart = eventlet.spawn(do_heartbeat)

    def stop_heartbeat(self):
        """
        Destroys the heartbeat greenthread.
        """
        """Destroys the heartbeat greenthread."""
        if self._heart:
            self._heart.kill()


class DirectBinding(Binding):
    """
    Specifies a host in the key via a '.' character
    """Specifies a host in the key via a '.' character.

    Although dots are used in the key, the behavior here is
    that it maps directly to a host, thus direct.
    """
    def test(self, key):
        if '.' in key:
            return True
        return False
        return '.' in key


class TopicBinding(Binding):
    """
    Where a 'bare' key without dots.
    """Where a 'bare' key without dots.

    AMQP generally considers topic exchanges to be those *with* dots,
    but we deviate here in terminology as the behavior here matches
    that of a topic exchange (whereas where there are dots, behavior
    matches that of a direct exchange.
    """
    def test(self, key):
        if '.' not in key:
            return True
        return False
        return '.' not in key


class FanoutBinding(Binding):
    """Match on fanout keys, where key starts with 'fanout.' string."""
    def test(self, key):
        if key.startswith('fanout~'):
            return True
        return False
        return key.startswith('fanout~')

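The three bindings partition the key space purely by shape. A condensed, standalone restatement of the predicates with made-up example keys, to show how a key would be classified:

class DirectBinding(object):
    def test(self, key):
        return '.' in key


class TopicBinding(object):
    def test(self, key):
        return '.' not in key


class FanoutBinding(object):
    def test(self, key):
        return key.startswith('fanout~')

assert DirectBinding().test('cinder-volume.host1')      # dotted -> direct
assert TopicBinding().test('cinder-volume')             # bare key -> topic
assert FanoutBinding().test('fanout~cinder-scheduler')  # fanout~ prefix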
class StubExchange(Exchange):
@ -304,67 +273,6 @@ class StubExchange(Exchange):
        return [(key, None)]


class RingExchange(Exchange):
    """
    Match Maker where hosts are loaded from a static file containing
    a hashmap (JSON formatted).

    __init__ takes optional ring dictionary argument, otherwise
    loads the ringfile from CONF.mathcmaker_ringfile.
    """
    def __init__(self, ring=None):
        super(RingExchange, self).__init__()

        if ring:
            self.ring = ring
        else:
            fh = open(CONF.matchmaker_ringfile, 'r')
            self.ring = json.load(fh)
            fh.close()

        self.ring0 = {}
        for k in self.ring.keys():
            self.ring0[k] = itertools.cycle(self.ring[k])

    def _ring_has(self, key):
        if key in self.ring0:
            return True
        return False


class RoundRobinRingExchange(RingExchange):
    """A Topic Exchange based on a hashmap."""
    def __init__(self, ring=None):
        super(RoundRobinRingExchange, self).__init__(ring)

    def run(self, key):
        if not self._ring_has(key):
            LOG.warn(
                _("No key defining hosts for topic '%s', "
                  "see ringfile") % (key, )
            )
            return []
        host = next(self.ring0[key])
        return [(key + '.' + host, host)]


class FanoutRingExchange(RingExchange):
    """Fanout Exchange based on a hashmap."""
    def __init__(self, ring=None):
        super(FanoutRingExchange, self).__init__(ring)

    def run(self, key):
        # Assume starts with "fanout~", strip it for lookup.
        nkey = key.split('fanout~')[1:][0]
        if not self._ring_has(nkey):
            LOG.warn(
                _("No key defining hosts for topic '%s', "
                  "see ringfile") % (nkey, )
            )
            return []
        return map(lambda x: (key + '.' + x, x), self.ring[nkey])

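This ring-based block is being removed here and lives on in matchmaker_ring.py (diffed further below), but the data it consumes stays the same: the ringfile is a JSON map from bare topic to candidate hosts. A sketch of a plausible ring and the two lookup styles, based on the classes shown above; the topic and host names are made up:

import itertools

ring = {
    'cinder-volume': ['host1', 'host2'],
    'cinder-scheduler': ['host3'],
}

# RoundRobinRingExchange cycles through the hosts for a bare topic...
cycle = itertools.cycle(ring['cinder-volume'])
assert next(cycle) == 'host1'
assert next(cycle) == 'host2'
assert next(cycle) == 'host1'

# ...while FanoutRingExchange addresses every host for the topic.
fanout = [('fanout~cinder-volume.' + h, h) for h in ring['cinder-volume']]
assert len(fanout) == 2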
class LocalhostExchange(Exchange):
    """Exchange where all direct topics are local."""
    def __init__(self, host='localhost'):
@ -376,8 +284,8 @@ class LocalhostExchange(Exchange):


class DirectExchange(Exchange):
    """
    Exchange where all topic keys are split, sending to second half.
    """Exchange where all topic keys are split, sending to second half.

    i.e. "compute.host" sends a message to "compute.host" running on "host"
    """
    def __init__(self):
@ -388,20 +296,9 @@ class DirectExchange(Exchange):
        return [(key, e)]


class MatchMakerRing(MatchMakerBase):
    """
    Match Maker where hosts are loaded from a static hashmap.
    """
    def __init__(self, ring=None):
        super(MatchMakerRing, self).__init__()
        self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
        self.add_binding(DirectBinding(), DirectExchange())
        self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))


class MatchMakerLocalhost(MatchMakerBase):
    """
    Match Maker where all bare topics resolve to localhost.
    """Match Maker where all bare topics resolve to localhost.

    Useful for testing.
    """
    def __init__(self, host='localhost'):
@ -412,13 +309,13 @@ class MatchMakerLocalhost(MatchMakerBase):


class MatchMakerStub(MatchMakerBase):
    """
    Match Maker where topics are untouched.
    """Match Maker where topics are untouched.

    Useful for testing, or for AMQP/brokered queues.
    Will not work where knowledge of hosts is known (i.e. zeromq)
    """
    def __init__(self):
        super(MatchMakerLocalhost, self).__init__()
        super(MatchMakerStub, self).__init__()

        self.add_binding(FanoutBinding(), StubExchange())
        self.add_binding(DirectBinding(), StubExchange())

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -55,8 +53,8 @@ class RedisExchange(mm_common.Exchange):


class RedisTopicExchange(RedisExchange):
    """
    Exchange where all topic keys are split, sending to second half.
    """Exchange where all topic keys are split, sending to second half.

    i.e. "compute.host" sends a message to "compute" running on "host"
    """
    def run(self, topic):
@ -77,9 +75,7 @@ class RedisTopicExchange(RedisExchange):


class RedisFanoutExchange(RedisExchange):
    """
    Return a list of all hosts.
    """
    """Return a list of all hosts."""
    def run(self, topic):
        topic = topic.split('~', 1)[1]
        hosts = self.redis.smembers(topic)
@ -90,16 +86,14 @@ class RedisFanoutExchange(RedisExchange):


class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
    """
    MatchMaker registering and looking-up hosts with a Redis server.
    """
    """MatchMaker registering and looking-up hosts with a Redis server."""
    def __init__(self):
        super(MatchMakerRedis, self).__init__()

        if not redis:
            raise ImportError("Failed to import module redis.")

        self.redis = redis.StrictRedis(
        self.redis = redis.Redis(
            host=CONF.matchmaker_redis.host,
            port=CONF.matchmaker_redis.port,
            password=CONF.matchmaker_redis.password)

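For context, the Redis backend's data model is simply one set per bare topic with hosts as members, which is why the fanout lookup above reduces to `smembers`. A hedged sketch of that model using the redis-py client directly; the key and host names are illustrative, and this bypasses the MatchMaker classes entirely:

import redis  # assumes the redis-py client is installed and a server is up

r = redis.Redis(host='127.0.0.1', port=6379)

# register(): each host joins the set for every topic it serves.
r.sadd('cinder-volume', 'host1')
r.sadd('cinder-volume', 'host2')

# RedisFanoutExchange.run('fanout~cinder-volume') boils down to:
hosts = r.smembers('cinder-volume')  # members of the topic's set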
@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -23,7 +21,7 @@ import json

from oslo.config import cfg

from cinder.openstack.common.gettextutils import _  # noqa
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import log as logging
from cinder.openstack.common.rpc import matchmaker as mm

@ -63,9 +61,7 @@ class RingExchange(mm.Exchange):
            self.ring0[k] = itertools.cycle(self.ring[k])

    def _ring_has(self, key):
        if key in self.ring0:
            return True
        return False
        return key in self.ring0


class RoundRobinRingExchange(RingExchange):

@ -1,6 +1,4 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2012 Red Hat, Inc.
# Copyright 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -21,8 +19,11 @@ For more information about rpc API version numbers, see:
    rpc/dispatcher.py
"""

import six

from cinder.openstack.common import rpc
from cinder.openstack.common.rpc import common as rpc_common
from cinder.openstack.common.rpc import serializer as rpc_serializer


class RpcProxy(object):
@ -34,16 +35,28 @@ class RpcProxy(object):
    rpc API.
    """

    def __init__(self, topic, default_version):
    # The default namespace, which can be overridden in a subclass.
    RPC_API_NAMESPACE = None

    def __init__(self, topic, default_version, version_cap=None,
                 serializer=None):
        """Initialize an RpcProxy.

        :param topic: The topic to use for all messages.
        :param default_version: The default API version to request in all
               outgoing messages.  This can be overridden on a per-message
               basis.
        :param version_cap: Optionally cap the maximum version used for sent
               messages.
        :param serializer: Optionaly (de-)serialize entities with a
               provided helper.
        """
        self.topic = topic
        self.default_version = default_version
        self.version_cap = version_cap
        if serializer is None:
            serializer = rpc_serializer.NoOpSerializer()
        self.serializer = serializer
        super(RpcProxy, self).__init__()

    def _set_version(self, msg, vers):
@ -52,19 +65,44 @@ class RpcProxy(object):
        :param msg: The message having a version added to it.
        :param vers: The version number to add to the message.
        """
        msg['version'] = vers if vers else self.default_version
        v = vers if vers else self.default_version
        if (self.version_cap and not
                rpc_common.version_is_compatible(self.version_cap, v)):
            raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
        msg['version'] = v

    def _get_topic(self, topic):
        """Return the topic to use for a message."""
        return topic if topic else self.topic

    def can_send_version(self, version):
        """Check to see if a version is compatible with the version cap."""
        return (not self.version_cap or
                rpc_common.version_is_compatible(self.version_cap, version))

    @staticmethod
    def make_namespaced_msg(method, namespace, **kwargs):
        return {'method': method, 'namespace': namespace, 'args': kwargs}

    @staticmethod
    def make_msg(method, **kwargs):
        return RpcProxy.make_namespaced_msg(method, None, **kwargs)
    def make_msg(self, method, **kwargs):
        return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
                                        **kwargs)

    def _serialize_msg_args(self, context, kwargs):
        """Helper method called to serialize message arguments.

        This calls our serializer on each argument, returning a new
        set of args that have been serialized.

        :param context: The request context
        :param kwargs: The arguments to serialize
        :returns: A new set of serialized arguments
        """
        new_kwargs = dict()
        for argname, arg in six.iteritems(kwargs):
            new_kwargs[argname] = self.serializer.serialize_entity(context,
                                                                   arg)
        return new_kwargs

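Pulling the new pieces together, here is a hedged sketch of how a downstream RPC API class might use the namespace, version cap, and serializer hooks added above. The class, topic, versions, and method names are invented for illustration:

# assumes: from cinder.openstack.common.rpc.proxy import RpcProxy
class VolumeAPIProxy(RpcProxy):
    """Hypothetical client-side proxy for a versioned RPC API."""

    RPC_API_NAMESPACE = 'volume'  # stamped into every make_msg() payload

    def __init__(self, version_cap=None):
        super(VolumeAPIProxy, self).__init__(
            topic='cinder-volume',
            default_version='1.0',
            version_cap=version_cap)  # e.g. '1.4' during rolling upgrades

    def do_thing(self, context, volume_id):
        # Fall back gracefully if the deployment is capped below 1.5.
        version = '1.5' if self.can_send_version('1.5') else '1.0'
        return self.call(context,
                         self.make_msg('do_thing', volume_id=volume_id),
                         version=version)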
    def call(self, context, msg, topic=None, version=None, timeout=None):
        """rpc.call() a remote method.
@ -81,9 +119,11 @@ class RpcProxy(object):
        :returns: The return value from the remote method.
        """
        self._set_version(msg, version)
        msg['args'] = self._serialize_msg_args(context, msg['args'])
        real_topic = self._get_topic(topic)
        try:
            return rpc.call(context, real_topic, msg, timeout)
            result = rpc.call(context, real_topic, msg, timeout)
            return self.serializer.deserialize_entity(context, result)
        except rpc.common.Timeout as exc:
            raise rpc.common.Timeout(
                exc.info, real_topic, msg.get('method'))
@ -104,9 +144,11 @@ class RpcProxy(object):
        from the remote method as they arrive.
        """
        self._set_version(msg, version)
        msg['args'] = self._serialize_msg_args(context, msg['args'])
        real_topic = self._get_topic(topic)
        try:
            return rpc.multicall(context, real_topic, msg, timeout)
            result = rpc.multicall(context, real_topic, msg, timeout)
            return self.serializer.deserialize_entity(context, result)
        except rpc.common.Timeout as exc:
            raise rpc.common.Timeout(
                exc.info, real_topic, msg.get('method'))
@ -124,6 +166,7 @@ class RpcProxy(object):
        remote method.
        """
        self._set_version(msg, version)
        msg['args'] = self._serialize_msg_args(context, msg['args'])
        rpc.cast(context, self._get_topic(topic), msg)

    def fanout_cast(self, context, msg, topic=None, version=None):
@ -139,6 +182,7 @@ class RpcProxy(object):
        from the remote method.
        """
        self._set_version(msg, version)
        msg['args'] = self._serialize_msg_args(context, msg['args'])
        rpc.fanout_cast(context, self._get_topic(topic), msg)

    def cast_to_server(self, context, server_params, msg, topic=None,
@ -157,6 +201,7 @@ class RpcProxy(object):
        return values.
        """
        self._set_version(msg, version)
        msg['args'] = self._serialize_msg_args(context, msg['args'])
        rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)

    def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@ -175,5 +220,6 @@ class RpcProxy(object):
        return values.
        """
        self._set_version(msg, version)
        msg['args'] = self._serialize_msg_args(context, msg['args'])
        rpc.fanout_cast_to_server(context, server_params,
                                  self._get_topic(topic), msg)

54 cinder/openstack/common/rpc/serializer.py Normal file
@ -0,0 +1,54 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Provides the definition of an RPC serialization handler"""

import abc

import six


@six.add_metaclass(abc.ABCMeta)
class Serializer(object):
    """Generic (de-)serialization definition base class."""

    @abc.abstractmethod
    def serialize_entity(self, context, entity):
        """Serialize something to primitive form.

        :param context: Security context
        :param entity: Entity to be serialized
        :returns: Serialized form of entity
        """
        pass

    @abc.abstractmethod
    def deserialize_entity(self, context, entity):
        """Deserialize something from primitive form.

        :param context: Security context
        :param entity: Primitive to be deserialized
        :returns: Deserialized form of entity
        """
        pass


class NoOpSerializer(Serializer):
    """A serializer that does nothing."""

    def serialize_entity(self, context, entity):
        return entity

    def deserialize_entity(self, context, entity):
        return entity
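A concrete subclass makes the contract clearer. A minimal sketch, not part of the commit, that round-trips a hypothetical `Volume` object through a plain dict, which is what would actually cross the wire:

# assumes: from cinder.openstack.common.rpc.serializer import Serializer
class Volume(object):
    """Hypothetical domain object."""
    def __init__(self, id, size):
        self.id = id
        self.size = size


class VolumeSerializer(Serializer):
    """Turns Volume objects into primitives and back."""

    def serialize_entity(self, context, entity):
        if isinstance(entity, Volume):
            return {'id': entity.id, 'size': entity.size}
        return entity

    def deserialize_entity(self, context, entity):
        if isinstance(entity, dict) and {'id', 'size'} <= set(entity):
            return Volume(entity['id'], entity['size'])
        return entity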
@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -30,11 +28,13 @@ LOG = logging.getLogger(__name__)
class Service(service.Service):
    """Service object for binaries running on hosts.

    A service enables rpc by listening to queues based on topic and host."""
    def __init__(self, host, topic, manager=None):
    A service enables rpc by listening to queues based on topic and host.
    """
    def __init__(self, host, topic, manager=None, serializer=None):
        super(Service, self).__init__()
        self.host = host
        self.topic = topic
        self.serializer = serializer
        if manager is None:
            self.manager = self
        else:
@ -47,7 +47,8 @@ class Service(service.Service):
        LOG.debug(_("Creating Consumer connection for Service %s") %
                  self.topic)

        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
                                                  self.serializer)

        # Share this same connection for these Consumers
        self.conn.create_consumer(self.topic, dispatcher, fanout=False)
3 cinder/openstack/common/rpc/zmq_receiver.py Executable file → Normal file
@ -1,6 +1,3 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -113,7 +113,7 @@ class Service(service.Service):
        # Share this same connection for these Consumers
        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)

        node_topic = '%s:%s' % (self.topic, self.host)
        node_topic = '%s.%s' % (self.topic, self.host)
        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)

        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)

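The `:` to `.` separator change is not cosmetic: the matchmaker's `DirectBinding`/`DirectExchange` (synced above) recognize host-addressed keys by splitting on `.`, so a `cinder-volume:host1` topic would never classify as a direct key. A quick check of the convention; the topic and host are illustrative:

topic, host = 'cinder-volume', 'host1'

node_topic = '%s.%s' % (topic, host)   # new form: 'cinder-volume.host1'

# This is effectively what DirectExchange.run() does with such a key:
bare_topic, target_host = node_topic.split('.', 1)
assert (bare_topic, target_host) == ('cinder-volume', 'host1')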
@ -103,7 +103,7 @@ class VolumeRpcAPITestCase(test.TestCase):
            host = kwargs['host']
        else:
            host = kwargs['volume']['host']
        expected_topic = '%s:%s' % (CONF.volume_topic, host)
        expected_topic = '%s.%s' % (CONF.volume_topic, host)

        self.fake_args = None
        self.fake_kwargs = None

@ -648,8 +648,8 @@
# by impl_zmq. (integer value)
#rpc_cast_timeout=30

# Modules of exceptions that are permitted to be recreatedupon
# receiving exception data from an rpc call. (list value)
# Modules of exceptions that are permitted to be recreated
# upon receiving exception data from an rpc call. (list value)
#allowed_rpc_exception_modules=nova.exception,cinder.exception,exceptions

# If passed, use a fake RabbitMQ provider (boolean value)
@ -664,10 +664,6 @@
# Options defined in cinder.openstack.common.rpc.amqp
#

# Enable a fast single reply queue if using AMQP based RPC
# like RabbitMQ or Qpid. (boolean value)
#amqp_rpc_single_reply_queue=false

# Use durable queues in amqp. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
#amqp_durable_queues=false
@ -680,8 +676,9 @@
# Options defined in cinder.openstack.common.rpc.impl_kombu
#

# SSL version to use (valid only if SSL enabled) (string
# value)
# SSL version to use (valid only if SSL enabled). valid values
# are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
# distributions (string value)
#kombu_ssl_version=

# SSL key file (valid only if SSL enabled) (string value)
@ -813,9 +810,6 @@
# Options defined in cinder.openstack.common.rpc.matchmaker
#

# Matchmaker ring file (JSON) (string value)
#matchmaker_ringfile=/etc/nova/matchmaker_ring.json

# Heartbeat frequency (integer value)
#matchmaker_heartbeat_freq=300