Fix the multi-backend storge issue for ZMQ.

This issue is not caused by the naming convention as described in Bug 1166899,
but due to the incorrect node_topic registration in ZMQ.

What have been done in this patch:
*Import the latest impl_zmq.py from oslo.
*Change the delimiter from "." to ":" between topic and host. "." will make
node_topic an invalid key for the registration in ZMQ.
*The node_topic should be registered correctly via
topic = '.'.join((topic, CONF.rpc_zmq_host)) in impl_zmq.py.
*Move init_host() in services.py downstairs to make sure the c-vol can be
launched successfully for ZMQ.

Fixed Bug 1166899.

Change-Id: Id982ab9482f08d69bdc68d389fb41a7752efa168
This commit is contained in:
Vincent Hou 2013-06-19 18:48:15 +08:00
parent 213ad6b8cb
commit a4f6ab0f3d
4 changed files with 64 additions and 96 deletions

View File

@ -287,7 +287,7 @@ def queue_get_for(context, topic, host):
Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
<host>. <host>.
""" """
return '%s.%s' % (topic, host) if host else topic return '%s:%s' % (topic, host) if host else topic
_RPCIMPL = None _RPCIMPL = None

View File

@ -30,7 +30,6 @@ from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _ from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils from cinder.openstack.common import jsonutils
from cinder.openstack.common import processutils as utils
from cinder.openstack.common.rpc import common as rpc_common from cinder.openstack.common.rpc import common as rpc_common
zmq = importutils.try_import('eventlet.green.zmq') zmq = importutils.try_import('eventlet.green.zmq')
@ -85,8 +84,8 @@ matchmaker = None # memoized matchmaker object
def _serialize(data): def _serialize(data):
""" """Serialization wrapper.
Serialization wrapper
We prefer using JSON, but it cannot encode all types. We prefer using JSON, but it cannot encode all types.
Error if a developer passes us bad data. Error if a developer passes us bad data.
""" """
@ -98,18 +97,15 @@ def _serialize(data):
def _deserialize(data): def _deserialize(data):
""" """Deserialization wrapper."""
Deserialization wrapper
"""
LOG.debug(_("Deserializing: %s"), data) LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data) return jsonutils.loads(data)
class ZmqSocket(object): class ZmqSocket(object):
""" """A tiny wrapper around ZeroMQ.
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
and connection management.
Simplifies the send/recv protocol and connection management.
Can be used as a Context (supports the 'with' statement). Can be used as a Context (supports the 'with' statement).
""" """
@ -180,7 +176,7 @@ class ZmqSocket(object):
return return
# We must unsubscribe, or we'll leak descriptors. # We must unsubscribe, or we'll leak descriptors.
if len(self.subscriptions) > 0: if self.subscriptions:
for f in self.subscriptions: for f in self.subscriptions:
try: try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f) self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
@ -199,26 +195,24 @@ class ZmqSocket(object):
LOG.error("ZeroMQ socket could not be closed.") LOG.error("ZeroMQ socket could not be closed.")
self.sock = None self.sock = None
def recv(self): def recv(self, **kwargs):
if not self.can_recv: if not self.can_recv:
raise RPCException(_("You cannot recv on this socket.")) raise RPCException(_("You cannot recv on this socket."))
return self.sock.recv_multipart() return self.sock.recv_multipart(**kwargs)
def send(self, data): def send(self, data, **kwargs):
if not self.can_send: if not self.can_send:
raise RPCException(_("You cannot send on this socket.")) raise RPCException(_("You cannot send on this socket."))
self.sock.send_multipart(data) self.sock.send_multipart(data, **kwargs)
class ZmqClient(object): class ZmqClient(object):
"""Client for ZMQ sockets.""" """Client for ZMQ sockets."""
def __init__(self, addr, socket_type=None, bind=False): def __init__(self, addr):
if socket_type is None: self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, envelope=False): def cast(self, msg_id, topic, data, envelope):
msg_id = msg_id or 0 msg_id = msg_id or 0
if not envelope: if not envelope:
@ -282,7 +276,7 @@ class InternalContext(object):
except greenlet.GreenletExit: except greenlet.GreenletExit:
# ignore these since they are just from shutdowns # ignore these since they are just from shutdowns
pass pass
except rpc_common.ClientException, e: except rpc_common.ClientException as e:
LOG.debug(_("Expected exception during message handling (%s)") % LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1]) e._exc_info[1])
return {'exc': return {'exc':
@ -356,16 +350,14 @@ class ConsumerBase(object):
class ZmqBaseReactor(ConsumerBase): class ZmqBaseReactor(ConsumerBase):
""" """A consumer class implementing a centralized casting broker (PULL-PUSH).
A consumer class implementing a
centralized casting broker (PULL-PUSH) Used for RoundRobin requests.
for RoundRobin requests.
""" """
def __init__(self, conf): def __init__(self, conf):
super(ZmqBaseReactor, self).__init__() super(ZmqBaseReactor, self).__init__()
self.mapping = {}
self.proxies = {} self.proxies = {}
self.threads = [] self.threads = []
self.sockets = [] self.sockets = []
@ -373,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase):
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
def register(self, proxy, in_addr, zmq_type_in, out_addr=None, def register(self, proxy, in_addr, zmq_type_in,
zmq_type_out=None, in_bind=True, out_bind=True, in_bind=True, subscribe=None):
subscribe=None):
LOG.info(_("Registering reactor")) LOG.info(_("Registering reactor"))
@ -391,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("In reactor registered")) LOG.info(_("In reactor registered"))
if not out_addr:
return
if zmq_type_out not in (zmq.PUSH, zmq.PUB):
raise RPCException("Bad output socktype")
# Items push out.
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
self.mapping[inq] = outq
self.mapping[outq] = inq
self.sockets.append(outq)
LOG.info(_("Out reactor registered"))
def consume_in_thread(self): def consume_in_thread(self):
def _consume(sock): def _consume(sock):
LOG.info(_("Consuming socket")) LOG.info(_("Consuming socket"))
@ -430,10 +406,9 @@ class ZmqBaseReactor(ConsumerBase):
class ZmqProxy(ZmqBaseReactor): class ZmqProxy(ZmqBaseReactor):
""" """A consumer class implementing a topic-based proxy.
A consumer class implementing a
topic-based proxy, forwarding to Forwards to IPC sockets.
IPC sockets.
""" """
def __init__(self, conf): def __init__(self, conf):
@ -446,11 +421,8 @@ class ZmqProxy(ZmqBaseReactor):
def consume(self, sock): def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv(copy=False)
data = sock.recv() topic = data[1].bytes
topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
if topic.startswith('fanout~'): if topic.startswith('fanout~'):
sock_type = zmq.PUB sock_type = zmq.PUB
@ -492,9 +464,7 @@ class ZmqProxy(ZmqBaseReactor):
while(True): while(True):
data = self.topic_proxy[topic].get() data = self.topic_proxy[topic].get()
out_sock.send(data) out_sock.send(data, copy=False)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})
wait_sock_creation = eventlet.event.Event() wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation) eventlet.spawn(publisher, wait_sock_creation)
@ -507,37 +477,34 @@ class ZmqProxy(ZmqBaseReactor):
try: try:
self.topic_proxy[topic].put_nowait(data) self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full: except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic " LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic}) "%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self): def consume_in_thread(self):
"""Runs the ZmqProxy service""" """Runs the ZmqProxy service."""
ipc_dir = CONF.rpc_zmq_ipc_dir ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \ consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address, (CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port) CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None) consumption_proxy = InternalContext(None)
if not os.path.isdir(ipc_dir): try:
try: os.makedirs(ipc_dir)
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) except os.error:
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), if not os.path.isdir(ipc_dir):
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(_("Could not create IPC directory %s") % LOG.error(_("Required IPC directory does not exist at"
(ipc_dir, )) " %s") % (ipc_dir, ))
try: try:
self.register(consumption_proxy, self.register(consumption_proxy,
consume_in, consume_in,
zmq.PULL, zmq.PULL)
out_bind=True)
except zmq.ZMQError: except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception():
LOG.error(_("Permission denied to IPC directory at"
" %s") % (ipc_dir, ))
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. " LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use.")) "Socket may already be in use."))
@ -547,8 +514,9 @@ class ZmqProxy(ZmqBaseReactor):
def unflatten_envelope(packenv): def unflatten_envelope(packenv):
"""Unflattens the RPC envelope. """Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4} Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
""" """
i = iter(packenv) i = iter(packenv)
h = {} h = {}
@ -561,10 +529,9 @@ def unflatten_envelope(packenv):
class ZmqReactor(ZmqBaseReactor): class ZmqReactor(ZmqBaseReactor):
""" """A consumer class implementing a consumer for messages.
A consumer class implementing a
consumer for messages. Can also be Can also be used as a 1:1 proxy
used as a 1:1 proxy
""" """
def __init__(self, conf): def __init__(self, conf):
@ -574,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying) #TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv() data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
if sock in self.mapping:
LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
'data': data})
self.mapping[sock].send(data)
return
proxy = self.proxies[sock] proxy = self.proxies[sock]
@ -622,7 +584,7 @@ class Connection(rpc_common.Connection):
else: else:
sock_type = zmq.PULL sock_type = zmq.PULL
subscribe = None subscribe = None
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) topic = '.'.join((topic, CONF.rpc_zmq_host))
if topic in self.topics: if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered.")) LOG.info(_("Skipping topic registration. Already registered."))
@ -751,10 +713,9 @@ def _call(addr, context, topic, msg, timeout=None,
def _multi_send(method, context, topic, msg, timeout=None, def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None): envelope=False, _msg_id=None):
""" """Wraps the sending of messages.
Wraps the sending of messages,
dispatches to the matchmaker and sends Dispatches to the matchmaker and sends message to all relevant hosts.
message to all relevant hosts.
""" """
conf = CONF conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
@ -763,7 +724,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
LOG.debug(_("Sending message(s) to: %s"), queues) LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results # Don't stack if we have no matchmaker results
if len(queues) == 0: if not queues:
LOG.warn(_("No matchmaker results. Not casting.")) LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle # While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie. # this exception and a timeout isn't too big a lie.
@ -807,12 +768,14 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
"""Send a message to all listening and expect no reply.""" """Send a message to all listening and expect no reply."""
# NOTE(ewindisch): fanout~ is used because it avoid splitting on . # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
# and acts as a non-subtle hint to the matchmaker and ZmqProxy. # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) LOG.error(_('topic is %s.') % topic)
if topic:
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
def notify(conf, context, topic, msg, envelope): def notify(conf, context, topic, msg, envelope):
""" """Send notification event.
Send notification event.
Notifications are sent to topic-priority. Notifications are sent to topic-priority.
This differs from the AMQP drivers which send to topic.priority. This differs from the AMQP drivers which send to topic.priority.
""" """
@ -846,6 +809,11 @@ def _get_ctxt():
def _get_matchmaker(*args, **kwargs): def _get_matchmaker(*args, **kwargs):
global matchmaker global matchmaker
if not matchmaker: if not matchmaker:
matchmaker = importutils.import_object( mm = CONF.rpc_zmq_matchmaker
CONF.rpc_zmq_matchmaker, *args, **kwargs) if mm.endswith('matchmaker.MatchMakerRing'):
mm.replace('matchmaker', 'matchmaker_ring')
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
' %(new)s instead') % dict(
orig=CONF.rpc_zmq_matchmaker, new=mm))
matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker return matchmaker

