Merge "Fix the multi-backend storge issue for ZMQ."
This commit is contained in:
commit
b28e706a80
@ -287,7 +287,7 @@ def queue_get_for(context, topic, host):
|
||||
Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
|
||||
<host>.
|
||||
"""
|
||||
return '%s.%s' % (topic, host) if host else topic
|
||||
return '%s:%s' % (topic, host) if host else topic
|
||||
|
||||
|
||||
_RPCIMPL = None
|
||||
|
@ -30,7 +30,6 @@ from cinder.openstack.common import excutils
|
||||
from cinder.openstack.common.gettextutils import _
|
||||
from cinder.openstack.common import importutils
|
||||
from cinder.openstack.common import jsonutils
|
||||
from cinder.openstack.common import processutils as utils
|
||||
from cinder.openstack.common.rpc import common as rpc_common
|
||||
|
||||
zmq = importutils.try_import('eventlet.green.zmq')
|
||||
@ -85,8 +84,8 @@ matchmaker = None # memoized matchmaker object
|
||||
|
||||
|
||||
def _serialize(data):
|
||||
"""
|
||||
Serialization wrapper
|
||||
"""Serialization wrapper.
|
||||
|
||||
We prefer using JSON, but it cannot encode all types.
|
||||
Error if a developer passes us bad data.
|
||||
"""
|
||||
@ -98,18 +97,15 @@ def _serialize(data):
|
||||
|
||||
|
||||
def _deserialize(data):
|
||||
"""
|
||||
Deserialization wrapper
|
||||
"""
|
||||
"""Deserialization wrapper."""
|
||||
LOG.debug(_("Deserializing: %s"), data)
|
||||
return jsonutils.loads(data)
|
||||
|
||||
|
||||
class ZmqSocket(object):
|
||||
"""
|
||||
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
|
||||
and connection management.
|
||||
"""A tiny wrapper around ZeroMQ.
|
||||
|
||||
Simplifies the send/recv protocol and connection management.
|
||||
Can be used as a Context (supports the 'with' statement).
|
||||
"""
|
||||
|
||||
@ -180,7 +176,7 @@ class ZmqSocket(object):
|
||||
return
|
||||
|
||||
# We must unsubscribe, or we'll leak descriptors.
|
||||
if len(self.subscriptions) > 0:
|
||||
if self.subscriptions:
|
||||
for f in self.subscriptions:
|
||||
try:
|
||||
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
|
||||
@ -199,26 +195,24 @@ class ZmqSocket(object):
|
||||
LOG.error("ZeroMQ socket could not be closed.")
|
||||
self.sock = None
|
||||
|
||||
def recv(self):
|
||||
def recv(self, **kwargs):
|
||||
if not self.can_recv:
|
||||
raise RPCException(_("You cannot recv on this socket."))
|
||||
return self.sock.recv_multipart()
|
||||
return self.sock.recv_multipart(**kwargs)
|
||||
|
||||
def send(self, data):
|
||||
def send(self, data, **kwargs):
|
||||
if not self.can_send:
|
||||
raise RPCException(_("You cannot send on this socket."))
|
||||
self.sock.send_multipart(data)
|
||||
self.sock.send_multipart(data, **kwargs)
|
||||
|
||||
|
||||
class ZmqClient(object):
|
||||
"""Client for ZMQ sockets."""
|
||||
|
||||
def __init__(self, addr, socket_type=None, bind=False):
|
||||
if socket_type is None:
|
||||
socket_type = zmq.PUSH
|
||||
self.outq = ZmqSocket(addr, socket_type, bind=bind)
|
||||
def __init__(self, addr):
|
||||
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
|
||||
|
||||
def cast(self, msg_id, topic, data, envelope=False):
|
||||
def cast(self, msg_id, topic, data, envelope):
|
||||
msg_id = msg_id or 0
|
||||
|
||||
if not envelope:
|
||||
@ -282,7 +276,7 @@ class InternalContext(object):
|
||||
except greenlet.GreenletExit:
|
||||
# ignore these since they are just from shutdowns
|
||||
pass
|
||||
except rpc_common.ClientException, e:
|
||||
except rpc_common.ClientException as e:
|
||||
LOG.debug(_("Expected exception during message handling (%s)") %
|
||||
e._exc_info[1])
|
||||
return {'exc':
|
||||
@ -356,16 +350,14 @@ class ConsumerBase(object):
|
||||
|
||||
|
||||
class ZmqBaseReactor(ConsumerBase):
|
||||
"""
|
||||
A consumer class implementing a
|
||||
centralized casting broker (PULL-PUSH)
|
||||
for RoundRobin requests.
|
||||
"""A consumer class implementing a centralized casting broker (PULL-PUSH).
|
||||
|
||||
Used for RoundRobin requests.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
super(ZmqBaseReactor, self).__init__()
|
||||
|
||||
self.mapping = {}
|
||||
self.proxies = {}
|
||||
self.threads = []
|
||||
self.sockets = []
|
||||
@ -373,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
|
||||
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
|
||||
|
||||
def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
|
||||
zmq_type_out=None, in_bind=True, out_bind=True,
|
||||
subscribe=None):
|
||||
def register(self, proxy, in_addr, zmq_type_in,
|
||||
in_bind=True, subscribe=None):
|
||||
|
||||
LOG.info(_("Registering reactor"))
|
||||
|
||||
@ -391,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
|
||||
LOG.info(_("In reactor registered"))
|
||||
|
||||
if not out_addr:
|
||||
return
|
||||
|
||||
if zmq_type_out not in (zmq.PUSH, zmq.PUB):
|
||||
raise RPCException("Bad output socktype")
|
||||
|
||||
# Items push out.
|
||||
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
|
||||
|
||||
self.mapping[inq] = outq
|
||||
self.mapping[outq] = inq
|
||||
self.sockets.append(outq)
|
||||
|
||||
LOG.info(_("Out reactor registered"))
|
||||
|
||||
def consume_in_thread(self):
|
||||
def _consume(sock):
|
||||
LOG.info(_("Consuming socket"))
|
||||
@ -430,10 +406,9 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
|
||||
|
||||
class ZmqProxy(ZmqBaseReactor):
|
||||
"""
|
||||
A consumer class implementing a
|
||||
topic-based proxy, forwarding to
|
||||
IPC sockets.
|
||||
"""A consumer class implementing a topic-based proxy.
|
||||
|
||||
Forwards to IPC sockets.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
@ -446,11 +421,8 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
def consume(self, sock):
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
|
||||
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
||||
data = sock.recv()
|
||||
topic = data[1]
|
||||
|
||||
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
|
||||
data = sock.recv(copy=False)
|
||||
topic = data[1].bytes
|
||||
|
||||
if topic.startswith('fanout~'):
|
||||
sock_type = zmq.PUB
|
||||
@ -492,9 +464,7 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
|
||||
while(True):
|
||||
data = self.topic_proxy[topic].get()
|
||||
out_sock.send(data)
|
||||
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
|
||||
{'data': data})
|
||||
out_sock.send(data, copy=False)
|
||||
|
||||
wait_sock_creation = eventlet.event.Event()
|
||||
eventlet.spawn(publisher, wait_sock_creation)
|
||||
@ -507,37 +477,34 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
|
||||
try:
|
||||
self.topic_proxy[topic].put_nowait(data)
|
||||
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
|
||||
{'data': data})
|
||||
except eventlet.queue.Full:
|
||||
LOG.error(_("Local per-topic backlog buffer full for topic "
|
||||
"%(topic)s. Dropping message.") % {'topic': topic})
|
||||
|
||||
def consume_in_thread(self):
|
||||
"""Runs the ZmqProxy service"""
|
||||
"""Runs the ZmqProxy service."""
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
consume_in = "tcp://%s:%s" % \
|
||||
(CONF.rpc_zmq_bind_address,
|
||||
CONF.rpc_zmq_port)
|
||||
consumption_proxy = InternalContext(None)
|
||||
|
||||
if not os.path.isdir(ipc_dir):
|
||||
try:
|
||||
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
|
||||
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
|
||||
ipc_dir, run_as_root=True)
|
||||
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
|
||||
except utils.ProcessExecutionError:
|
||||
try:
|
||||
os.makedirs(ipc_dir)
|
||||
except os.error:
|
||||
if not os.path.isdir(ipc_dir):
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("Could not create IPC directory %s") %
|
||||
(ipc_dir, ))
|
||||
|
||||
LOG.error(_("Required IPC directory does not exist at"
|
||||
" %s") % (ipc_dir, ))
|
||||
try:
|
||||
self.register(consumption_proxy,
|
||||
consume_in,
|
||||
zmq.PULL,
|
||||
out_bind=True)
|
||||
zmq.PULL)
|
||||
except zmq.ZMQError:
|
||||
if os.access(ipc_dir, os.X_OK):
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("Permission denied to IPC directory at"
|
||||
" %s") % (ipc_dir, ))
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("Could not create ZeroMQ receiver daemon. "
|
||||
"Socket may already be in use."))
|
||||
@ -547,8 +514,9 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
|
||||
def unflatten_envelope(packenv):
|
||||
"""Unflattens the RPC envelope.
|
||||
Takes a list and returns a dictionary.
|
||||
i.e. [1,2,3,4] => {1: 2, 3: 4}
|
||||
|
||||
Takes a list and returns a dictionary.
|
||||
i.e. [1,2,3,4] => {1: 2, 3: 4}
|
||||
"""
|
||||
i = iter(packenv)
|
||||
h = {}
|
||||
@ -561,10 +529,9 @@ def unflatten_envelope(packenv):
|
||||
|
||||
|
||||
class ZmqReactor(ZmqBaseReactor):
|
||||
"""
|
||||
A consumer class implementing a
|
||||
consumer for messages. Can also be
|
||||
used as a 1:1 proxy
|
||||
"""A consumer class implementing a consumer for messages.
|
||||
|
||||
Can also be used as a 1:1 proxy
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
@ -574,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor):
|
||||
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
||||
data = sock.recv()
|
||||
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
|
||||
if sock in self.mapping:
|
||||
LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
|
||||
'data': data})
|
||||
self.mapping[sock].send(data)
|
||||
return
|
||||
|
||||
proxy = self.proxies[sock]
|
||||
|
||||
@ -622,7 +584,7 @@ class Connection(rpc_common.Connection):
|
||||
else:
|
||||
sock_type = zmq.PULL
|
||||
subscribe = None
|
||||
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
|
||||
topic = '.'.join((topic, CONF.rpc_zmq_host))
|
||||
|
||||
if topic in self.topics:
|
||||
LOG.info(_("Skipping topic registration. Already registered."))
|
||||
@ -751,10 +713,9 @@ def _call(addr, context, topic, msg, timeout=None,
|
||||
|
||||
def _multi_send(method, context, topic, msg, timeout=None,
|
||||
envelope=False, _msg_id=None):
|
||||
"""
|
||||
Wraps the sending of messages,
|
||||
dispatches to the matchmaker and sends
|
||||
message to all relevant hosts.
|
||||
"""Wraps the sending of messages.
|
||||
|
||||
Dispatches to the matchmaker and sends message to all relevant hosts.
|
||||
"""
|
||||
conf = CONF
|
||||
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
|
||||
@ -763,7 +724,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
|
||||
LOG.debug(_("Sending message(s) to: %s"), queues)
|
||||
|
||||
# Don't stack if we have no matchmaker results
|
||||
if len(queues) == 0:
|
||||
if not queues:
|
||||
LOG.warn(_("No matchmaker results. Not casting."))
|
||||
# While not strictly a timeout, callers know how to handle
|
||||
# this exception and a timeout isn't too big a lie.
|
||||
@ -807,12 +768,14 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
|
||||
"""Send a message to all listening and expect no reply."""
|
||||
# NOTE(ewindisch): fanout~ is used because it avoid splitting on .
|
||||
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
|
||||
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
|
||||
LOG.error(_('topic is %s.') % topic)
|
||||
if topic:
|
||||
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
|
||||
|
||||
|
||||
def notify(conf, context, topic, msg, envelope):
|
||||
"""
|
||||
Send notification event.
|
||||
"""Send notification event.
|
||||
|
||||
Notifications are sent to topic-priority.
|
||||
This differs from the AMQP drivers which send to topic.priority.
|
||||
"""
|
||||
@ -846,6 +809,11 @@ def _get_ctxt():
|
||||
def _get_matchmaker(*args, **kwargs):
|
||||
global matchmaker
|
||||
if not matchmaker:
|
||||
matchmaker = importutils.import_object(
|
||||
CONF.rpc_zmq_matchmaker, *args, **kwargs)
|
||||
mm = CONF.rpc_zmq_matchmaker
|
||||
if mm.endswith('matchmaker.MatchMakerRing'):
|
||||
mm.replace('matchmaker', 'matchmaker_ring')
|
||||
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
|
||||
' %(new)s instead') % dict(
|
||||
orig=CONF.rpc_zmq_matchmaker, new=mm))
|
||||
matchmaker = importutils.import_object(mm, *args, **kwargs)
|
||||
return matchmaker
|
||||
|
@ -356,7 +356,6 @@ class Service(object):
|
||||
version_string = version.version_string()
|
||||
LOG.audit(_('Starting %(topic)s node (version %(version_string)s)'),
|
||||
{'topic': self.topic, 'version_string': version_string})
|
||||
self.manager.init_host()
|
||||
self.model_disconnected = False
|
||||
ctxt = context.get_admin_context()
|
||||
try:
|
||||
@ -376,13 +375,14 @@ class Service(object):
|
||||
# Share this same connection for these Consumers
|
||||
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
|
||||
|
||||
node_topic = '%s.%s' % (self.topic, self.host)
|
||||
node_topic = '%s:%s' % (self.topic, self.host)
|
||||
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
|
||||
|
||||
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
|
||||
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
self.manager.init_host()
|
||||
|
||||
if self.report_interval:
|
||||
pulse = utils.LoopingCall(self.report_state)
|
||||
|
@ -94,7 +94,7 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
host = kwargs['host']
|
||||
else:
|
||||
host = kwargs['volume']['host']
|
||||
expected_topic = '%s.%s' % (CONF.volume_topic, host)
|
||||
expected_topic = '%s:%s' % (CONF.volume_topic, host)
|
||||
|
||||
self.fake_args = None
|
||||
self.fake_kwargs = None
|
||||
|
Loading…
x
Reference in New Issue
Block a user