Adding the volume notifications to cinder

adding the volume notifications
added unit tests for notifications
added context for notify with rabbit

blueprint cinder-notifications

Change-Id: I5aef1e718eebb9a61e0670d524fcd5f438dee016
This commit is contained in:
Craig Vyvial 2012-08-15 15:50:29 -05:00
parent 87ba5de73e
commit 6ad360741f
11 changed files with 401 additions and 54 deletions

87
bin/volume-usage-audit Normal file

@ -0,0 +1,87 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Cron script to generate usage notifications for volumes existing during
the audit period.
Together with the notifications generated by volumes
create/delete/resize, over that time period, this allows an external
system consuming usage notification feeds to calculate volume usage
for each tenant.
Time periods are specified as 'hour', 'month', 'day' or 'year'
hour = previous hour. If run at 9:07am, will generate usage for 8-9am.
month = previous month. If the script is run April 1, it will generate
usages for March 1 through March 31.
day = previous day. if run on July 4th, it generates usages for July 3rd.
year = previous year. If run on Jan 1, it generates usages for
Jan 1 through Dec 31 of the previous year.
"""
import gettext
import os
import sys
import traceback
# If ../cinder/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'cinder', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
gettext.install('cinder', unicode=1)
from cinder import context
from cinder import db
from cinder import flags
from cinder.openstack.common import log as logging
from cinder.openstack.common import rpc
from cinder import utils
import cinder.volume.utils
FLAGS = flags.FLAGS
def output(msg):
if not FLAGS.silent:
print msg
if __name__ == '__main__':
rpc.register_opts(FLAGS)
admin_context = context.get_admin_context()
utils.default_flagfile()
flags.FLAGS(sys.argv)
logging.setup()
begin, end = utils.last_completed_audit_period()
output("Starting volume usage audit")
output("Creating usages for %s until %s" % (str(begin), str(end)))
volumes = db.volume_get_active_by_window(admin_context,
begin,
end)
output("Found %d volumes" % len(volumes))
for volume_ref in volumes:
try:
cinder.volume.utils.notify_usage_exists(
admin_context, volume_ref)
except Exception, e:
output(traceback.format_exc(e))
output("Volume usage audit completed")

@ -356,6 +356,13 @@ def volume_type_destroy(context, name):
return IMPL.volume_type_destroy(context, name)
def volume_get_active_by_window(context, begin, end=None, project_id=None):
"""Get all the volumes inside the window.
Specifying a project_id will filter for a certain project."""
return IMPL.volume_get_active_by_window(context, begin, end, project_id)
####################

@ -885,6 +885,23 @@ def volume_type_destroy(context, name):
'updated_at': literal_column('updated_at')})
@require_context
def volume_get_active_by_window(context, begin, end=None,
project_id=None):
"""Return volumes that were active during window."""
session = get_session()
query = session.query(models.Volume)
query = query.filter(or_(models.Volume.deleted_at == None,
models.Volume.deleted_at > begin))
if end:
query = query.filter(models.Volume.created_at < end)
if project_id:
query = query.filter_by(project_id=project_id)
return query.all()
####################

@ -85,54 +85,6 @@ def wrap_db_error(f):
return _wrap
def wrap_exception(notifier=None, publisher_id=None, event_type=None,
level=None):
"""This decorator wraps a method to catch any exceptions that may
get thrown. It logs the exception as well as optionally sending
it to the notification system.
"""
# TODO(sandy): Find a way to import cinder.openstackc.common.notifier.api
# so we don't have to pass it in as a parameter. Otherwise we get a cyclic
# import of cinder.openstack.common.notifier.api -> cinder.utils ->
# cinder.exception :(
# TODO(johannes): Also, it would be nice to use
# utils.save_and_reraise_exception() without an import loop
def inner(f):
def wrapped(*args, **kw):
try:
return f(*args, **kw)
except Exception, e:
# Save exception since it can be clobbered during processing
# below before we can re-raise
exc_info = sys.exc_info()
if notifier:
payload = dict(args=args, exception=e)
payload.update(kw)
# Use a temp vars so we don't shadow
# our outer definitions.
temp_level = level
if not temp_level:
temp_level = notifier.ERROR
temp_type = event_type
if not temp_type:
# If f has multiple decorators, they must use
# functools.wraps to ensure the name is
# propagated.
temp_type = f.__name__
notifier.notify(publisher_id, temp_type, temp_level,
payload)
# re-raise original exception since it may have been clobbered
raise exc_info[0], exc_info[1], exc_info[2]
return functools.wraps(f)(wrapped)
return inner
class CinderException(Exception):
"""Base Cinder Exception

