diff --git a/cinder/tests/unit/volume/test_manage_volume.py b/cinder/tests/unit/volume/test_manage_volume.py
new file mode 100644
index 00000000000..1df5e51b347
--- /dev/null
+++ b/cinder/tests/unit/volume/test_manage_volume.py
@@ -0,0 +1,175 @@
+# Copyright (c) 2016 Chuck Fouts. All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+
+from cinder.tests.unit import fake_constants as fake
+from cinder.tests.unit import fake_volume
+from cinder.tests.unit import test_volume
+
+from cinder import context
+from cinder import exception
+from cinder import objects
+from cinder.volume.flows.manager import manage_existing
+from cinder.volume import manager
+from cinder.volume import utils
+
+FAKE_HOST_POOL = 'volPool'
+FAKE_HOST = 'hostname@backend'
+
+
+class ManageVolumeTestCase(test_volume.BaseVolumeTestCase):
+
+    def setUp(self):
+        super(ManageVolumeTestCase, self).setUp()
+        self.context = context.RequestContext(fake.USER_ID, fake.PROJECT_ID,
+                                              True)
+        self.manager = manager.VolumeManager()
+        self.manager.stats = {'allocated_capacity_gb': 0, 'pools': {}}
+
+    @staticmethod
+    def _stub_volume_object_get(cls, host=FAKE_HOST):
+        volume = {
+            'id': fake.VOLUME_ID,
+            'size': 1,
+            'name': fake.VOLUME_NAME,
+            'host': host,
+        }
+        return fake_volume.fake_volume_obj(cls.context, **volume)
+
+    def test_manage_existing(self):
+        volume_object = self._stub_volume_object_get(self)
+        mock_object_volume = self.mock_object(
+            objects.Volume, 'get_by_id', mock.Mock(return_value=volume_object))
+        mock_run_flow_engine = self.mock_object(
+            self.manager, '_run_manage_existing_flow_engine',
+            mock.Mock(return_value=volume_object))
+        mock_update_volume_stats = self.mock_object(
+            self.manager, '_update_stats_for_managed')
+
+        result = self.manager.manage_existing(self.context, volume_object.id)
+
+        self.assertEqual(fake.VOLUME_ID, result)
+        mock_object_volume.assert_called_once_with(self.context,
+                                                   volume_object.id)
+        mock_run_flow_engine.assert_called_once_with(self.context,
+                                                     volume_object,
+                                                     None)
+        mock_update_volume_stats.assert_called_once_with(volume_object)
+
+    def test_manage_existing_with_volume_object(self):
+        volume_object = self._stub_volume_object_get(self)
+        mock_object_volume = self.mock_object(objects.Volume, 'get_by_id')
+        mock_run_flow_engine = self.mock_object(
+            self.manager, '_run_manage_existing_flow_engine',
+            mock.Mock(return_value=volume_object))
+        mock_update_volume_stats = self.mock_object(
+            self.manager, '_update_stats_for_managed')
+
+        result = self.manager.manage_existing(
+            self.context, volume_object.id, volume=volume_object)
+
+        self.assertEqual(fake.VOLUME_ID, result)
+        mock_object_volume.assert_not_called()
+        mock_run_flow_engine.assert_called_once_with(self.context,
+                                                     volume_object,
+                                                     None)
+        mock_update_volume_stats.assert_called_once_with(volume_object)
+
+    def test_run_manage_existing_flow_engine(self):
+        mock_volume = mock.Mock()
+        volume_object = self._stub_volume_object_get(self)
+
+        mock_flow_engine = mock.Mock()
+        mock_flow_engine_run = self.mock_object(mock_flow_engine, 'run')
+        mock_flow_engine_fetch = self.mock_object(
+            mock_flow_engine.storage, 'fetch',
+            mock.Mock(return_value=volume_object))
+        mock_get_flow = self.mock_object(
+            manage_existing, 'get_flow',
+            mock.Mock(return_value=mock_flow_engine))
+
+        result = self.manager._run_manage_existing_flow_engine(self.context,
+                                                               mock_volume,
+                                                               None)
+
+        self.assertEqual(volume_object, result)
+
+        mock_get_flow.assert_called_once_with(self.context,
+                                              self.manager.db,
+                                              self.manager.driver,
+                                              self.manager.host,
+                                              mock_volume,
+                                              None)
+        mock_flow_engine_run.assert_called_once_with()
+        mock_flow_engine_fetch.assert_called_once_with('volume')
+
+    def test_run_manage_existing_flow_engine_exception(self):
+        mock_get_flow = self.mock_object(
+            manage_existing, 'get_flow',
+            mock.Mock(side_effect=Exception))
+        volume_object = self._stub_volume_object_get(self)
+        self.assertRaises(exception.CinderException,
+                          self.manager._run_manage_existing_flow_engine,
+                          self.context,
+                          volume_object,
+                          None)
+
+        mock_get_flow.assert_called_once_with(self.context,
+                                              self.manager.db,
+                                              self.manager.driver,
+                                              self.manager.host,
+                                              volume_object,
+                                              None)
+
+    def test_update_stats_for_managed(self):
+        volume_object = self._stub_volume_object_get(self,
+                                                     host=FAKE_HOST +
+                                                     '#volPool')
+        self.manager._update_stats_for_managed(volume_object)
+        backend_stats = self.manager.stats['pools'][FAKE_HOST_POOL]
+        self.assertEqual(
+            1, backend_stats['allocated_capacity_gb'])
+
+    def test_update_stats_for_managed_no_pool(self):
+        safe_get_backend = 'safe_get_backend'
+        volume_obj = self._stub_volume_object_get(self)
+        mock_safe_get = self.mock_object(
+            self.manager.driver.configuration, 'safe_get',
+            mock.Mock(return_value=safe_get_backend))
+
+        self.manager._update_stats_for_managed(volume_obj)
+
+        mock_safe_get.assert_called_once_with('volume_backend_name')
+        backend_stats = self.manager.stats['pools'][safe_get_backend]
+        self.assertEqual(1, backend_stats['allocated_capacity_gb'])
+
+    def test_update_stats_for_managed_default_backend(self):
+        volume_obj = self._stub_volume_object_get(self)
+        mock_safe_get = self.mock_object(
+            self.manager.driver.configuration, 'safe_get',
+            mock.Mock(return_value=None))
+
+        self.manager._update_stats_for_managed(volume_obj)
+
+        mock_safe_get.assert_called_once_with('volume_backend_name')
+        backend_stats = self.manager.stats['pools'][utils.DEFAULT_POOL_NAME]
+        self.assertEqual(1, backend_stats['allocated_capacity_gb'])
+
+    def test_update_stats_key_error(self):
+        self.manager.stats = {}
+
+        self.assertRaises(
+            KeyError, self.manager._update_stats_for_managed,
+            self._stub_volume_object_get(self))
diff --git a/cinder/volume/flows/api/manage_existing.py b/cinder/volume/flows/api/manage_existing.py
index fcbe0e25bc5..3cb800ae4c2 100644
--- a/cinder/volume/flows/api/manage_existing.py
+++ b/cinder/volume/flows/api/manage_existing.py
@@ -65,11 +65,11 @@ class EntryCreateTask(flow_utils.CinderTask):
             'host': kwargs.pop('host'),
             'availability_zone': kwargs.pop('availability_zone'),
             'volume_type_id': volume_type_id,
