diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 96dccc7d83d..8c4fc25044d 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -1936,28 +1936,31 @@ def sm_volume_get_all(context): @require_context def backup_get(context, backup_id, session=None): result = model_query(context, models.Backup, - read_deleted="yes").filter_by(id=backup_id).first() + session=session, project_only=True).\ + filter_by(id=backup_id).\ + first() + if not result: raise exception.BackupNotFound(backup_id=backup_id) + return result @require_admin_context def backup_get_all(context): - return model_query(context, models.Backup, read_deleted="yes").all() + return model_query(context, models.Backup).all() @require_admin_context def backup_get_all_by_host(context, host): - return model_query(context, models.Backup, - read_deleted="yes").filter_by(host=host).all() + return model_query(context, models.Backup).filter_by(host=host).all() @require_context def backup_get_all_by_project(context, project_id): authorize_project_context(context, project_id) - return model_query(context, models.Backup, read_deleted="yes").\ + return model_query(context, models.Backup).\ filter_by(project_id=project_id).all() @@ -1992,5 +1995,9 @@ def backup_update(context, backup_id, values): def backup_destroy(context, backup_id): session = get_session() with session.begin(): - model_query(context, models.Backup, - read_deleted="yes").filter_by(id=backup_id).delete() + session.query(models.Backup).\ + filter_by(id=backup_id).\ + update({'status': 'deleted', + 'deleted': True, + 'deleted_at': timeutils.utcnow(), + 'updated_at': literal_column('updated_at')}) diff --git a/cinder/tests/test_backup.py b/cinder/tests/test_backup.py index 70a8361185f..bffddb36c86 100644 --- a/cinder/tests/test_backup.py +++ b/cinder/tests/test_backup.py @@ -25,6 +25,7 @@ from cinder import exception from cinder import flags from cinder.openstack.common import importutils from cinder.openstack.common import log as logging +from cinder.openstack.common import timeutils from cinder import test FLAGS = flags.FLAGS @@ -341,6 +342,12 @@ class BackupTestCase(test.TestCase): self.ctxt, backup_id) + ctxt_read_deleted = context.get_admin_context('yes') + backup = db.backup_get(ctxt_read_deleted, backup_id) + self.assertEqual(backup.deleted, True) + self.assertTrue(timeutils.utcnow() > backup.deleted_at) + self.assertEqual(backup.status, 'deleted') + def test_list_backup(self): backups = db.backup_get_all_by_project(self.ctxt, 'project1') self.assertEqual(len(backups), 0) @@ -350,3 +357,39 @@ class BackupTestCase(test.TestCase): backups = db.backup_get_all_by_project(self.ctxt, 'project1') self.assertEqual(len(backups), 1) self.assertEqual(backups[0].id, b2) + + def test_backup_get_all_by_project_with_deleted(self): + """Test deleted backups don't show up in backup_get_all_by_project. + Unless context.read_deleted is 'yes'""" + backups = db.backup_get_all_by_project(self.ctxt, 'fake') + self.assertEqual(len(backups), 0) + + backup_id_keep = self._create_backup_db_entry() + backup_id = self._create_backup_db_entry() + db.backup_destroy(self.ctxt, backup_id) + + backups = db.backup_get_all_by_project(self.ctxt, 'fake') + self.assertEqual(len(backups), 1) + self.assertEqual(backups[0].id, backup_id_keep) + + ctxt_read_deleted = context.get_admin_context('yes') + backups = db.backup_get_all_by_project(ctxt_read_deleted, 'fake') + self.assertEqual(len(backups), 2) + + def test_backup_get_all_by_host_with_deleted(self): + """Test deleted backups don't show up in backup_get_all_by_project. + Unless context.read_deleted is 'yes'""" + backups = db.backup_get_all_by_host(self.ctxt, 'testhost') + self.assertEqual(len(backups), 0) + + backup_id_keep = self._create_backup_db_entry() + backup_id = self._create_backup_db_entry() + db.backup_destroy(self.ctxt, backup_id) + + backups = db.backup_get_all_by_host(self.ctxt, 'testhost') + self.assertEqual(len(backups), 1) + self.assertEqual(backups[0].id, backup_id_keep) + + ctxt_read_deleted = context.get_admin_context('yes') + backups = db.backup_get_all_by_host(ctxt_read_deleted, 'testhost') + self.assertEqual(len(backups), 2)