View File

@ -356,7 +356,6 @@ class Service(object):
version_string = version.version_string() version_string = version.version_string()
LOG.audit(_('Starting %(topic)s node (version %(version_string)s)'), LOG.audit(_('Starting %(topic)s node (version %(version_string)s)'),
{'topic': self.topic, 'version_string': version_string}) {'topic': self.topic, 'version_string': version_string})
self.manager.init_host()
self.model_disconnected = False self.model_disconnected = False
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
try: try:
@ -376,13 +375,14 @@ class Service(object):
# Share this same connection for these Consumers # Share this same connection for these Consumers
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host) node_topic = '%s:%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
# Consume from all consumers in a thread # Consume from all consumers in a thread
self.conn.consume_in_thread() self.conn.consume_in_thread()
self.manager.init_host()
if self.report_interval: if self.report_interval:
pulse = utils.LoopingCall(self.report_state) pulse = utils.LoopingCall(self.report_state)

View File

@ -94,7 +94,7 @@ class VolumeRpcAPITestCase(test.TestCase):
host = kwargs['host'] host = kwargs['host']
else: else:
host = kwargs['volume']['host'] host = kwargs['volume']['host']
expected_topic = '%s.%s' % (CONF.volume_topic, host) expected_topic = '%s:%s' % (CONF.volume_topic, host)
self.fake_args = None self.fake_args = None
self.fake_kwargs = None self.fake_kwargs = None