-            'metadata': kwargs.pop('metadata'),
+            'metadata': kwargs.pop('metadata') or {},
             'bootable': kwargs.pop('bootable'),
         }
 
-        volume = objects.Volume(context, volume_properties)
+        volume = objects.Volume(context=context, **volume_properties)
         volume.create()
 
         return {
diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py
index 0454175b819..06f76408810 100644
--- a/cinder/volume/manager.py
+++ b/cinder/volume/manager.py
@@ -2226,21 +2226,46 @@ class VolumeManager(manager.SchedulerDependentManager):
         if volume is None:
             # For older clients, mimic the old behavior and look up the volume
             # by its volume_id.
-            volume = objects.Volume.get_by_id(context, volume_id)
+            volume = objects.Volume.get_by_id(ctxt, volume_id)
 
+        vol_ref = self._run_manage_existing_flow_engine(
+            ctxt, volume, ref)
+
+        self._update_stats_for_managed(vol_ref)
+
+        LOG.info(_LI("Manage existing volume completed successfully."),
+                 resource=vol_ref)
+        return vol_ref.id
+
+    def _update_stats_for_managed(self, volume_reference):
+        # Update volume stats
+        pool = vol_utils.extract_host(volume_reference.host, 'pool')
+        if pool is None:
+            # Legacy volume, put them into default pool
+            pool = self.driver.configuration.safe_get(
+                'volume_backend_name') or vol_utils.extract_host(
+                    volume_reference.host, 'pool', True)
+
+        try:
+            self.stats['pools'][pool]['allocated_capacity_gb'] \
+                += volume_reference.size
+        except KeyError:
+            self.stats['pools'][pool] = dict(
+                allocated_capacity_gb=volume_reference.size)
+
+    def _run_manage_existing_flow_engine(self, ctxt, volume, ref):
         try:
             flow_engine = manage_existing.get_flow(
                 ctxt,
                 self.db,
                 self.driver,
                 self.host,
-                volume_id,
-                ref,
                 volume,
+                ref,
             )
         except Exception:
             msg = _("Failed to create manage_existing flow.")
-            LOG.exception(msg, resource={'type': 'volume', 'id': volume_id})
+            LOG.exception(msg, resource={'type': 'volume', 'id': volume.id})
             raise exception.CinderException(msg)
 
         with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
@@ -2248,24 +2273,8 @@ class VolumeManager(manager.SchedulerDependentManager):
 
         # Fetch created volume from storage
         vol_ref = flow_engine.storage.fetch('volume')
-        # Update volume stats
-        pool = vol_utils.extract_host(vol_ref['host'], 'pool')
-        if pool is None:
-            # Legacy volume, put them into default pool
-            pool = self.driver.configuration.safe_get(
-                'volume_backend_name') or vol_utils.extract_host(
-                    vol_ref['host'], 'pool', True)
 
-        try:
-            self.stats['pools'][pool]['allocated_capacity_gb'] \
-                += vol_ref['size']
-        except KeyError:
-            self.stats['pools'][pool] = dict(
-                allocated_capacity_gb=vol_ref['size'])
-
-        LOG.info(_LI("Manage existing volume completed successfully."),
-                 resource=vol_ref)
-        return vol_ref['id']
+        return vol_ref
 
     def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys,
                                sort_dirs):