@ -18,6 +18,7 @@
from cinder import test
from cinder import exception
from cinder import utils
class FakeNotifier(object):
@ -30,7 +31,7 @@ class FakeNotifier(object):
self.provided_priority = None
self.provided_payload = None
def notify(self, publisher, event, priority, payload):
def notify(self, context, publisher, event, priority, payload):
self.provided_publisher = publisher
self.provided_event = event
self.provided_priority = priority
@ -51,21 +52,21 @@ def bad_function_exception():
class WrapExceptionTestCase(test.TestCase):
def test_wrap_exception_good_return(self):
wrapped = exception.wrap_exception()
wrapped = utils.wrap_exception()
self.assertEquals(99, wrapped(good_function)())
def test_wrap_exception_throws_error(self):
wrapped = exception.wrap_exception()
wrapped = utils.wrap_exception()
self.assertRaises(exception.Error, wrapped(bad_function_error))
def test_wrap_exception_throws_exception(self):
wrapped = exception.wrap_exception()
wrapped = utils.wrap_exception()
self.assertRaises(test.TestingException,
wrapped(bad_function_exception))
def test_wrap_exception_with_notifier(self):
notifier = FakeNotifier()
wrapped = exception.wrap_exception(notifier, "publisher", "event",
wrapped = utils.wrap_exception(notifier, "publisher", "event",
"level")
self.assertRaises(test.TestingException,
wrapped(bad_function_exception))
@ -77,7 +78,7 @@ class WrapExceptionTestCase(test.TestCase):
def test_wrap_exception_with_notifier_defaults(self):
notifier = FakeNotifier()
wrapped = exception.wrap_exception(notifier)
wrapped = utils.wrap_exception(notifier)
self.assertRaises(test.TestingException,
wrapped(bad_function_exception))
self.assertEquals(notifier.provided_publisher, None)

@ -36,6 +36,7 @@ from cinder import flags
from cinder.tests.image import fake as fake_image
from cinder.openstack.common import log as os_logging
from cinder.openstack.common import importutils
from cinder.openstack.common.notifier import test_notifier
from cinder.openstack.common import rpc
import cinder.policy
from cinder import quota
@ -56,7 +57,10 @@ class VolumeTestCase(test.TestCase):
volumes_dir=vol_tmpdir)
self.volume = importutils.import_object(FLAGS.volume_manager)
self.context = context.get_admin_context()
self.stubs.Set(cinder.flags.FLAGS, 'notification_driver',
'cinder.openstack.common.notifier.test_notifier')
fake_image.stub_out_image_service(self.stubs)
test_notifier.NOTIFICATIONS = []
def tearDown(self):
try:
@ -68,6 +72,7 @@ class VolumeTestCase(test.TestCase):
@staticmethod
def _create_volume(size='0', snapshot_id=None, image_id=None,
metadata=None):
#def _create_volume(size=0, snapshot_id=None):
"""Create a volume object."""
vol = {}
vol['size'] = size
@ -86,11 +91,14 @@ class VolumeTestCase(test.TestCase):
"""Test volume can be created and deleted."""
volume = self._create_volume()
volume_id = volume['id']
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.volume.create_volume(self.context, volume_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
self.volume.delete_volume(self.context, volume_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 4)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
@ -587,6 +595,30 @@ class VolumeTestCase(test.TestCase):
'name',
'description')
def test_create_volume_usage_notification(self):
"""Ensure create volume generates appropriate usage notification"""
volume = self._create_volume()
volume_id = volume['id']
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.volume.create_volume(self.context, volume_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
msg = test_notifier.NOTIFICATIONS[0]
self.assertEquals(msg['event_type'], 'volume.create.start')
msg = test_notifier.NOTIFICATIONS[1]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'volume.create.end')
payload = msg['payload']
self.assertEquals(payload['tenant_id'], volume['project_id'])
self.assertEquals(payload['user_id'], volume['user_id'])
self.assertEquals(payload['volume_id'], volume['id'])
self.assertEquals(payload['status'], 'creating')
self.assertEquals(payload['size'], volume['size'])
self.assertTrue('display_name' in payload)
self.assertTrue('snapshot_id' in payload)
self.assertTrue('launched_at' in payload)
self.assertTrue('created_at' in payload)
self.volume.delete_volume(self.context, volume_id)
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""

