Clean up expired user messages
Use periodic task to clean up expired messages. Change-Id: Ia44f46497b8a515de73e73c0ad70966a094a9b76 Partial-Implements: blueprint summarymessage
This commit is contained in:
parent
4d71d60da6
commit
dd42d0af96
@ -1651,6 +1651,11 @@ def message_destroy(context, message_id):
|
|||||||
return IMPL.message_destroy(context, message_id)
|
return IMPL.message_destroy(context, message_id)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_expired_messages(context):
|
||||||
|
"""Soft delete expired messages"""
|
||||||
|
return IMPL.cleanup_expired_messages(context)
|
||||||
|
|
||||||
|
|
||||||
###################
|
###################
|
||||||
|
|
||||||
|
|
||||||
|
@ -6224,6 +6224,17 @@ def message_destroy(context, message):
|
|||||||
return updated_values
|
return updated_values
|
||||||
|
|
||||||
|
|
||||||
|
@require_admin_context
|
||||||
|
def cleanup_expired_messages(context):
|
||||||
|
session = get_session()
|
||||||
|
now = timeutils.utcnow()
|
||||||
|
with session.begin():
|
||||||
|
# NOTE(tommylikehu): Directly delete the expired
|
||||||
|
# messages here.
|
||||||
|
return session.query(models.Message).filter(
|
||||||
|
models.Message.expires_at < now).delete()
|
||||||
|
|
||||||
|
|
||||||
###############################
|
###############################
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,20 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from sqlalchemy import Index, MetaData, Table
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade(migrate_engine):
|
||||||
|
meta = MetaData(migrate_engine)
|
||||||
|
|
||||||
|
messages = Table('messages', meta, autoload=True)
|
||||||
|
return Index('messages_expire_at_idx', messages.c.expires_at)
|
@ -24,7 +24,11 @@ from cinder.message import defined_messages
|
|||||||
|
|
||||||
messages_opts = [
|
messages_opts = [
|
||||||
cfg.IntOpt('message_ttl', default=2592000,
|
cfg.IntOpt('message_ttl', default=2592000,
|
||||||
help='message minimum life in seconds.')]
|
help='message minimum life in seconds.'),
|
||||||
|
cfg.IntOpt('message_reap_interval', default=86400,
|
||||||
|
help='interval between period task to clean expired messages.')
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
CONF.register_opts(messages_opts)
|
CONF.register_opts(messages_opts)
|
||||||
@ -80,3 +84,8 @@ class API(base.Base):
|
|||||||
"""Delete message with the specified id."""
|
"""Delete message with the specified id."""
|
||||||
ctx = context.elevated()
|
ctx = context.elevated()
|
||||||
return self.db.message_destroy(ctx, id)
|
return self.db.message_destroy(ctx, id)
|
||||||
|
|
||||||
|
def cleanup_expired_messages(self, context):
|
||||||
|
ctx = context.elevated()
|
||||||
|
count = self.db.cleanup_expired_messages(ctx)
|
||||||
|
LOG.info("Deleted %s expired messages.", count)
|
||||||
|
@ -26,6 +26,7 @@ import eventlet
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
|
from oslo_service import periodic_task
|
||||||
from oslo_utils import excutils
|
from oslo_utils import excutils
|
||||||
from oslo_utils import importutils
|
from oslo_utils import importutils
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
@ -38,6 +39,7 @@ from cinder import exception
|
|||||||
from cinder import flow_utils
|
from cinder import flow_utils
|
||||||
from cinder.i18n import _
|
from cinder.i18n import _
|
||||||
from cinder import manager
|
from cinder import manager
|
||||||
|
from cinder.message import api as mess_api
|
||||||
from cinder import objects
|
from cinder import objects
|
||||||
from cinder import quota
|
from cinder import quota
|
||||||
from cinder import rpc
|
from cinder import rpc
|
||||||
@ -75,6 +77,7 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
|
|||||||
self._startup_delay = True
|
self._startup_delay = True
|
||||||
self.volume_api = volume_rpcapi.VolumeAPI()
|
self.volume_api = volume_rpcapi.VolumeAPI()
|
||||||
self.sch_api = scheduler_rpcapi.SchedulerAPI()
|
self.sch_api = scheduler_rpcapi.SchedulerAPI()
|
||||||
|
self.message_api = mess_api.API()
|
||||||
self.rpc_api_version = versionutils.convert_version_to_int(
|
self.rpc_api_version = versionutils.convert_version_to_int(
|
||||||
self.RPC_API_VERSION)
|
self.RPC_API_VERSION)
|
||||||
|
|
||||||
@ -91,6 +94,11 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
|
|||||||
self.sch_api = scheduler_rpcapi.SchedulerAPI()
|
self.sch_api = scheduler_rpcapi.SchedulerAPI()
|
||||||
self.driver.reset()
|
self.driver.reset()
|
||||||
|
|
||||||
|
@periodic_task.periodic_task(spacing=CONF.message_reap_interval,
|
||||||
|
run_immediately=True)
|
||||||
|
def _clean_expired_messages(self, context):
|
||||||
|
self.message_api.cleanup_expired_messages(context)
|
||||||
|
|
||||||
def update_service_capabilities(self, context, service_name=None,
|
def update_service_capabilities(self, context, service_name=None,
|
||||||
host=None, capabilities=None,
|
host=None, capabilities=None,
|
||||||
cluster_name=None, timestamp=None,
|
cluster_name=None, timestamp=None,
|
||||||
|
@ -106,6 +106,13 @@ class MessageApiTest(test.TestCase):
|
|||||||
self.message_api.db.message_destroy.assert_called_once_with(
|
self.message_api.db.message_destroy.assert_called_once_with(
|
||||||
admin_context, 'fake_id')
|
admin_context, 'fake_id')
|
||||||
|
|
||||||
|
def test_cleanup_expired_messages(self):
|
||||||
|
admin_context = mock.Mock()
|
||||||
|
self.mock_object(self.ctxt, 'elevated', return_value=admin_context)
|
||||||
|
self.message_api.cleanup_expired_messages(self.ctxt)
|
||||||
|
self.message_api.db.cleanup_expired_messages.assert_called_once_with(
|
||||||
|
admin_context)
|
||||||
|
|
||||||
def create_message_for_tests(self):
|
def create_message_for_tests(self):
|
||||||
"""Create messages to test pagination functionality"""
|
"""Create messages to test pagination functionality"""
|
||||||
utils.create_message(
|
utils.create_message(
|
||||||
|
@ -93,6 +93,13 @@ class SchedulerManagerTestCase(test.TestCase):
|
|||||||
volume_rpcapi.client.serializer._base.version_cap)
|
volume_rpcapi.client.serializer._base.version_cap)
|
||||||
self.assertIsNone(volume_rpcapi.client.serializer._base.manifest)
|
self.assertIsNone(volume_rpcapi.client.serializer._base.manifest)
|
||||||
|
|
||||||
|
@mock.patch('cinder.message.api.API.cleanup_expired_messages')
|
||||||
|
def test__clean_expired_messages(self, mock_clean):
|
||||||
|
|
||||||
|
self.manager._clean_expired_messages(self.context)
|
||||||
|
|
||||||
|
mock_clean.assert_called_once_with(self.context)
|
||||||
|
|
||||||
@mock.patch('cinder.scheduler.driver.Scheduler.'
|
@mock.patch('cinder.scheduler.driver.Scheduler.'
|
||||||
'update_service_capabilities')
|
'update_service_capabilities')
|
||||||
def test_update_service_capabilities_empty_dict(self, _mock_update_cap):
|
def test_update_service_capabilities_empty_dict(self, _mock_update_cap):
|
||||||
|
@ -2155,6 +2155,40 @@ class DBAPIReservationTestCase(BaseTest):
|
|||||||
'project1'))
|
'project1'))
|
||||||
|
|
||||||
|
|
||||||
|
class DBAPIMessageTestCase(BaseTest):
|
||||||
|
|
||||||
|
"""Tests for message operations"""
|
||||||
|
def setUp(self):
|
||||||
|
super(DBAPIMessageTestCase, self).setUp()
|
||||||
|
self.context = context.get_admin_context()
|
||||||
|
|
||||||
|
def _create_fake_messages(self, m_id, time):
|
||||||
|
db.message_create(self.context,
|
||||||
|
{'id': m_id,
|
||||||
|
'event_id': m_id,
|
||||||
|
'message_level': 'error',
|
||||||
|
'project_id': 'fake_id',
|
||||||
|
'expires_at': time})
|
||||||
|
|
||||||
|
def test_cleanup_expired_messages(self):
|
||||||
|
now = timeutils.utcnow()
|
||||||
|
# message expired 1 day ago
|
||||||
|
self._create_fake_messages(
|
||||||
|
uuidutils.generate_uuid(), now - datetime.timedelta(days=1))
|
||||||
|
# message expired now
|
||||||
|
self._create_fake_messages(
|
||||||
|
uuidutils.generate_uuid(), now)
|
||||||
|
# message expired 1 day after
|
||||||
|
self._create_fake_messages(
|
||||||
|
uuidutils.generate_uuid(), now + datetime.timedelta(days=1))
|
||||||
|
|
||||||
|
with mock.patch.object(timeutils, 'utcnow') as mock_time_now:
|
||||||
|
mock_time_now.return_value = now
|
||||||
|
db.cleanup_expired_messages(self.context)
|
||||||
|
messages = db.message_get_all(self.context)
|
||||||
|
self.assertEqual(2, len(messages))
|
||||||
|
|
||||||
|
|
||||||
class DBAPIQuotaClassTestCase(BaseTest):
|
class DBAPIQuotaClassTestCase(BaseTest):
|
||||||
|
|
||||||
"""Tests for db.api.quota_class_* methods."""
|
"""Tests for db.api.quota_class_* methods."""
|
||||||
|
@ -0,0 +1,5 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- Added perodic task to clean expired messages in
|
||||||
|
cinder scheduler, also added a configuration option
|
||||||
|
``message_reap_interval`` to handle the interval.
|
Loading…
x
Reference in New Issue
Block a user