Use openstack.common.notifier.
- Update openstack.common.log to use it (and fixes #1030078 for cinder as well so we can get review I9d4251b4292188c0174ebac1dcd98318df44c0e3 working). - Don't use openstack.common.context but our own just yet. Change-Id: I10feda623788a3b2418cad6bf55cbdb860ba8ee7
This commit is contained in:
parent
a1b4bd5e86
commit
a4c8e71e08
@ -91,9 +91,10 @@ def wrap_exception(notifier=None, publisher_id=None, event_type=None,
|
||||
get thrown. It logs the exception as well as optionally sending
|
||||
it to the notification system.
|
||||
"""
|
||||
# TODO(sandy): Find a way to import cinder.notifier.api so we don't have
|
||||
# to pass it in as a parameter. Otherwise we get a cyclic import of
|
||||
# cinder.notifier.api -> cinder.utils -> cinder.exception :(
|
||||
# TODO(sandy): Find a way to import cinder.openstackc.common.notifier.api
|
||||
# so we don't have to pass it in as a parameter. Otherwise we get a cyclic
|
||||
# import of cinder.openstack.common.notifier.api -> cinder.utils ->
|
||||
# cinder.exception :(
|
||||
# TODO(johannes): Also, it would be nice to use
|
||||
# utils.save_and_reraise_exception() without an import loop
|
||||
def inner(f):
|
||||
|
@ -222,7 +222,7 @@ global_opts = [
|
||||
default='cinder',
|
||||
help='availability zone of this node'),
|
||||
cfg.StrOpt('notification_driver',
|
||||
default='cinder.notifier.no_op_notifier',
|
||||
default='cinder.openstack.common.notifier.no_op_notifier',
|
||||
help='Default driver for sending notifications'),
|
||||
cfg.ListOpt('memcached_servers',
|
||||
default=None,
|
||||
|
@ -1,81 +0,0 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cinder import context
|
||||
from cinder import db
|
||||
from cinder.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def notify(message):
|
||||
"""Look for specific compute manager events and interprete them
|
||||
so as to keep the Capacity table up to date.
|
||||
|
||||
NOTE: the True/False return codes are only for testing.
|
||||
"""
|
||||
|
||||
# The event_type must start with 'compute.instance.'
|
||||
event_type = message.get('event_type', None)
|
||||
preamble = 'compute.instance.'
|
||||
if not event_type or not event_type.startswith(preamble):
|
||||
return False
|
||||
|
||||
# Events we're interested in end with .start and .end
|
||||
event = event_type[len(preamble):]
|
||||
parts = event.split('.')
|
||||
suffix = parts[-1].lower()
|
||||
event = event[:(-len(suffix) - 1)]
|
||||
|
||||
if suffix not in ['start', 'end']:
|
||||
return False
|
||||
started = suffix == 'start'
|
||||
ended = suffix == 'end'
|
||||
|
||||
if started and event == 'create':
|
||||
# We've already updated this stuff in the scheduler. Don't redo the
|
||||
# work here.
|
||||
return False
|
||||
|
||||
work = 1 if started else -1
|
||||
|
||||
# Extract the host name from the publisher id ...
|
||||
publisher_preamble = 'compute.'
|
||||
publisher = message.get('publisher_id', None)
|
||||
if not publisher or not publisher.startswith(publisher_preamble):
|
||||
return False
|
||||
host = publisher[len(publisher_preamble):]
|
||||
|
||||
# If we deleted an instance, make sure we reclaim the resources.
|
||||
# We may need to do something explicit for rebuild/migrate.
|
||||
free_ram_mb = 0
|
||||
free_disk_gb = 0
|
||||
vms = 0
|
||||
if ended and event == 'delete':
|
||||
vms = -1
|
||||
payload = message.get('payload', {})
|
||||
free_ram_mb = payload.get('memory_mb', 0)
|
||||
free_disk_gb = payload.get('disk_gb', 0)
|
||||
|
||||
LOG.debug("EventType=%(event_type)s -> host %(host)s: "
|
||||
"ram %(free_ram_mb)d, disk %(free_disk_gb)d, "
|
||||
"work %(work)d, vms%(vms)d" % locals())
|
||||
|
||||
db.api.compute_node_utilization_update(context.get_admin_context(), host,
|
||||
free_ram_mb_delta=free_ram_mb, free_disk_gb_delta=free_disk_gb,
|
||||
work_delta=work, vm_delta=vms)
|
||||
|
||||
return True
|
@ -1,71 +0,0 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cinder import flags
|
||||
from cinder.openstack.common import log as logging
|
||||
from cinder.openstack.common import cfg
|
||||
from cinder.openstack.common import exception as common_exception
|
||||
from cinder.openstack.common import importutils
|
||||
|
||||
|
||||
list_notifier_drivers_opt = cfg.MultiStrOpt('list_notifier_drivers',
|
||||
default=['cinder.notifier.no_op_notifier'],
|
||||
help='List of drivers to send notifications')
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
FLAGS.register_opt(list_notifier_drivers_opt)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
drivers = None
|
||||
|
||||
|
||||
class ImportFailureNotifier(object):
|
||||
"""Noisily re-raises some exception over-and-over when notify is called."""
|
||||
|
||||
def __init__(self, exception):
|
||||
self.exception = exception
|
||||
|
||||
def notify(self, message):
|
||||
raise self.exception
|
||||
|
||||
|
||||
def _get_drivers():
|
||||
"""Instantiates and returns drivers based on the flag values."""
|
||||
global drivers
|
||||
if not drivers:
|
||||
drivers = []
|
||||
for notification_driver in FLAGS.list_notifier_drivers:
|
||||
try:
|
||||
drivers.append(importutils.import_module(notification_driver))
|
||||
except ImportError as e:
|
||||
drivers.append(ImportFailureNotifier(e))
|
||||
return drivers
|
||||
|
||||
|
||||
def notify(message):
|
||||
"""Passes notification to multiple notifiers in a list."""
|
||||
for driver in _get_drivers():
|
||||
try:
|
||||
driver.notify(message)
|
||||
except Exception as e:
|
||||
LOG.exception(_("Problem '%(e)s' attempting to send to "
|
||||
"notification driver %(driver)s."), locals())
|
||||
|
||||
|
||||
def _reset_drivers():
|
||||
"""Used by unit tests to reset the drivers."""
|
||||
global drivers
|
||||
drivers = None
|
@ -464,7 +464,7 @@ def _is_opt_registered(opts, opt):
|
||||
:raises: DuplicateOptError if a naming conflict is detected
|
||||
"""
|
||||
if opt.dest in opts:
|
||||
if opts[opt.dest]['opt'] is not opt:
|
||||
if opts[opt.dest]['opt'] != opt:
|
||||
raise DuplicateOptError(opt.name)
|
||||
return True
|
||||
else:
|
||||
@ -527,6 +527,9 @@ class Opt(object):
|
||||
else:
|
||||
self.deprecated_name = None
|
||||
|
||||
def __ne__(self, another):
|
||||
return vars(self) != vars(another)
|
||||
|
||||
def _get_from_config_parser(self, cparser, section):
|
||||
"""Retrieves the option value from a MultiConfigParser object.
|
||||
|
||||
|
@ -19,7 +19,6 @@
|
||||
Exceptions common to OpenStack projects
|
||||
"""
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
|
||||
|
@ -107,9 +107,11 @@ def to_primitive(value, convert_instances=False, level=0):
|
||||
elif hasattr(value, 'iteritems'):
|
||||
return to_primitive(dict(value.iteritems()),
|
||||
convert_instances=convert_instances,
|
||||
level=level)
|
||||
level=level + 1)
|
||||
elif hasattr(value, '__iter__'):
|
||||
return to_primitive(list(value), level)
|
||||
return to_primitive(list(value),
|
||||
convert_instances=convert_instances,
|
||||
level=level)
|
||||
elif convert_instances and hasattr(value, '__dict__'):
|
||||
# Likely an instance of something. Watch for cycles.
|
||||
# Ignore class member vars.
|
||||
|
@ -44,7 +44,7 @@ from cinder.openstack.common import cfg
|
||||
from cinder.openstack.common.gettextutils import _
|
||||
from cinder.openstack.common import jsonutils
|
||||
from cinder.openstack.common import local
|
||||
from cinder import notifier
|
||||
from cinder.openstack.common import notifier
|
||||
|
||||
|
||||
log_opts = [
|
||||
@ -251,22 +251,24 @@ class PublishErrorsHandler(logging.Handler):
|
||||
if ('cinder.openstack.common.notifier.log_notifier' in
|
||||
CONF.list_notifier_drivers):
|
||||
return
|
||||
notifier.api.notify('error.publisher',
|
||||
notifier.api.notify(None, 'error.publisher',
|
||||
'error_notification',
|
||||
notifier.api.ERROR,
|
||||
dict(error=record.msg))
|
||||
|
||||
|
||||
def handle_exception(type, value, tb):
|
||||
extra = {}
|
||||
if CONF.verbose:
|
||||
extra['exc_info'] = (type, value, tb)
|
||||
getLogger().critical(str(value), **extra)
|
||||
def _create_logging_excepthook(product_name):
|
||||
def logging_excepthook(type, value, tb):
|
||||
extra = {}
|
||||
if CONF.verbose:
|
||||
extra['exc_info'] = (type, value, tb)
|
||||
getLogger(product_name).critical(str(value), **extra)
|
||||
return logging_excepthook
|
||||
|
||||
|
||||
def setup(product_name):
|
||||
"""Setup logging."""
|
||||
sys.excepthook = handle_exception
|
||||
sys.excepthook = _create_logging_excepthook(product_name)
|
||||
|
||||
if CONF.log_config:
|
||||
try:
|
||||
@ -357,17 +359,6 @@ def _setup_logging_from_conf(product_name):
|
||||
for handler in log_root.handlers:
|
||||
logger.addHandler(handler)
|
||||
|
||||
# NOTE(jkoelker) Clear the handlers for the root logger that was setup
|
||||
# by basicConfig in nova/__init__.py and install the
|
||||
# NullHandler.
|
||||
root = logging.getLogger()
|
||||
for handler in root.handlers:
|
||||
root.removeHandler(handler)
|
||||
handler = NullHandler()
|
||||
handler.setFormatter(logging.Formatter())
|
||||
root.addHandler(handler)
|
||||
|
||||
|
||||
_loggers = {}
|
||||
|
||||
|
||||
@ -405,8 +396,12 @@ class LegacyFormatter(logging.Formatter):
|
||||
|
||||
def format(self, record):
|
||||
"""Uses contextstring if request_id is set, otherwise default."""
|
||||
if 'instance' not in record.__dict__:
|
||||
record.__dict__['instance'] = ''
|
||||
# NOTE(sdague): default the fancier formating params
|
||||
# to an empty string so we don't throw an exception if
|
||||
# they get used
|
||||
for key in ('instance', 'color'):
|
||||
if key not in record.__dict__:
|
||||
record.__dict__[key] = ''
|
||||
|
||||
if record.__dict__.get('request_id', None):
|
||||
self._fmt = CONF.logging_context_format_string
|
||||
|
@ -13,30 +13,34 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import inspect
|
||||
import uuid
|
||||
|
||||
from cinder import flags
|
||||
from cinder import utils
|
||||
from cinder.openstack.common import log as logging
|
||||
from cinder.openstack.common import cfg
|
||||
from cinder import context
|
||||
from cinder.openstack.common.gettextutils import _
|
||||
from cinder.openstack.common import importutils
|
||||
from cinder.openstack.common import jsonutils
|
||||
from cinder.openstack.common import log as logging
|
||||
from cinder.openstack.common import timeutils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
notifier_opts = [
|
||||
cfg.StrOpt('notification_driver',
|
||||
default='cinder.openstack.common.notifier.no_op_notifier',
|
||||
help='Default driver for sending notifications'),
|
||||
cfg.StrOpt('default_notification_level',
|
||||
default='INFO',
|
||||
help='Default notification level for outgoing notifications'),
|
||||
cfg.StrOpt('default_publisher_id',
|
||||
default='$host',
|
||||
help='Default publisher_id for outgoing notifications'),
|
||||
]
|
||||
]
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
FLAGS.register_opts(notifier_opts)
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(notifier_opts)
|
||||
|
||||
WARN = 'WARN'
|
||||
INFO = 'INFO'
|
||||
@ -67,21 +71,24 @@ def notify_decorator(name, fn):
|
||||
body['args'].append(arg)
|
||||
for key in kwarg:
|
||||
body['kwarg'][key] = kwarg[key]
|
||||
notify(FLAGS.default_publisher_id,
|
||||
name,
|
||||
FLAGS.default_notification_level,
|
||||
body)
|
||||
|
||||
ctxt = context.get_context_from_function_and_args(fn, args, kwarg)
|
||||
notify(ctxt,
|
||||
CONF.default_publisher_id,
|
||||
name,
|
||||
CONF.default_notification_level,
|
||||
body)
|
||||
return fn(*args, **kwarg)
|
||||
return wrapped_func
|
||||
|
||||
|
||||
def publisher_id(service, host=None):
|
||||
if not host:
|
||||
host = FLAGS.host
|
||||
host = CONF.host
|
||||
return "%s.%s" % (service, host)
|
||||
|
||||
|
||||
def notify(publisher_id, event_type, priority, payload):
|
||||
def notify(context, publisher_id, event_type, priority, payload):
|
||||
"""Sends a notification using the specified driver
|
||||
|
||||
:param publisher_id: the source worker_type.host of the message
|
||||
@ -115,21 +122,21 @@ def notify(publisher_id, event_type, priority, payload):
|
||||
"""
|
||||
if priority not in log_levels:
|
||||
raise BadPriorityException(
|
||||
_('%s not in valid priorities') % priority)
|
||||
_('%s not in valid priorities') % priority)
|
||||
|
||||
# Ensure everything is JSON serializable.
|
||||
payload = jsonutils.to_primitive(payload, convert_instances=True)
|
||||
|
||||
driver = importutils.import_module(FLAGS.notification_driver)
|
||||
driver = importutils.import_module(CONF.notification_driver)
|
||||
msg = dict(message_id=str(uuid.uuid4()),
|
||||
publisher_id=publisher_id,
|
||||
event_type=event_type,
|
||||
priority=priority,
|
||||
payload=payload,
|
||||
timestamp=str(timeutils.utcnow()))
|
||||
publisher_id=publisher_id,
|
||||
event_type=event_type,
|
||||
priority=priority,
|
||||
payload=payload,
|
||||
timestamp=str(timeutils.utcnow()))
|
||||
try:
|
||||
driver.notify(msg)
|
||||
driver.notify(context, msg)
|
||||
except Exception, e:
|
||||
LOG.exception(_("Problem '%(e)s' attempting to "
|
||||
"send to notification system. Payload=%(payload)s") %
|
||||
locals())
|
||||
locals())
|
118
cinder/openstack/common/notifier/list_notifier.py
Normal file
118
cinder/openstack/common/notifier/list_notifier.py
Normal file
@ -0,0 +1,118 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cinder.openstack.common import cfg
|
||||
from cinder.openstack.common.gettextutils import _
|
||||
from cinder.openstack.common import importutils
|
||||
from cinder.openstack.common import log as logging
|
||||
|
||||
|
||||
list_notifier_drivers_opt = cfg.MultiStrOpt(
|
||||
'list_notifier_drivers',
|
||||
default=['cinder.openstack.common.notifier.no_op_notifier'],
|
||||
help='List of drivers to send notifications')
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opt(list_notifier_drivers_opt)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
drivers = None
|
||||
|
||||
|
||||
class ImportFailureNotifier(object):
|
||||
"""Noisily re-raises some exception over-and-over when notify is called."""
|
||||
|
||||
def __init__(self, exception):
|
||||
self.exception = exception
|
||||
|
||||
def notify(self, context, message):
|
||||
raise self.exception
|
||||
|
||||
|
||||
def _get_drivers():
|
||||
"""Instantiates and returns drivers based on the flag values."""
|
||||
global drivers
|
||||
if drivers is None:
|
||||
drivers = []
|
||||
for notification_driver in CONF.list_notifier_drivers:
|
||||
try:
|
||||
drivers.append(importutils.import_module(notification_driver))
|
||||
except ImportError as e:
|
||||
drivers.append(ImportFailureNotifier(e))
|
||||
return drivers
|
||||
|
||||
|
||||
def add_driver(notification_driver):
|
||||
"""Add a notification driver at runtime."""
|
||||
# Make sure the driver list is initialized.
|
||||
_get_drivers()
|
||||
if isinstance(notification_driver, basestring):
|
||||
# Load and add
|
||||
try:
|
||||
drivers.append(importutils.import_module(notification_driver))
|
||||
except ImportError as e:
|
||||
drivers.append(ImportFailureNotifier(e))
|
||||
else:
|
||||
# Driver is already loaded; just add the object.
|
||||
drivers.append(notification_driver)
|
||||
|
||||
|
||||
def _object_name(obj):
|
||||
name = []
|
||||
if hasattr(obj, '__module__'):
|
||||
name.append(obj.__module__)
|
||||
if hasattr(obj, '__name__'):
|
||||
name.append(obj.__name__)
|
||||
else:
|
||||
name.append(obj.__class__.__name__)
|
||||
return '.'.join(name)
|
||||
|
||||
|
||||
def remove_driver(notification_driver):
|
||||
"""Remove a notification driver at runtime."""
|
||||
# Make sure the driver list is initialized.
|
||||
_get_drivers()
|
||||
removed = False
|
||||
if notification_driver in drivers:
|
||||
# We're removing an object. Easy.
|
||||
drivers.remove(notification_driver)
|
||||
removed = True
|
||||
else:
|
||||
# We're removing a driver by name. Search for it.
|
||||
for driver in drivers:
|
||||
if _object_name(driver) == notification_driver:
|
||||
drivers.remove(driver)
|
||||
removed = True
|
||||
|
||||
if not removed:
|
||||
raise ValueError("Cannot remove; %s is not in list" %
|
||||
notification_driver)
|
||||
|
||||
|
||||
def notify(context, message):
|
||||
"""Passes notification to multiple notifiers in a list."""
|
||||
for driver in _get_drivers():
|
||||
try:
|
||||
driver.notify(context, message)
|
||||
except Exception as e:
|
||||
LOG.exception(_("Problem '%(e)s' attempting to send to "
|
||||
"notification driver %(driver)s."), locals())
|
||||
|
||||
|
||||
def _reset_drivers():
|
||||
"""Used by unit tests to reset the drivers."""
|
||||
global drivers
|
||||
drivers = None
|
@ -14,21 +14,22 @@
|
||||
# under the License.
|
||||
|
||||
|
||||
from cinder import flags
|
||||
from cinder.openstack.common import log as logging
|
||||
from cinder.openstack.common import cfg
|
||||
from cinder.openstack.common import jsonutils
|
||||
from cinder.openstack.common import log as logging
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def notify(message):
|
||||
def notify(_context, message):
|
||||
"""Notifies the recipient of the desired event given the model.
|
||||
Log notifications using cinder's default logging system"""
|
||||
Log notifications using openstack's default logging system"""
|
||||
|
||||
priority = message.get('priority',
|
||||
FLAGS.default_notification_level)
|
||||
CONF.default_notification_level)
|
||||
priority = priority.lower()
|
||||
logger = logging.getLogger(
|
||||
'cinder.notification.%s' % message['event_type'])
|
||||
'cinder.openstack.common.notification.%s' %
|
||||
message['event_type'])
|
||||
getattr(logger, priority)(jsonutils.dumps(message))
|
@ -14,6 +14,6 @@
|
||||
# under the License.
|
||||
|
||||
|
||||
def notify(message):
|
||||
def notify(_context, message):
|
||||
"""Notifies the recipient of the desired event given the model"""
|
||||
pass
|
@ -14,30 +14,30 @@
|
||||
# under the License.
|
||||
|
||||
|
||||
import cinder.context
|
||||
|
||||
from cinder import flags
|
||||
from cinder.openstack.common import log as logging
|
||||
from cinder.openstack.common import cfg
|
||||
from cinder.openstack.common import context as req_context
|
||||
from cinder.openstack.common.gettextutils import _
|
||||
from cinder.openstack.common import log as logging
|
||||
from cinder.openstack.common import rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
notification_topic_opt = cfg.ListOpt('notification_topics',
|
||||
default=['notifications', ],
|
||||
help='AMQP topic used for Cinder notifications')
|
||||
notification_topic_opt = cfg.ListOpt(
|
||||
'notification_topics', default=['notifications', ],
|
||||
help='AMQP topic used for openstack notifications')
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
FLAGS.register_opt(notification_topic_opt)
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opt(notification_topic_opt)
|
||||
|
||||
|
||||
def notify(message):
|
||||
def notify(context, message):
|
||||
"""Sends a notification to the RabbitMQ"""
|
||||
context = cinder.context.get_admin_context()
|
||||
if not context:
|
||||
context = req_context.get_admin_context()
|
||||
priority = message.get('priority',
|
||||
FLAGS.default_notification_level)
|
||||
CONF.default_notification_level)
|
||||
priority = priority.lower()
|
||||
for topic in FLAGS.notification_topics:
|
||||
for topic in CONF.notification_topics:
|
||||
topic = '%s.%s' % (topic, priority)
|
||||
try:
|
||||
rpc.notify(context, topic, message)
|
@ -13,13 +13,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cinder import flags
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
|
||||
NOTIFICATIONS = []
|
||||
|
||||
|
||||
def notify(message):
|
||||
def notify(_context, message):
|
||||
"""Test notifier, stores notifications in memory for unittests."""
|
||||
NOTIFICATIONS.append(message)
|
@ -108,7 +108,7 @@ class Connection(object):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_consumer(self, conf, topic, proxy, fanout=False):
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
"""Create a consumer on this connection.
|
||||
|
||||
A consumer is associated with a message queue on the backend message
|
||||
@ -117,7 +117,6 @@ class Connection(object):
|
||||
off of the queue will determine which method gets called on the proxy
|
||||
object.
|
||||
|
||||
:param conf: An openstack.common.cfg configuration object.
|
||||
:param topic: This is a name associated with what to consume from.
|
||||
Multiple instances of a service may consume from the same
|
||||
topic. For example, all instances of nova-compute consume
|
||||
@ -133,7 +132,7 @@ class Connection(object):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_worker(self, conf, topic, proxy, pool_name):
|
||||
def create_worker(self, topic, proxy, pool_name):
|
||||
"""Create a worker on this connection.
|
||||
|
||||
A worker is like a regular consumer of messages directed to a
|
||||
@ -143,7 +142,6 @@ class Connection(object):
|
||||
be asked to process it. Load is distributed across the members
|
||||
of the pool in round-robin fashion.
|
||||
|
||||
:param conf: An openstack.common.cfg configuration object.
|
||||
:param topic: This is a name associated with what to consume from.
|
||||
Multiple instances of a service may consume from the same
|
||||
topic.
|
||||
|
@ -329,7 +329,7 @@ class Connection(object):
|
||||
if self.conf.qpid_reconnect_interval:
|
||||
self.connection.reconnect_interval = (
|
||||
self.conf.qpid_reconnect_interval)
|
||||
self.connection.hearbeat = self.conf.qpid_heartbeat
|
||||
self.connection.heartbeat = self.conf.qpid_heartbeat
|
||||
self.connection.protocol = self.conf.qpid_protocol
|
||||
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
||||
|
||||
|
@ -112,11 +112,12 @@ class RpcProxy(object):
|
||||
self._set_version(msg, version)
|
||||
rpc.cast(context, self._get_topic(topic), msg)
|
||||
|
||||
def fanout_cast(self, context, msg, version=None):
|
||||
def fanout_cast(self, context, msg, topic=None, version=None):
|
||||
"""rpc.fanout_cast() a remote method.
|
||||
|
||||
:param context: The request context
|
||||
:param msg: The message to send, including the method and args.
|
||||
:param topic: Override the topic for this message.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
|
||||
@ -124,7 +125,7 @@ class RpcProxy(object):
|
||||
from the remote method.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
rpc.fanout_cast(context, self.topic, msg)
|
||||
rpc.fanout_cast(context, self._get_topic(topic), msg)
|
||||
|
||||
def cast_to_server(self, context, server_params, msg, topic=None,
|
||||
version=None):
|
||||
@ -144,13 +145,15 @@ class RpcProxy(object):
|
||||
self._set_version(msg, version)
|
||||
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
|
||||
|
||||
def fanout_cast_to_server(self, context, server_params, msg, version=None):
|
||||
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
|
||||
version=None):
|
||||
"""rpc.fanout_cast_to_server() a remote method.
|
||||
|
||||
:param context: The request context
|
||||
:param server_params: Server parameters. See rpc.cast_to_server() for
|
||||
details.
|
||||
:param msg: The message to send, including the method and args.
|
||||
:param topic: Override the topic for this message.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
|
||||
@ -158,4 +161,5 @@ class RpcProxy(object):
|
||||
return values.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
|
||||
rpc.fanout_cast_to_server(context, server_params,
|
||||
self._get_topic(topic), msg)
|
||||
|
@ -35,7 +35,6 @@ def parse_mailmap(mailmap='.mailmap'):
|
||||
for l in fp:
|
||||
l = l.strip()
|
||||
if not l.startswith('#') and ' ' in l:
|
||||
print l
|
||||
canonical_email, alias = [x for x in l.split(' ')
|
||||
if x.startswith('<')]
|
||||
mapping[alias] = canonical_email
|
||||
@ -53,7 +52,6 @@ def canonicalize_emails(changelog, mapping):
|
||||
|
||||
# Get requirements from the first file that exists
|
||||
def get_reqs_from_files(requirements_files):
|
||||
reqs_in = []
|
||||
for requirements_file in requirements_files:
|
||||
if os.path.exists(requirements_file):
|
||||
return open(requirements_file, 'r').read().split('\n')
|
||||
@ -145,8 +143,8 @@ def _get_git_next_version_suffix(branch_name):
|
||||
# where the bit after the last . is the short sha, and the bit between
|
||||
# the last and second to last is the revno count
|
||||
(revno, sha) = post_version.split(".")[-2:]
|
||||
first_half = "%(milestonever)s~%(datestamp)s" % locals()
|
||||
second_half = "%(revno_prefix)s%(revno)s.%(sha)s" % locals()
|
||||
first_half = "%s~%s" % (milestonever, datestamp)
|
||||
second_half = "%s%s.%s" % (revno_prefix, revno, sha)
|
||||
return ".".join((first_half, second_half))
|
||||
|
||||
|
||||
|
@ -21,7 +21,6 @@ Time related utilities and helper functions.
|
||||
|
||||
import calendar
|
||||
import datetime
|
||||
import time
|
||||
|
||||
import iso8601
|
||||
|
||||
|
@ -1,16 +0,0 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cinder.tests import *
|
@ -1,84 +0,0 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import cinder
|
||||
from cinder.openstack.common import log as logging
|
||||
import cinder.notifier.api
|
||||
import cinder.notifier.log_notifier
|
||||
import cinder.notifier.no_op_notifier
|
||||
from cinder.notifier import list_notifier
|
||||
from cinder import test
|
||||
|
||||
|
||||
class NotifierListTestCase(test.TestCase):
|
||||
"""Test case for notifications"""
|
||||
|
||||
def setUp(self):
|
||||
super(NotifierListTestCase, self).setUp()
|
||||
list_notifier._reset_drivers()
|
||||
# Mock log to add one to exception_count when log.exception is called
|
||||
|
||||
def mock_exception(cls, *args):
|
||||
self.exception_count += 1
|
||||
|
||||
self.exception_count = 0
|
||||
list_notifier_log = logging.getLogger('cinder.notifier.list_notifier')
|
||||
self.stubs.Set(list_notifier_log, "exception", mock_exception)
|
||||
# Mock no_op notifier to add one to notify_count when called.
|
||||
|
||||
def mock_notify(cls, *args):
|
||||
self.notify_count += 1
|
||||
|
||||
self.notify_count = 0
|
||||
self.stubs.Set(cinder.notifier.no_op_notifier, 'notify', mock_notify)
|
||||
# Mock log_notifier to raise RuntimeError when called.
|
||||
|
||||
def mock_notify2(cls, *args):
|
||||
raise RuntimeError("Bad notifier.")
|
||||
|
||||
self.stubs.Set(cinder.notifier.log_notifier, 'notify', mock_notify2)
|
||||
|
||||
def tearDown(self):
|
||||
list_notifier._reset_drivers()
|
||||
super(NotifierListTestCase, self).tearDown()
|
||||
|
||||
def test_send_notifications_successfully(self):
|
||||
self.flags(notification_driver='cinder.notifier.list_notifier',
|
||||
list_notifier_drivers=['cinder.notifier.no_op_notifier',
|
||||
'cinder.notifier.no_op_notifier'])
|
||||
cinder.notifier.api.notify('publisher_id', 'event_type',
|
||||
cinder.notifier.api.WARN, dict(a=3))
|
||||
self.assertEqual(self.notify_count, 2)
|
||||
self.assertEqual(self.exception_count, 0)
|
||||
|
||||
def test_send_notifications_with_errors(self):
|
||||
|
||||
self.flags(notification_driver='cinder.notifier.list_notifier',
|
||||
list_notifier_drivers=['cinder.notifier.no_op_notifier',
|
||||
'cinder.notifier.log_notifier'])
|
||||
cinder.notifier.api.notify('publisher_id',
|
||||
'event_type', cinder.notifier.api.WARN, dict(a=3))
|
||||
self.assertEqual(self.notify_count, 1)
|
||||
self.assertEqual(self.exception_count, 1)
|
||||
|
||||
def test_when_driver_fails_to_import(self):
|
||||
self.flags(notification_driver='cinder.notifier.list_notifier',
|
||||
list_notifier_drivers=['cinder.notifier.no_op_notifier',
|
||||
'cinder.notifier.logo_notifier',
|
||||
'fdsjgsdfhjkhgsfkj'])
|
||||
cinder.notifier.api.notify('publisher_id',
|
||||
'event_type', cinder.notifier.api.WARN, dict(a=3))
|
||||
self.assertEqual(self.exception_count, 2)
|
||||
self.assertEqual(self.notify_count, 1)
|
@ -26,7 +26,7 @@ from cinder import context
|
||||
from cinder import db
|
||||
from cinder import exception
|
||||
from cinder import flags
|
||||
from cinder.notifier import api as notifier
|
||||
from cinder.openstack.common.notifier import api as notifier
|
||||
from cinder.openstack.common import rpc
|
||||
from cinder.openstack.common.rpc import common as rpc_common
|
||||
from cinder.openstack.common import timeutils
|
||||
|
@ -21,7 +21,7 @@ from cinder import exception
|
||||
|
||||
|
||||
class FakeNotifier(object):
|
||||
"""Acts like the cinder.notifier.api module."""
|
||||
"""Acts like the cinder.openstack.common.notifier.api module."""
|
||||
ERROR = 88
|
||||
|
||||
def __init__(self):
|
||||
|
@ -1,133 +0,0 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import cinder
|
||||
from cinder import flags
|
||||
from cinder.openstack.common import log as logging
|
||||
import cinder.notifier.no_op_notifier
|
||||
from cinder.notifier import api as notifier_api
|
||||
from cinder import test
|
||||
|
||||
|
||||
class NotifierTestCase(test.TestCase):
|
||||
"""Test case for notifications"""
|
||||
def setUp(self):
|
||||
super(NotifierTestCase, self).setUp()
|
||||
self.flags(notification_driver='cinder.notifier.no_op_notifier')
|
||||
|
||||
def test_send_notification(self):
|
||||
self.notify_called = False
|
||||
|
||||
def mock_notify(cls, *args):
|
||||
self.notify_called = True
|
||||
|
||||
self.stubs.Set(cinder.notifier.no_op_notifier, 'notify',
|
||||
mock_notify)
|
||||
|
||||
notifier_api.notify('publisher_id', 'event_type',
|
||||
cinder.notifier.api.WARN, dict(a=3))
|
||||
self.assertEqual(self.notify_called, True)
|
||||
|
||||
def test_verify_message_format(self):
|
||||
"""A test to ensure changing the message format is prohibitively
|
||||
annoying"""
|
||||
|
||||
def message_assert(message):
|
||||
fields = [('publisher_id', 'publisher_id'),
|
||||
('event_type', 'event_type'),
|
||||
('priority', 'WARN'),
|
||||
('payload', dict(a=3))]
|
||||
for k, v in fields:
|
||||
self.assertEqual(message[k], v)
|
||||
self.assertTrue(len(message['message_id']) > 0)
|
||||
self.assertTrue(len(message['timestamp']) > 0)
|
||||
|
||||
self.stubs.Set(cinder.notifier.no_op_notifier, 'notify',
|
||||
message_assert)
|
||||
notifier_api.notify('publisher_id', 'event_type',
|
||||
cinder.notifier.api.WARN, dict(a=3))
|
||||
|
||||
def test_send_rabbit_notification(self):
|
||||
self.stubs.Set(cinder.flags.FLAGS, 'notification_driver',
|
||||
'cinder.notifier.rabbit_notifier')
|
||||
self.mock_notify = False
|
||||
|
||||
def mock_notify(cls, *args):
|
||||
self.mock_notify = True
|
||||
|
||||
self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
|
||||
notifier_api.notify('publisher_id', 'event_type',
|
||||
cinder.notifier.api.WARN, dict(a=3))
|
||||
|
||||
self.assertEqual(self.mock_notify, True)
|
||||
|
||||
def test_invalid_priority(self):
|
||||
self.assertRaises(cinder.notifier.api.BadPriorityException,
|
||||
notifier_api.notify, 'publisher_id',
|
||||
'event_type', 'not a priority', dict(a=3))
|
||||
|
||||
def test_rabbit_priority_queue(self):
|
||||
flags.DECLARE('notification_topics', 'cinder.notifier.rabbit_notifier')
|
||||
self.stubs.Set(cinder.flags.FLAGS, 'notification_driver',
|
||||
'cinder.notifier.rabbit_notifier')
|
||||
self.stubs.Set(cinder.flags.FLAGS, 'notification_topics',
|
||||
['testnotify', ])
|
||||
|
||||
self.test_topic = None
|
||||
|
||||
def mock_notify(context, topic, msg):
|
||||
self.test_topic = topic
|
||||
|
||||
self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
|
||||
notifier_api.notify('publisher_id', 'event_type', 'DEBUG', dict(a=3))
|
||||
self.assertEqual(self.test_topic, 'testnotify.debug')
|
||||
|
||||
def test_error_notification(self):
|
||||
self.stubs.Set(cinder.flags.FLAGS, 'notification_driver',
|
||||
'cinder.notifier.rabbit_notifier')
|
||||
self.stubs.Set(cinder.flags.FLAGS, 'publish_errors', True)
|
||||
LOG = logging.getLogger('cinder')
|
||||
logging.setup("cinder")
|
||||
msgs = []
|
||||
|
||||
def mock_notify(context, topic, data):
|
||||
msgs.append(data)
|
||||
|
||||
self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
|
||||
LOG.error('foo')
|
||||
self.assertEqual(1, len(msgs))
|
||||
msg = msgs[0]
|
||||
self.assertEqual(msg['event_type'], 'error_notification')
|
||||
self.assertEqual(msg['priority'], 'ERROR')
|
||||
self.assertEqual(msg['payload']['error'], 'foo')
|
||||
|
||||
def test_send_notification_by_decorator(self):
|
||||
self.notify_called = False
|
||||
|
||||
def example_api(arg1, arg2):
|
||||
return arg1 + arg2
|
||||
|
||||
example_api = cinder.notifier.api.notify_decorator(
|
||||
'example_api',
|
||||
example_api)
|
||||
|
||||
def mock_notify(cls, *args):
|
||||
self.notify_called = True
|
||||
|
||||
self.stubs.Set(cinder.notifier.no_op_notifier, 'notify',
|
||||
mock_notify)
|
||||
|
||||
self.assertEqual(3, example_api(1, 2))
|
||||
self.assertEqual(self.notify_called, True)
|
@ -805,10 +805,11 @@ def monkey_patch():
|
||||
You can set decorators for each modules
|
||||
using FLAGS.monkey_patch_modules.
|
||||
The format is "Module path:Decorator function".
|
||||
Example: 'cinder.api.ec2.cloud:cinder.notifier.api.notify_decorator'
|
||||
Example: 'cinder.api.ec2.cloud:' \
|
||||
cinder.openstack.common.notifier.api.notify_decorator'
|
||||
|
||||
Parameters of the decorator is as follows.
|
||||
(See cinder.notifier.api.notify_decorator)
|
||||
(See cinder.openstack.common.notifier.api.notify_decorator)
|
||||
|
||||
name - name of the function
|
||||
function - object of the function
|
||||
|
@ -118,13 +118,13 @@
|
||||
###### (BoolOpt) Whether to log monkey patching
|
||||
# monkey_patch=false
|
||||
###### (ListOpt) List of modules/decorators to monkey patch
|
||||
# monkey_patch_modules="cinder.api.ec2.cloud:cinder.notifier.api.notify_decorator,cinder.compute.api:cinder.notifier.api.notify_decorator"
|
||||
# monkey_patch_modules="cinder.api.ec2.cloud:cinder.openstack.common.notifier.api.notify_decorator,cinder.compute.api:cinder.openstack.common.notifier.api.notify_decorator"
|
||||
###### (StrOpt) ip address of this host
|
||||
# my_ip="10.0.0.1"
|
||||
###### (StrOpt) availability zone of this node
|
||||
# node_availability_zone="cinder"
|
||||
###### (StrOpt) Default driver for sending notifications
|
||||
# notification_driver="cinder.notifier.no_op_notifier"
|
||||
# notification_driver="cinder.openstack.common.notifier.no_op_notifier"
|
||||
###### (StrOpt) kernel image that indicates not to use a kernel, but to use a raw disk image instead
|
||||
# null_kernel="nokernel"
|
||||
###### (ListOpt) Specify list of extensions to load when using osapi_compute_extension option with cinder.api.openstack.compute.contrib.select_extensions
|
||||
@ -450,12 +450,12 @@
|
||||
###### (StrOpt) Tilera command line program for Bare-metal driver
|
||||
# tile_monitor="/usr/local/TileraMDE/bin/tile-monitor"
|
||||
|
||||
######### defined in cinder.notifier.list_notifier #########
|
||||
######### defined in cinder.openstack.common.notifier.list_notifier #########
|
||||
|
||||
###### (MultiStrOpt) List of drivers to send notifications
|
||||
# list_notifier_drivers="cinder.notifier.no_op_notifier"
|
||||
# list_notifier_drivers="cinder.openstack.common.notifier.no_op_notifier"
|
||||
|
||||
######### defined in cinder.notifier.rabbit_notifier #########
|
||||
######### defined in cinder.openstack.common.notifier.rabbit_notifier #########
|
||||
|
||||
###### (ListOpt) AMQP topic used for Cinder notifications
|
||||
# notification_topics="notifications"
|
||||
|
@ -1,7 +1,7 @@
|
||||
[DEFAULT]
|
||||
|
||||
# The list of modules to copy from openstack-common
|
||||
modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup
|
||||
modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier
|
||||
|
||||
# The base module to hold the copy of openstack.common
|
||||
base=cinder
|
||||
|
Loading…
x
Reference in New Issue
Block a user