@ -0,0 +1,86 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests For miscellaneous util methods used with volume."""
from cinder import db
from cinder import flags
from cinder import context
from cinder import test
from cinder.volume import utils as volume_utils
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common.notifier import test_notifier
LOG = logging.getLogger(__name__)
FLAGS = flags.FLAGS
class UsageInfoTestCase(test.TestCase):
def setUp(self):
super(UsageInfoTestCase, self).setUp()
self.flags(connection_type='fake',
host='fake',
notification_driver='cinder.openstack.common.notifier.test_notifier')
self.stubs.Set(flags.FLAGS, 'notification_driver',
'cinder.openstack.common.notifier.test_notifier')
self.volume = importutils.import_object(FLAGS.volume_manager)
self.user_id = 'fake'
self.project_id = 'fake'
self.snapshot_id = 'fake'
self.volume_size = 0
self.context = context.RequestContext(self.user_id, self.project_id)
test_notifier.NOTIFICATIONS = []
def _create_volume(self, params={}):
"""Create a test volume"""
vol = {}
vol['snapshot_id'] = self.snapshot_id
vol['user_id'] = self.user_id
vol['project_id'] = self.project_id
vol['host'] = FLAGS.host
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
vol['size'] = self.volume_size
vol.update(params)
return db.volume_create(self.context, vol)['id']
def test_notify_usage_exists(self):
"""Ensure 'exists' notification generates appropriate usage data."""
volume_id = self._create_volume()
volume = db.volume_get(self.context, volume_id)
volume_utils.notify_usage_exists(self.context, volume)
LOG.info("%r" % test_notifier.NOTIFICATIONS)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
msg = test_notifier.NOTIFICATIONS[0]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'volume.exists')
payload = msg['payload']
self.assertEquals(payload['tenant_id'], self.project_id)
self.assertEquals(payload['user_id'], self.user_id)
self.assertEquals(payload['snapshot_id'], self.snapshot_id)
self.assertEquals(payload['volume_id'], volume.id)
self.assertEquals(payload['size'], self.volume_size)
for attr in ('display_name', 'created_at', 'launched_at',
'status', 'audit_period_beginning',
'audit_period_ending'):
self.assertTrue(attr in payload,
msg="Key %s not in payload" % attr)
db.volume_destroy(context.get_admin_context(), volume['id'])

@ -1,4 +1,5 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
@ -25,6 +26,7 @@ import errno
import functools
import hashlib
import inspect
import itertools
import os
import pyclbr
import random
@ -1127,3 +1129,71 @@ class UndoManager(object):
LOG.exception(msg, **kwargs)
self._rollback()
def wrap_exception(notifier=None, publisher_id=None, event_type=None,
level=None):
"""This decorator wraps a method to catch any exceptions that may
get thrown. It logs the exception as well as optionally sending
it to the notification system.
"""
# TODO(sandy): Find a way to import cinder.notifier.api so we don't have
# to pass it in as a parameter. Otherwise we get a cyclic import of
# cinder.notifier.api -> cinder.utils -> cinder.exception :(
# TODO(johannes): Also, it would be nice to use
# utils.save_and_reraise_exception() without an import loop
def inner(f):
def wrapped(*args, **kw):
try:
return f(*args, **kw)
except Exception, e:
# Save exception since it can be clobbered during processing
# below before we can re-raise
exc_info = sys.exc_info()
if notifier:
payload = dict(args=args, exception=e)
payload.update(kw)
# Use a temp vars so we don't shadow
# our outer definitions.
temp_level = level
if not temp_level:
temp_level = notifier.ERROR
temp_type = event_type
if not temp_type:
# If f has multiple decorators, they must use
# functools.wraps to ensure the name is
# propagated.
temp_type = f.__name__
context = get_context_from_function_and_args(f,
args,
kw)
notifier.notify(context, publisher_id, temp_type,
temp_level, payload)
# re-raise original exception since it may have been clobbered
raise exc_info[0], exc_info[1], exc_info[2]
return functools.wraps(f)(wrapped)
return inner
def get_context_from_function_and_args(function, args, kwargs):
"""Find an arg of type RequestContext and return it.
This is useful in a couple of decorators where we don't
know much about the function we're wrapping.
"""
# import here to avoid circularity:
from cinder import context
for arg in itertools.chain(kwargs.values(), args):
if isinstance(arg, context.RequestContext):
return arg
return None

@ -48,6 +48,7 @@ from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
from cinder.openstack.common import timeutils
from cinder import utils
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
@ -103,6 +104,7 @@ class VolumeManager(manager.SchedulerDependentManager):
"""Creates and exports the volume."""
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
self._notify_about_volume_usage(context, volume_ref, "create.start")
LOG.info(_("volume %s: creating"), volume_ref['name'])
self.db.volume_update(context,
@ -151,6 +153,7 @@ class VolumeManager(manager.SchedulerDependentManager):
if image_id:
#copy the image onto the volume.
self._copy_image_to_volume(context, volume_ref, image_id)
self._notify_about_volume_usage(context, volume_ref, "create.end")
return volume_ref['id']
def delete_volume(self, context, volume_id):
@ -164,6 +167,7 @@ class VolumeManager(manager.SchedulerDependentManager):
raise exception.InvalidVolume(
reason=_("Volume is not local to this node"))
self._notify_about_volume_usage(context, volume_ref, "delete.start")
self._reset_stats()
try:
LOG.debug(_("volume %s: removing export"), volume_ref['name'])
@ -184,6 +188,7 @@ class VolumeManager(manager.SchedulerDependentManager):
self.db.volume_destroy(context, volume_id)
LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])
self._notify_about_volume_usage(context, volume_ref, "delete.end")
return True
def create_snapshot(self, context, volume_id, snapshot_id):
@ -386,3 +391,9 @@ class VolumeManager(manager.SchedulerDependentManager):
def notification(self, context, event):
LOG.info(_("Notification {%s} received"), event)
self._reset_stats()
def _notify_about_volume_usage(self, context, volume, event_suffix,
extra_usage_info=None):
volume_utils.notify_about_volume_usage(
context, volume, event_suffix,
extra_usage_info=extra_usage_info, host=self.host)

83
cinder/volume/utils.py Normal file

@ -0,0 +1,83 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Volume-related Utilities and helpers."""
from cinder import flags
from cinder import utils
from cinder.openstack.common.notifier import api as notifier_api
from cinder.openstack.common import log as logging
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
def notify_usage_exists(context, volume_ref, current_period=False):
""" Generates 'exists' notification for a volume for usage auditing
purposes.
Generates usage for last completed period, unless 'current_period'
is True."""
begin, end = utils.last_completed_audit_period()
if current_period:
audit_start = end
audit_end = utils.utcnow()
else:
audit_start = begin
audit_end = end
extra_usage_info = dict(audit_period_beginning=str(audit_start),
audit_period_ending=str(audit_end))
notify_about_volume_usage(
context, volume_ref, 'exists', extra_usage_info=extra_usage_info)
def _usage_from_volume(context, volume_ref, **kw):
def null_safe_str(s):
return str(s) if s else ''
usage_info = dict(
tenant_id=volume_ref['project_id'],
user_id=volume_ref['user_id'],
volume_id=volume_ref['id'],
volume_type=volume_ref['volume_type'],
display_name=volume_ref['display_name'],
launched_at=null_safe_str(volume_ref['launched_at']),
created_at=null_safe_str(volume_ref['created_at']),
status=volume_ref['status'],
snapshot_id=volume_ref['snapshot_id'],
size=volume_ref['size'])
usage_info.update(kw)
return usage_info
def notify_about_volume_usage(context, volume, event_suffix,
extra_usage_info=None, host=None):
if not host:
host = FLAGS.host
if not extra_usage_info:
extra_usage_info = {}
usage_info = _usage_from_volume(
context, volume, **extra_usage_info)
notifier_api.notify(context, 'volume.%s' % host,
'volume.%s' % event_suffix,
notifier_api.INFO, usage_info)

@ -38,5 +38,6 @@ setuptools.setup(name='cinder',
'bin/cinder-manage',
'bin/cinder-rootwrap',
'bin/cinder-scheduler',
'bin/volume-usage-audit',
'bin/cinder-volume'],
py_modules=[])