Merge "Tiramisu: replication group support"

This commit is contained in:
Jenkins 2017-06-15 21:47:51 +00:00 committed by Gerrit Code Review
commit 9769c6c463
26 changed files with 1424 additions and 15 deletions

View File

@ -91,6 +91,7 @@ REST_API_VERSION_HISTORY = """
* 3.35 - Add ``volume-type`` filter to Get-Pools API.
* 3.36 - Add metadata to volumes/summary response body.
* 3.37 - Support sort backup by "name".
* 3.38 - Add replication group API (Tiramisu).
"""
# The minimum and maximum versions of the API supported
@ -98,7 +99,7 @@ REST_API_VERSION_HISTORY = """
# minimum version of the API supported.
# Explicitly using /v1 or /v2 endpoints will still work
_MIN_API_VERSION = "3.0"
_MAX_API_VERSION = "3.37"
_MAX_API_VERSION = "3.38"
_LEGACY_API_VERSION1 = "1.0"
_LEGACY_API_VERSION2 = "2.0"

View File

@ -325,3 +325,8 @@ user documentation.
3.37
----
Support sort backup by "name".
3.38
----
Added enable_replication/disable_replication/failover_replication/
list_replication_targets for replication groups (Tiramisu).

View File

@ -34,6 +34,7 @@ LOG = logging.getLogger(__name__)
GROUP_API_VERSION = '3.13'
GROUP_CREATE_FROM_SRC_API_VERSION = '3.14'
GROUP_REPLICATION_API_VERSION = '3.38'
class GroupsController(wsgi.Controller):
@ -372,6 +373,111 @@ class GroupsController(wsgi.Controller):
return webob.Response(status_int=http_client.ACCEPTED)
@wsgi.Controller.api_version(GROUP_REPLICATION_API_VERSION)
@wsgi.action("enable_replication")
def enable_replication(self, req, id, body):
"""Enables replications for a group."""
context = req.environ['cinder.context']
if body:
if not self.is_valid_body(body, 'enable_replication'):
msg = _("Missing required element 'enable_replication' in "
"request body.")
raise exc.HTTPBadRequest(explanation=msg)
LOG.info('Enable replication group with id: %s.', id,
context=context)
try:
group = self.group_api.get(context, id)
self.group_api.enable_replication(context, group)
# Not found exception will be handled at the wsgi level
except (exception.InvalidGroup, exception.InvalidGroupType,
exception.InvalidVolume, exception.InvalidVolumeType) as error:
raise exc.HTTPBadRequest(explanation=error.msg)
return webob.Response(status_int=202)
@wsgi.Controller.api_version(GROUP_REPLICATION_API_VERSION)
@wsgi.action("disable_replication")
def disable_replication(self, req, id, body):
"""Disables replications for a group."""
context = req.environ['cinder.context']
if body:
if not self.is_valid_body(body, 'disable_replication'):
msg = _("Missing required element 'disable_replication' in "
"request body.")
raise exc.HTTPBadRequest(explanation=msg)
LOG.info('Disable replication group with id: %s.', id,
context=context)
try:
group = self.group_api.get(context, id)
self.group_api.disable_replication(context, group)
# Not found exception will be handled at the wsgi level
except (exception.InvalidGroup, exception.InvalidGroupType,
exception.InvalidVolume, exception.InvalidVolumeType) as error:
raise exc.HTTPBadRequest(explanation=error.msg)
return webob.Response(status_int=202)
@wsgi.Controller.api_version(GROUP_REPLICATION_API_VERSION)
@wsgi.action("failover_replication")
def failover_replication(self, req, id, body):
"""Fails over replications for a group."""
context = req.environ['cinder.context']
if body:
if not self.is_valid_body(body, 'failover_replication'):
msg = _("Missing required element 'failover_replication' in "
"request body.")
raise exc.HTTPBadRequest(explanation=msg)
grp_body = body['failover_replication']
try:
allow_attached = strutils.bool_from_string(
grp_body.get('allow_attached_volume', False),
strict=True)
except ValueError:
msg = (_("Invalid value '%s' for allow_attached_volume flag.")
% grp_body)
raise exc.HTTPBadRequest(explanation=msg)
secondary_backend_id = grp_body.get('secondary_backend_id')
LOG.info('Failover replication group with id: %s.', id,
context=context)
try:
group = self.group_api.get(context, id)
self.group_api.failover_replication(context, group, allow_attached,
secondary_backend_id)
# Not found exception will be handled at the wsgi level
except (exception.InvalidGroup, exception.InvalidGroupType,
exception.InvalidVolume, exception.InvalidVolumeType) as error:
raise exc.HTTPBadRequest(explanation=error.msg)
return webob.Response(status_int=202)
@wsgi.Controller.api_version(GROUP_REPLICATION_API_VERSION)
@wsgi.action("list_replication_targets")
def list_replication_targets(self, req, id, body):
"""List replication targets for a group."""
context = req.environ['cinder.context']
if body:
if not self.is_valid_body(body, 'list_replication_targets'):
msg = _("Missing required element 'list_replication_targets' "
"in request body.")
raise exc.HTTPBadRequest(explanation=msg)
LOG.info('List replication targets for group with id: %s.', id,
context=context)
# Not found exception will be handled at the wsgi level
group = self.group_api.get(context, id)
replication_targets = self.group_api.list_replication_targets(
context, group)
return replication_targets
def create_resource():
return wsgi.Resource(GroupsController())

View File

@ -71,6 +71,11 @@ class ViewBuilder(common.ViewBuilder):
group_ref['group']['volumes'] = [volume.id
for volume in group.volumes]
# Add replication_status if min version is greater than or equal
# to 3.38.
if req_version.matches("3.38", None):
group_ref['group']['replication_status'] = group.replication_status
return group_ref
def _list_view(self, func, request, groups):

View File

@ -5563,6 +5563,16 @@ def _group_snapshot_get_query(context, session=None, project_only=False):
@apply_like_filters(model=models.Group)
def _process_groups_filters(query, filters):
if filters:
# NOTE(xyang): backend_match_level needs to be handled before
# is_valid_model_filters is called as it is not a column name
# in the db.
backend_match_level = filters.pop('backend_match_level', 'backend')
# host is a valid filter. Filter the query by host and
# backend_match_level first.
host = filters.pop('host', None)
if host:
query = query.filter(_filter_host(models.Group.host, host,
match_level=backend_match_level))
# Ensure that filters' keys exist on the model
if not is_valid_model_filters(models.Group, filters):
return
@ -5582,10 +5592,9 @@ def _process_group_snapshot_filters(query, filters):
def _group_get_all(context, filters=None, marker=None, limit=None,
offset=None, sort_keys=None, sort_dirs=None):
if filters and not is_valid_model_filters(models.Group,
filters):
return []
# No need to call is_valid_model_filters here. It is called
# in _process_group_filters when _generate_paginate_query
# is called below.
session = get_session()
with session.begin():
# Generate the paginate query

View File

@ -0,0 +1,28 @@
# Copyright (C) 2017 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column
from sqlalchemy import MetaData, String, Table
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# Add replication_status column to groups table
table = Table('groups', meta, autoload=True)
if not hasattr(table.c, 'replication_status'):
new_column = Column('replication_status', String(255), nullable=True)
table.create_column(new_column)

View File

@ -193,6 +193,8 @@ class Group(BASE, CinderBase):
group_snapshot_id = Column(String(36))
source_group_id = Column(String(36))
replication_status = Column(String(255))
class Cgsnapshot(BASE, CinderBase):
"""Represents a cgsnapshot."""

View File

@ -846,6 +846,11 @@ class ReplicationError(CinderException):
"error: %(reason)s")
class ReplicationGroupError(CinderException):
message = _("Group %(group_id)s replication "
"error: %(reason)s.")
class ReplicationNotFound(NotFound):
message = _("Volume replication for %(volume_id)s "
"could not be found.")

View File

@ -151,7 +151,8 @@ class API(base.Base):
'name': name,
'description': description,
'volume_type_ids': [t['id'] for t in req_volume_types],
'group_type_id': req_group_type['id']}
'group_type_id': req_group_type['id'],
'replication_status': c_fields.ReplicationStatus.DISABLED}
group = None
try:
group = objects.Group(context=context, **kwargs)
@ -212,6 +213,7 @@ class API(base.Base):
'source_group_id': source_group_id,
'group_type_id': group_type_id,
'volume_type_ids': volume_type_ids,
'replication_status': c_fields.ReplicationStatus.DISABLED
}
group = None
@ -898,3 +900,202 @@ class API(base.Base):
'status': status}
gsnapshot.update(field)
gsnapshot.save()
def _check_type(self, group):
if not vol_utils.is_group_a_replication_group_type(group):
msg = _("Group %s is not a replication group type.") % group.id
LOG.error(msg)
raise exception.InvalidGroupType(reason=msg)
for vol_type in group.volume_types:
if not vol_utils.is_replicated_spec(vol_type.extra_specs):
msg = _("Volume type %s does not have 'replication_enabled' "
"spec key set to '<is> True'.") % vol_type.id
LOG.error(msg)
raise exception.InvalidVolumeType(reason=msg)
# Replication group API (Tiramisu)
@wrap_check_policy
def enable_replication(self, context, group):
self._check_type(group)
valid_status = [c_fields.GroupStatus.AVAILABLE]
if group.status not in valid_status:
params = {'valid': valid_status,
'current': group.status,
'id': group.id}
msg = _("Group %(id)s status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot enable replication.") % params
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
valid_rep_status = [c_fields.ReplicationStatus.DISABLED,
c_fields.ReplicationStatus.ENABLED]
if group.replication_status not in valid_rep_status:
params = {'valid': valid_rep_status,
'current': group.replication_status,
'id': group.id}
msg = _("Group %(id)s replication status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot enable replication.") % params
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
volumes = objects.VolumeList.get_all_by_generic_group(
context.elevated(), group.id)
valid_status = ['available', 'in-use']
for vol in volumes:
if vol.status not in valid_status:
params = {'valid': valid_status,
'current': vol.status,
'id': vol.id}
msg = _("Volume %(id)s status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot enable replication.") % params
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# replication_status could be set to enabled when volume is
# created and the mirror is built.
if vol.replication_status not in valid_rep_status:
params = {'valid': valid_rep_status,
'current': vol.replication_status,
'id': vol.id}
msg = _("Volume %(id)s replication status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot enable replication.") % params
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
vol.replication_status = c_fields.ReplicationStatus.ENABLING
vol.save()
group.replication_status = c_fields.ReplicationStatus.ENABLING
group.save()
self.volume_rpcapi.enable_replication(context, group)
@wrap_check_policy
def disable_replication(self, context, group):
self._check_type(group)
valid_status = [c_fields.GroupStatus.AVAILABLE,
c_fields.GroupStatus.ERROR]
if group.status not in valid_status:
params = {'valid': valid_status,
'current': group.status,
'id': group.id}
msg = _("Group %(id)s status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot disable replication.") % params
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
valid_rep_status = [c_fields.ReplicationStatus.ENABLED,
c_fields.ReplicationStatus.ERROR]
if group.replication_status not in valid_rep_status:
params = {'valid': valid_rep_status,
'current': group.replication_status,
'id': group.id}
msg = _("Group %(id)s replication status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot disable replication.") % params
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
volumes = objects.VolumeList.get_all_by_generic_group(
context.elevated(), group.id)
for vol in volumes:
if vol.replication_status not in valid_rep_status:
params = {'valid': valid_rep_status,
'current': vol.replication_status,
'id': vol.id}
msg = _("Volume %(id)s replication status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot disable replication.") % params
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
vol.replication_status = c_fields.ReplicationStatus.DISABLING
vol.save()
group.replication_status = c_fields.ReplicationStatus.DISABLING
group.save()
self.volume_rpcapi.disable_replication(context, group)
@wrap_check_policy
def failover_replication(self, context, group,
allow_attached_volume=False,
secondary_backend_id=None):
self._check_type(group)
valid_status = [c_fields.GroupStatus.AVAILABLE]
if group.status not in valid_status:
params = {'valid': valid_status,
'current': group.status,
'id': group.id}
msg = _("Group %(id)s status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot failover replication.") % params
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
valid_rep_status = [c_fields.ReplicationStatus.ENABLED,
c_fields.ReplicationStatus.FAILED_OVER]
if group.replication_status not in valid_rep_status:
params = {'valid': valid_rep_status,
'current': group.replication_status,
'id': group.id}
msg = _("Group %(id)s replication status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot failover replication.") % params
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
volumes = objects.VolumeList.get_all_by_generic_group(
context.elevated(), group.id)
valid_status = ['available', 'in-use']
for vol in volumes:
if vol.status not in valid_status:
params = {'valid': valid_status,
'current': vol.status,
'id': vol.id}
msg = _("Volume %(id)s status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot failover replication.") % params
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
if vol.status == 'in-use' and not allow_attached_volume:
msg = _("Volume %s is attached but allow_attached_volume flag "
"is False. Cannot failover replication.") % vol.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
if vol.replication_status not in valid_rep_status:
params = {'valid': valid_rep_status,
'current': vol.replication_status,
'id': vol.id}
msg = _("Volume %(id)s replication status must be %(valid)s, "
"but current status is: %(current)s. "
"Cannot failover replication.") % params
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
vol.replication_status = c_fields.ReplicationStatus.FAILING_OVER
vol.save()
group.replication_status = c_fields.ReplicationStatus.FAILING_OVER
group.save()
self.volume_rpcapi.failover_replication(context, group,
allow_attached_volume,
secondary_backend_id)
@wrap_check_policy
def list_replication_targets(self, context, group):
self._check_type(group)
return self.volume_rpcapi.list_replication_targets(context, group)

View File

@ -132,6 +132,7 @@ OBJ_VERSIONS.add('1.21', {'ManageableSnapshot': '1.0',
OBJ_VERSIONS.add('1.22', {'Snapshot': '1.4'})
OBJ_VERSIONS.add('1.23', {'VolumeAttachment': '1.2'})
OBJ_VERSIONS.add('1.24', {'LogLevel': '1.0', 'LogLevelList': '1.0'})
OBJ_VERSIONS.add('1.25', {'Group': '1.2'})
class CinderObjectRegistry(base.VersionedObjectRegistry):

View File

@ -105,9 +105,11 @@ class ReplicationStatus(BaseCinderEnum):
FAILING_OVER = 'failing-over'
FAILOVER_ERROR = 'failover-error'
FAILED_OVER = 'failed-over'
ENABLING = 'enabling'
DISABLING = 'disabling'
ALL = (ERROR, ENABLED, DISABLED, NOT_CAPABLE, FAILOVER_ERROR, FAILING_OVER,
FAILED_OVER)
FAILED_OVER, ENABLING, DISABLING)
class ReplicationStatusField(BaseEnumField):

View File

@ -29,7 +29,8 @@ class Group(base.CinderPersistentObject, base.CinderObject,
# Version 1.0: Initial version
# Version 1.1: Added group_snapshots, group_snapshot_id, and
# source_group_id
VERSION = '1.1'
# Version 1.2: Added replication_status
VERSION = '1.2'
OPTIONAL_FIELDS = ['volumes', 'volume_types', 'group_snapshots']
@ -47,6 +48,7 @@ class Group(base.CinderPersistentObject, base.CinderObject,
'status': c_fields.GroupStatusField(nullable=True),
'group_snapshot_id': fields.UUIDField(nullable=True),
'source_group_id': fields.UUIDField(nullable=True),
'replication_status': c_fields.ReplicationStatusField(nullable=True),
'volumes': fields.ObjectField('VolumeList', nullable=True),
'volume_types': fields.ObjectField('VolumeTypeList',
nullable=True),
@ -62,6 +64,8 @@ class Group(base.CinderPersistentObject, base.CinderObject,
for key in ('group_snapshot_id', 'source_group_id',
'group_snapshots'):
primitive.pop(key, None)
if target_version < (1, 2):
primitive.pop('replication_status', None)
@staticmethod
def _from_db_object(context, group, db_group,

View File

@ -21,6 +21,7 @@ from cinder.tests.unit.brick import fake_lvm
from cinder import utils
from cinder.volume import driver
from cinder.volume.drivers import lvm
from cinder.volume import utils as vol_utils
from cinder.zonemanager import utils as fczm_utils
@ -44,7 +45,20 @@ class FakeLoggingVolumeDriver(lvm.LVMVolumeDriver):
@utils.trace_method
def create_volume(self, volume):
pass
"""Creates a volume."""
super(FakeLoggingVolumeDriver, self).create_volume(volume)
model_update = {}
try:
if (volume.volume_type and volume.volume_type.extra_specs and
vol_utils.is_replicated_spec(
volume.volume_type.extra_specs)):
# Sets the new volume's replication_status to disabled
model_update['replication_status'] = (
fields.ReplicationStatus.DISABLED)
except exception.VolumeTypeNotFound:
pass
if model_update:
return model_update
@utils.trace_method
def delete_volume(self, volume):
@ -122,6 +136,68 @@ class FakeLoggingVolumeDriver(lvm.LVMVolumeDriver):
def terminate_connection(self, volume, connector, **kwargs):
pass
# Replication Group (Tiramisu)
@utils.trace_method
def enable_replication(self, context, group, volumes):
"""Enables replication for a group and volumes in the group."""
model_update = {
'replication_status': fields.ReplicationStatus.ENABLED}
volume_model_updates = []
for volume_ref in volumes:
volume_model_update = {'id': volume_ref.id}
volume_model_update['replication_status'] = (
fields.ReplicationStatus.ENABLED)
volume_model_updates.append(volume_model_update)
return model_update, volume_model_updates
# Replication Group (Tiramisu)
@utils.trace_method
def disable_replication(self, context, group, volumes):
"""Disables replication for a group and volumes in the group."""
model_update = {
'replication_status': fields.ReplicationStatus.DISABLED}
volume_model_updates = []
for volume_ref in volumes:
volume_model_update = {'id': volume_ref.id}
volume_model_update['replication_status'] = (
fields.ReplicationStatus.DISABLED)
volume_model_updates.append(volume_model_update)
return model_update, volume_model_updates
# Replication Group (Tiramisu)
@utils.trace_method
def failover_replication(self, context, group, volumes,
secondary_backend_id=None):
"""Fails over replication for a group and volumes in the group."""
model_update = {
'replication_status': fields.ReplicationStatus.FAILED_OVER}
volume_model_updates = []
for volume_ref in volumes:
volume_model_update = {'id': volume_ref.id}
volume_model_update['replication_status'] = (
fields.ReplicationStatus.FAILED_OVER)
volume_model_updates.append(volume_model_update)
return model_update, volume_model_updates
# Replication Group (Tiramisu)
@utils.trace_method
def create_group(self, context, group):
"""Creates a group."""
model_update = super(FakeLoggingVolumeDriver, self).create_group(
context, group)
try:
if vol_utils.is_group_a_replication_group_type(group):
# Sets the new group's replication_status to disabled
model_update['replication_status'] = (
fields.ReplicationStatus.DISABLED)
except exception.GroupTypeNotFound:
pass
return model_update
def _update_volume_stats(self):
data = {'volume_backend_name': self.backend_name,
'vendor_name': 'Open Source',
@ -138,7 +214,8 @@ class FakeLoggingVolumeDriver(lvm.LVMVolumeDriver):
'filter_function': self.get_filter_function(),
'goodness_function': self.get_goodness_function(),
'consistencygroup_support': False,
'replication_enabled': False}
'replication_enabled': True,
'group_replication_enabled': True, }
data['pools'].append(fake_pool)
self._stats = data
@ -218,7 +295,6 @@ class FakeGateDriver(lvm.LVMVolumeDriver):
def _update_volume_stats(self):
super(FakeGateDriver, self)._update_volume_stats()
self._stats["pools"][0]["consistencygroup_support"] = True
self._stats["pools"][0]["replication_enabled"] = True
# NOTE(xyang): Consistency Group functions implemented below
# are for testing purpose only. Data consistency cannot be

View File

@ -38,6 +38,8 @@ from cinder.volume import api as volume_api
GROUP_MICRO_VERSION = '3.13'
GROUP_FROM_SRC_MICRO_VERSION = '3.14'
GROUP_REPLICATION_MICRO_VERSION = '3.38'
INVALID_GROUP_REPLICATION_MICRO_VERSION = '3.37'
@ddt.ddt
@ -75,6 +77,7 @@ class GroupsAPITestCase(test.TestCase):
availability_zone='az1',
host='fakehost',
status=fields.GroupStatus.CREATING,
replication_status=fields.ReplicationStatus.DISABLED,
**kwargs):
"""Create a group object."""
ctxt = ctxt or self.ctxt
@ -88,6 +91,7 @@ class GroupsAPITestCase(test.TestCase):
group.volume_type_ids = volume_type_ids
group.host = host
group.status = status
group.replication_status = replication_status
group.update(kwargs)
group.create()
return group
@ -1049,3 +1053,244 @@ class GroupsAPITestCase(test.TestCase):
grp.destroy()
volume.destroy()
source_grp.destroy()
@mock.patch('cinder.volume.utils.is_replicated_spec',
return_value=True)
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type',
return_value=True)
def test_enable_replication(self, mock_rep_grp_type, mock_rep_vol_type):
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, self.group3.id),
version=GROUP_REPLICATION_MICRO_VERSION)
self.group3.status = fields.GroupStatus.AVAILABLE
self.group3.save()
body = {"enable_replication": {}}
response = self.controller.enable_replication(req,
self.group3.id, body)
group = objects.Group.get_by_id(self.ctxt, self.group3.id)
self.assertEqual(202, response.status_int)
self.assertEqual(fields.GroupStatus.AVAILABLE, group.status)
self.assertEqual(fields.ReplicationStatus.ENABLING,
group.replication_status)
@ddt.data((True, False), (False, True), (False, False))
@ddt.unpack
@mock.patch('cinder.volume.utils.is_replicated_spec')
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type')
def test_enable_replication_wrong_type(self, is_grp_rep_type,
is_vol_rep_type,
mock_rep_grp_type,
mock_rep_vol_type):
mock_rep_grp_type.return_value = is_grp_rep_type
mock_rep_vol_type.return_value = is_vol_rep_type
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, self.group3.id),
version=GROUP_REPLICATION_MICRO_VERSION)
self.group3.status = fields.GroupStatus.AVAILABLE
self.group3.save()
body = {"enable_replication": {}}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.enable_replication,
req, self.group3.id, body)
@mock.patch('cinder.volume.utils.is_replicated_spec',
return_value=False)
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type',
return_value=True)
def test_enable_replication_wrong_group_type(self, mock_rep_grp_type,
mock_rep_vol_type):
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, self.group3.id),
version=GROUP_REPLICATION_MICRO_VERSION)
self.group3.status = fields.GroupStatus.AVAILABLE
self.group3.save()
body = {"enable_replication": {}}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.enable_replication,
req, self.group3.id, body)
@mock.patch('cinder.volume.utils.is_replicated_spec',
return_value=True)
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type',
return_value=True)
@ddt.data((GROUP_REPLICATION_MICRO_VERSION, True,
fields.GroupStatus.CREATING,
webob.exc.HTTPBadRequest),
(GROUP_REPLICATION_MICRO_VERSION, False,
fields.GroupStatus.AVAILABLE,
exception.GroupNotFound),
(INVALID_GROUP_REPLICATION_MICRO_VERSION, True,
fields.GroupStatus.AVAILABLE,
exception.VersionNotFoundForAPIMethod),
)
@ddt.unpack
def test_enable_replication_negative(self, version, not_fake,
status, exceptions,
mock_rep_grp_type, mock_rep_vol_type):
if not_fake:
group_id = self.group3.id
else:
group_id = fake.GROUP_ID
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, group_id),
version=version)
if not_fake:
self.group3.status = status
self.group3.save()
body = {"enable_replication": {}}
self.assertRaises(exceptions,
self.controller.enable_replication,
req, group_id, body)
@mock.patch('cinder.volume.utils.is_replicated_spec',
return_value=True)
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type',
return_value=True)
def test_disable_replication(self, mock_rep_grp_type, mock_rep_vol_type):
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, self.group3.id),
version=GROUP_REPLICATION_MICRO_VERSION)
self.group3.status = fields.GroupStatus.AVAILABLE
self.group3.replication_status = fields.ReplicationStatus.ENABLED
self.group3.save()
body = {"disable_replication": {}}
response = self.controller.disable_replication(req,
self.group3.id, body)
group = objects.Group.get_by_id(self.ctxt, self.group3.id)
self.assertEqual(202, response.status_int)
self.assertEqual(fields.GroupStatus.AVAILABLE, group.status)
self.assertEqual(fields.ReplicationStatus.DISABLING,
group.replication_status)
@mock.patch('cinder.volume.utils.is_replicated_spec',
return_value=True)
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type',
return_value=True)
@ddt.data((GROUP_REPLICATION_MICRO_VERSION, True,
fields.GroupStatus.CREATING,
fields.ReplicationStatus.ENABLED,
webob.exc.HTTPBadRequest),
(GROUP_REPLICATION_MICRO_VERSION, True,
fields.GroupStatus.AVAILABLE,
fields.ReplicationStatus.DISABLED,
webob.exc.HTTPBadRequest),
(GROUP_REPLICATION_MICRO_VERSION, False,
fields.GroupStatus.AVAILABLE,
fields.ReplicationStatus.DISABLED,
exception.GroupNotFound),
(INVALID_GROUP_REPLICATION_MICRO_VERSION, True,
fields.GroupStatus.AVAILABLE,
fields.ReplicationStatus.ENABLED,
exception.VersionNotFoundForAPIMethod),
)
@ddt.unpack
def test_disable_replication_negative(self, version, not_fake,
status, rep_status, exceptions,
mock_rep_grp_type,
mock_rep_vol_type):
if not_fake:
group_id = self.group3.id
else:
group_id = fake.GROUP_ID
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, group_id),
version=version)
if not_fake:
self.group3.status = status
self.group3.replication_status = rep_status
self.group3.save()
body = {"disable_replication": {}}
self.assertRaises(exceptions,
self.controller.disable_replication,
req, group_id, body)
@mock.patch('cinder.volume.utils.is_replicated_spec',
return_value=True)
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type',
return_value=True)
def test_failover_replication(self, mock_rep_grp_type, mock_rep_vol_type):
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, self.group3.id),
version=GROUP_REPLICATION_MICRO_VERSION)
self.group3.status = fields.GroupStatus.AVAILABLE
self.group3.replication_status = fields.ReplicationStatus.ENABLED
self.group3.save()
body = {"failover_replication": {}}
response = self.controller.failover_replication(req,
self.group3.id, body)
group = objects.Group.get_by_id(self.ctxt, self.group3.id)
self.assertEqual(202, response.status_int)
self.assertEqual(fields.GroupStatus.AVAILABLE, group.status)
self.assertEqual(fields.ReplicationStatus.FAILING_OVER,
group.replication_status)
@mock.patch('cinder.volume.utils.is_replicated_spec',
return_value=True)
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type',
return_value=True)
@ddt.data((GROUP_REPLICATION_MICRO_VERSION, True,
fields.GroupStatus.CREATING,
fields.ReplicationStatus.ENABLED,
webob.exc.HTTPBadRequest),
(GROUP_REPLICATION_MICRO_VERSION, True,
fields.GroupStatus.AVAILABLE,
fields.ReplicationStatus.DISABLED,
webob.exc.HTTPBadRequest),
(GROUP_REPLICATION_MICRO_VERSION, False,
fields.GroupStatus.AVAILABLE,
fields.ReplicationStatus.DISABLED,
exception.GroupNotFound),
(INVALID_GROUP_REPLICATION_MICRO_VERSION, True,
fields.GroupStatus.AVAILABLE,
fields.ReplicationStatus.ENABLED,
exception.VersionNotFoundForAPIMethod),
)
@ddt.unpack
def test_failover_replication_negative(self, version, not_fake,
status, rep_status, exceptions,
mock_rep_grp_type,
mock_rep_vol_type):
if not_fake:
group_id = self.group3.id
else:
group_id = fake.GROUP_ID
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, group_id),
version=version)
if not_fake:
self.group3.status = status
self.group3.replication_status = rep_status
self.group3.save()
body = {"failover_replication": {}}
self.assertRaises(exceptions,
self.controller.failover_replication,
req, group_id, body)
@mock.patch('cinder.volume.utils.is_replicated_spec',
return_value=True)
@mock.patch('cinder.volume.utils.is_group_a_replication_group_type',
return_value=True)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.list_replication_targets')
def test_list_replication_targets(self, mock_list_rep_targets,
mock_rep_grp_type, mock_rep_vol_type):
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, self.group3.id),
version=GROUP_REPLICATION_MICRO_VERSION)
targets = {
'replication_targets': [
{'backend_id': 'lvm_backend_1'}
]
}
mock_list_rep_targets.return_value = targets
self.group3.status = fields.GroupStatus.AVAILABLE
self.group3.save()
body = {"list_replication_targets": {}}
response = self.controller.list_replication_targets(
req, self.group3.id, body)
self.assertIn('replication_targets', response)
self.assertEqual('lvm_backend_1',
response['replication_targets'][0]['backend_id'])

View File

@ -0,0 +1,133 @@
# Copyright (C) 2017 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslo_config import cfg
from oslo_utils import importutils
from cinder import context
from cinder import exception
from cinder import objects
from cinder.objects import fields
from cinder import quota
from cinder import test
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import utils as tests_utils
from cinder.volume import api as volume_api
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume import utils as volutils
GROUP_QUOTAS = quota.GROUP_QUOTAS
CONF = cfg.CONF
@ddt.ddt
class GroupManagerTestCase(test.TestCase):
def setUp(self):
super(GroupManagerTestCase, self).setUp()
self.volume = importutils.import_object(CONF.volume_manager)
self.configuration = mock.Mock(conf.Configuration)
self.context = context.get_admin_context()
self.context.user_id = fake.USER_ID
self.project_id = fake.PROJECT3_ID
self.context.project_id = self.project_id
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0,
'pools': {}}
self.volume_api = volume_api.API()
@mock.patch.object(GROUP_QUOTAS, "reserve",
return_value=["RESERVATION"])
@mock.patch.object(GROUP_QUOTAS, "commit")
@mock.patch.object(GROUP_QUOTAS, "rollback")
@mock.patch.object(driver.VolumeDriver,
"delete_group",
return_value=({'status': (
fields.GroupStatus.DELETED)}, []))
@mock.patch.object(driver.VolumeDriver,
"enable_replication",
return_value=(None, []))
@mock.patch.object(driver.VolumeDriver,
"disable_replication",
return_value=(None, []))
@mock.patch.object(driver.VolumeDriver,
"failover_replication",
return_value=(None, []))
def test_replication_group(self, fake_failover_rep, fake_disable_rep,
fake_enable_rep, fake_delete_grp,
fake_rollback, fake_commit, fake_reserve):
"""Test enable, disable, and failover replication for group."""
def fake_driver_create_grp(context, group):
"""Make sure that the pool is part of the host."""
self.assertIn('host', group)
host = group.host
pool = volutils.extract_host(host, level='pool')
self.assertEqual('fakepool', pool)
return {'status': fields.GroupStatus.AVAILABLE,
'replication_status': fields.ReplicationStatus.DISABLING}
self.mock_object(self.volume.driver, 'create_group',
fake_driver_create_grp)
group = tests_utils.create_group(
self.context,
availability_zone=CONF.storage_availability_zone,
volume_type_ids=[fake.VOLUME_TYPE_ID],
host='fakehost@fakedrv#fakepool',
group_type_id=fake.GROUP_TYPE_ID)
group = objects.Group.get_by_id(self.context, group.id)
self.volume.create_group(self.context, group)
self.assertEqual(
group.id,
objects.Group.get_by_id(context.get_admin_context(),
group.id).id)
self.volume.disable_replication(self.context, group)
group = objects.Group.get_by_id(
context.get_admin_context(), group.id)
self.assertEqual(fields.ReplicationStatus.DISABLED,
group.replication_status)
group.replication_status = fields.ReplicationStatus.ENABLING
group.save()
self.volume.enable_replication(self.context, group)
group = objects.Group.get_by_id(
context.get_admin_context(), group.id)
self.assertEqual(fields.ReplicationStatus.ENABLED,
group.replication_status)
group.replication_status = fields.ReplicationStatus.FAILING_OVER
group.save()
self.volume.failover_replication(self.context, group)
group = objects.Group.get_by_id(
context.get_admin_context(), group.id)
self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
group.replication_status)
targets = self.volume.list_replication_targets(self.context, group)
self.assertIn('replication_targets', targets)
self.volume.delete_group(self.context, group)
grp = objects.Group.get_by_id(
context.get_admin_context(read_deleted='yes'), group.id)
self.assertEqual(fields.GroupStatus.DELETED, grp.status)
self.assertRaises(exception.NotFound,
objects.Group.get_by_id,
self.context,
group.id)

View File

@ -28,7 +28,7 @@ object_data = {
'BackupImport': '1.4-c50f7a68bb4c400dd53dd219685b3992',
'BackupList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'CleanupRequest': '1.0-e7c688b893e1d5537ccf65cc3eb10a28',
'Cluster': '1.1-cdb1572b250837933d950cc6662313b8',
'Cluster': '1.1-e2c533eb8cdd8d229b6c45c6cf3a9e2c',
'ClusterList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'CGSnapshot': '1.1-3212ac2b4c2811b7134fb9ba2c49ff74',
'CGSnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
@ -43,7 +43,7 @@ object_data = {
'QualityOfServiceSpecs': '1.0-0b212e0a86ee99092229874e03207fe8',
'QualityOfServiceSpecsList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'RequestSpec': '1.1-b0bd1a28d191d75648901fa853e8a733',
'Service': '1.4-c7d011989d1718ca0496ccf640b42712',
'Service': '1.4-a6727ccda6d4043f5e38e75c7c518c7f',
'ServiceList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'Snapshot': '1.4-b7aa184837ccff570b8443bfd1773017',
'SnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
@ -56,7 +56,7 @@ object_data = {
'VolumeTypeList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'GroupType': '1.0-d4a7b272199d0b0d6fc3ceed58539d30',
'GroupTypeList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'Group': '1.1-bd853b1d1ee05949d9ce4b33f80ac1a0',
'Group': '1.2-2ade6acf2e55687b980048fc3f51dad9',
'GroupList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'GroupSnapshot': '1.0-9af3e994e889cbeae4427c3e351fa91d',
'GroupSnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',

View File

@ -136,6 +136,10 @@
"group:get_all_group_snapshots": "",
"group:reset_group_snapshot_status":"",
"group:reset_status":"",
"group:enable_replication": "",
"group:disable_replication": "",
"group:failover_replication": "",
"group:list_replication_targets": "",
"scheduler_extension:scheduler_stats:get_pools" : "rule:admin_api",

View File

@ -3057,3 +3057,58 @@ class DBAPIBackendTestCase(BaseTest):
cluster += '#poolname'
self.assertEqual(frozen,
db.is_backend_frozen(self.ctxt, host, cluster))
class DBAPIGroupTestCase(BaseTest):
def test_group_get_all_by_host(self):
grp_type = db.group_type_create(self.ctxt, {'name': 'my_group_type'})
groups = []
backend = 'host1@lvm'
for i in range(3):
groups.append([db.group_create(
self.ctxt,
{'host': '%(b)s%(n)d' % {'b': backend, 'n': i},
'group_type_id': grp_type['id']})
for j in range(3)])
for i in range(3):
host = '%(b)s%(n)d' % {'b': backend, 'n': i}
filters = {'host': host, 'backend_match_level': 'backend'}
grps = db.group_get_all(
self.ctxt, filters=filters)
self._assertEqualListsOfObjects(groups[i], grps)
for grp in grps:
db.group_destroy(self.ctxt, grp['id'])
db.group_type_destroy(self.ctxt, grp_type['id'])
def test_group_get_all_by_host_with_pools(self):
grp_type = db.group_type_create(self.ctxt, {'name': 'my_group_type'})
groups = []
backend = 'host1@lvm'
pool = '%s#pool1' % backend
grp_on_host_wo_pool = [db.group_create(
self.ctxt,
{'host': backend,
'group_type_id': grp_type['id']})
for j in range(3)]
grp_on_host_w_pool = [db.group_create(
self.ctxt,
{'host': pool,
'group_type_id': grp_type['id']})]
groups.append(grp_on_host_wo_pool + grp_on_host_w_pool)
# insert an additional record that doesn't belongs to the same
# host as 'foo' and test if it is included in the result
grp_foobar = db.group_create(self.ctxt,
{'host': '%sfoo' % backend,
'group_type_id': grp_type['id']})
filters = {'host': backend, 'backend_match_level': 'backend'}
grps = db.group_get_all(self.ctxt, filters=filters)
self._assertEqualListsOfObjects(groups[0], grps)
for grp in grps:
db.group_destroy(self.ctxt, grp['id'])
db.group_destroy(self.ctxt, grp_foobar['id'])
db.group_type_destroy(self.ctxt, grp_type['id'])

View File

@ -1238,6 +1238,12 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertEqual(data[volume.id], volume.replication_status,
'id %s' % volume.id)
def _check_102(self, engine, data):
"""Test adding replication_status to groups table."""
groups = db_utils.get_table(engine, 'groups')
self.assertIsInstance(groups.c.replication_status.type,
self.VARCHAR_TYPE)
def test_walk_versions(self):
self.walk_versions(False, False)
self.assert_each_foreign_key_is_part_of_an_index()

View File

@ -629,3 +629,29 @@ class VolumeRPCAPITestCase(test.RPCAPITestCase):
expected_kwargs_diff={
'snapshot_id': self.fake_snapshot.id},
version='3.13')
def test_enable_replication(self):
self._test_rpc_api('enable_replication', rpc_method='cast',
server=self.fake_group.host,
group=self.fake_group,
version='3.14')
def test_disable_replication(self):
self._test_rpc_api('disable_replication', rpc_method='cast',
server=self.fake_group.host,
group=self.fake_group,
version='3.14')
def test_failover_replication(self):
self._test_rpc_api('failover_replication', rpc_method='cast',
server=self.fake_group.host,
group=self.fake_group,
allow_attached_volume=False,
secondary_backend_id=None,
version='3.14')
def test_list_replication_targets(self):
self._test_rpc_api('list_replication_targets', rpc_method='call',
server=self.fake_group.host,
group=self.fake_group,
version='3.14')

View File

@ -1505,6 +1505,55 @@ class BaseVD(object):
method = getattr(cls, method_name)
return method.__module__ == getattr(BaseVD, method_name).__module__
# Replication Group (Tiramisu)
def enable_replication(self, context, group, volumes):
"""Enables replication for a group and volumes in the group.
:param group: group object
:param volumes: list of volume objects in the group
:returns: model_update - dict of group updates
:returns: volume_model_updates - list of dicts of volume updates
"""
raise NotImplementedError()
# Replication Group (Tiramisu)
def disable_replication(self, context, group, volumes):
"""Disables replication for a group and volumes in the group.
:param group: group object
:param volumes: list of volume objects in the group
:returns: model_update - dict of group updates
:returns: volume_model_updates - list of dicts of volume updates
"""
raise NotImplementedError()
# Replication Group (Tiramisu)
def failover_replication(self, context, group, volumes,
secondary_backend_id=None):
"""Fails over replication for a group and volumes in the group.
:param group: group object
:param volumes: list of volume objects in the group
:param secondary_backend_id: backend_id of the secondary site
:returns: model_update - dict of group updates
:returns: volume_model_updates - list of dicts of volume updates
"""
raise NotImplementedError()
def get_replication_error_status(self, context, groups):
"""Returns error info for replicated groups and its volumes.
:returns: group_model_updates - list of dicts of group updates
if error happens. For example, a dict of a group can be as follows:
{'group_id': xxxx,
'replication_status': fields.ReplicationStatus.ERROR}
:returns: volume_model_updates - list of dicts of volume updates
if error happens. For example, a dict of a volume can be as follows:
{'volume_id': xxxx,
'replication_status': fields.ReplicationStatus.ERROR}
"""
return [], []
@classmethod
def supports_replication_feature(cls, feature):
"""Check if driver class supports replication features.

View File

@ -2172,6 +2172,49 @@ class VolumeManager(manager.CleanableManager,
if self.extra_capabilities:
volume_stats.update(self.extra_capabilities)
if volume_stats:
# NOTE(xyang): If driver reports replication_status to be
# 'error' in volume_stats, get model updates from driver
# and update db
if volume_stats.get('replication_status') == (
fields.ReplicationStatus.ERROR):
backend = vol_utils.extract_host(self.host, 'backend')
groups = vol_utils.get_replication_groups_by_host(
context, backend)
group_model_updates, volume_model_updates = (
self.driver.get_replication_error_status(context,
groups))
for grp_update in group_model_updates:
try:
grp_obj = objects.Group.get_by_id(
context, grp_update['group_id'])
grp_obj.update(grp_update)
grp_obj.save()
except exception.GroupNotFound:
# Group may be deleted already. Log a warning
# and continue.
LOG.warning("Group %(grp)s not found while "
"updating driver status.",
{'grp': grp_update['group_id']},
resource={
'type': 'group',
'id': grp_update['group_id']})
for vol_update in volume_model_updates:
try:
vol_obj = objects.Volume.get_by_id(
context, vol_update['volume_id'])
vol_obj.update(vol_update)
vol_obj.save()
except exception.VolumeNotFound:
# Volume may be deleted already. Log a warning
# and continue.
LOG.warning("Volume %(vol)s not found while "
"updating driver status.",
{'vol': vol_update['volume_id']},
resource={
'type': 'volume',
'id': vol_update['volume_id']})
# Append volume stats with 'allocated_capacity_gb'
self._append_volume_stats(volume_stats)
@ -4182,3 +4225,332 @@ class VolumeManager(manager.CleanableManager,
'attached_mode')
self._notify_about_volume_usage(context, vref, "detach.end")
return has_shared_connection
# Replication group API (Tiramisu)
def enable_replication(self, ctxt, group):
"""Enable replication."""
group.refresh()
if group.replication_status != fields.ReplicationStatus.ENABLING:
msg = _("Replication status in group %s is not "
"enabling. Cannot enable replication.") % group.id
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
volumes = group.volumes
for vol in volumes:
vol.refresh()
if vol.replication_status != fields.ReplicationStatus.ENABLING:
msg = _("Replication status in volume %s is not "
"enabling. Cannot enable replication.") % vol.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
self._notify_about_group_usage(
ctxt, group, "enable_replication.start")
volumes_model_update = None
model_update = None
try:
utils.require_driver_initialized(self.driver)
model_update, volumes_model_update = (
self.driver.enable_replication(ctxt, group, volumes))
if volumes_model_update:
for update in volumes_model_update:
vol_obj = objects.Volume.get_by_id(ctxt, update['id'])
vol_obj.update(update)
vol_obj.save()
# If we failed to enable a volume, make sure the status
# for the group is set to error as well
if (update.get('replication_status') ==
fields.ReplicationStatus.ERROR and
model_update.get('replication_status') !=
fields.ReplicationStatus.ERROR):
model_update['replication_status'] = update.get(
'replication_status')
if model_update:
if (model_update.get('replication_status') ==
fields.ReplicationStatus.ERROR):
msg = _('Enable replication failed.')
LOG.error(msg,
resource={'type': 'group',
'id': group.id})
raise exception.VolumeDriverException(message=msg)
else:
group.update(model_update)
group.save()
except exception.CinderException as ex:
group.status = fields.GroupStatus.ERROR
group.replication_status = fields.ReplicationStatus.ERROR
group.save()
# Update volume status to 'error' if driver returns
# None for volumes_model_update.
if not volumes_model_update:
for vol in volumes:
vol.status = 'error'
vol.replication_status = fields.ReplicationStatus.ERROR
vol.save()
err_msg = _("Enable replication group failed: "
"%s.") % six.text_type(ex)
raise exception.ReplicationGroupError(reason=err_msg,
group_id=group.id)
for vol in volumes:
vol.replication_status = fields.ReplicationStatus.ENABLED
vol.save()
group.replication_status = fields.ReplicationStatus.ENABLED
group.save()
self._notify_about_group_usage(
ctxt, group, "enable_replication.end", volumes)
LOG.info("Enable replication completed successfully.",
resource={'type': 'group',
'id': group.id})
# Replication group API (Tiramisu)
def disable_replication(self, ctxt, group):
"""Disable replication."""
group.refresh()
if group.replication_status != fields.ReplicationStatus.DISABLING:
msg = _("Replication status in group %s is not "
"disabling. Cannot disable replication.") % group.id
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
volumes = group.volumes
for vol in volumes:
vol.refresh()
if (vol.replication_status !=
fields.ReplicationStatus.DISABLING):
msg = _("Replication status in volume %s is not "
"disabling. Cannot disable replication.") % vol.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
self._notify_about_group_usage(
ctxt, group, "disable_replication.start")
volumes_model_update = None
model_update = None
try:
utils.require_driver_initialized(self.driver)
model_update, volumes_model_update = (
self.driver.disable_replication(ctxt, group, volumes))
if volumes_model_update:
for update in volumes_model_update:
vol_obj = objects.Volume.get_by_id(ctxt, update['id'])
vol_obj.update(update)
vol_obj.save()
# If we failed to enable a volume, make sure the status
# for the group is set to error as well
if (update.get('replication_status') ==
fields.ReplicationStatus.ERROR and
model_update.get('replication_status') !=
fields.ReplicationStatus.ERROR):
model_update['replication_status'] = update.get(
'replication_status')
if model_update:
if (model_update.get('replication_status') ==
fields.ReplicationStatus.ERROR):
msg = _('Disable replication failed.')
LOG.error(msg,
resource={'type': 'group',
'id': group.id})
raise exception.VolumeDriverException(message=msg)
else:
group.update(model_update)
group.save()
except exception.CinderException as ex:
group.status = fields.GroupStatus.ERROR
group.replication_status = fields.ReplicationStatus.ERROR
group.save()
# Update volume status to 'error' if driver returns
# None for volumes_model_update.
if not volumes_model_update:
for vol in volumes:
vol.status = 'error'
vol.replication_status = fields.ReplicationStatus.ERROR
vol.save()
err_msg = _("Disable replication group failed: "
"%s.") % six.text_type(ex)
raise exception.ReplicationGroupError(reason=err_msg,
group_id=group.id)
for vol in volumes:
vol.replication_status = fields.ReplicationStatus.DISABLED
vol.save()
group.replication_status = fields.ReplicationStatus.DISABLED
group.save()
self._notify_about_group_usage(
ctxt, group, "disable_replication.end", volumes)
LOG.info("Disable replication completed successfully.",
resource={'type': 'group',
'id': group.id})
# Replication group API (Tiramisu)
def failover_replication(self, ctxt, group, allow_attached_volume=False,
secondary_backend_id=None):
"""Failover replication."""
group.refresh()
if group.replication_status != fields.ReplicationStatus.FAILING_OVER:
msg = _("Replication status in group %s is not "
"failing-over. Cannot failover replication.") % group.id
LOG.error(msg)
raise exception.InvalidGroup(reason=msg)
volumes = group.volumes
for vol in volumes:
vol.refresh()
if vol.status == 'in-use' and not allow_attached_volume:
msg = _("Volume %s is attached but allow_attached_volume flag "
"is False. Cannot failover replication.") % vol.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
if (vol.replication_status !=
fields.ReplicationStatus.FAILING_OVER):
msg = _("Replication status in volume %s is not "
"failing-over. Cannot failover replication.") % vol.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
self._notify_about_group_usage(
ctxt, group, "failover_replication.start")
volumes_model_update = None
model_update = None
try:
utils.require_driver_initialized(self.driver)
model_update, volumes_model_update = (
self.driver.failover_replication(
ctxt, group, volumes, secondary_backend_id))
if volumes_model_update:
for update in volumes_model_update:
vol_obj = objects.Volume.get_by_id(ctxt, update['id'])
vol_obj.update(update)
vol_obj.save()
# If we failed to enable a volume, make sure the status
# for the group is set to error as well
if (update.get('replication_status') ==
fields.ReplicationStatus.ERROR and
model_update.get('replication_status') !=
fields.ReplicationStatus.ERROR):
model_update['replication_status'] = update.get(
'replication_status')
if model_update:
if (model_update.get('replication_status') ==
fields.ReplicationStatus.ERROR):
msg = _('Failover replication failed.')
LOG.error(msg,
resource={'type': 'group',
'id': group.id})
raise exception.VolumeDriverException(message=msg)
else:
group.update(model_update)
group.save()
except exception.CinderException as ex:
group.status = fields.GroupStatus.ERROR
group.replication_status = fields.ReplicationStatus.ERROR
group.save()
# Update volume status to 'error' if driver returns
# None for volumes_model_update.
if not volumes_model_update:
for vol in volumes:
vol.status = 'error'
vol.replication_status = fields.ReplicationStatus.ERROR
vol.save()
err_msg = _("Failover replication group failed: "
"%s.") % six.text_type(ex)
raise exception.ReplicationGroupError(reason=err_msg,
group_id=group.id)
for vol in volumes:
if secondary_backend_id == "default":
vol.replication_status = fields.ReplicationStatus.ENABLED
else:
vol.replication_status = (
fields.ReplicationStatus.FAILED_OVER)
vol.save()
if secondary_backend_id == "default":
group.replication_status = fields.ReplicationStatus.ENABLED
else:
group.replication_status = fields.ReplicationStatus.FAILED_OVER
group.save()
self._notify_about_group_usage(
ctxt, group, "failover_replication.end", volumes)
LOG.info("Failover replication completed successfully.",
resource={'type': 'group',
'id': group.id})
def list_replication_targets(self, ctxt, group):
"""Provide a means to obtain replication targets for a group.
This method is used to find the replication_device config
info. 'backend_id' is a required key in 'replication_device'.
Response Example for admin:
{
'replication_targets': [
{
'backend_id': 'vendor-id-1',
'unique_key': 'val1',
......
},
{
'backend_id': 'vendor-id-2',
'unique_key': 'val2',
......
}
]
}
Response example for non-admin:
{
'replication_targets': [
{
'backend_id': 'vendor-id-1'
},
{
'backend_id': 'vendor-id-2'
}
]
}
"""
replication_targets = []
try:
group = objects.Group.get_by_id(ctxt, group.id)
if self.configuration.replication_device:
if ctxt.is_admin:
for rep_dev in self.configuration.replication_device:
keys = rep_dev.keys()
dev = {}
for k in keys:
dev[k] = rep_dev[k]
replication_targets.append(dev)
else:
for rep_dev in self.configuration.replication_device:
dev = rep_dev.get('backend_id')
if dev:
replication_targets.append({'backend_id': dev})
except exception.GroupNotFound:
err_msg = (_("Get replication targets failed. Group %s not "
"found.") % group.id)
LOG.exception(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
return {'replication_targets': replication_targets}

View File

@ -130,9 +130,11 @@ class VolumeAPI(rpc.RPCAPI):
3.12 - Adds set_log_levels and get_log_levels
3.13 - Add initialize_connection_snapshot,
terminate_connection_snapshot, and remove_export_snapshot.
3.14 - Adds enable_replication, disable_replication,
failover_replication, and list_replication_targets.
"""
RPC_API_VERSION = '3.13'
RPC_API_VERSION = '3.14'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume'
@ -459,3 +461,29 @@ class VolumeAPI(rpc.RPCAPI):
def get_log_levels(self, context, service, log_request):
cctxt = self._get_cctxt(host=service.host, version='3.12')
return cctxt.call(context, 'get_log_levels', log_request=log_request)
@rpc.assert_min_rpc_version('3.14')
def enable_replication(self, ctxt, group):
cctxt = self._get_cctxt(group.host, version='3.14')
cctxt.cast(ctxt, 'enable_replication',
group=group)
@rpc.assert_min_rpc_version('3.14')
def disable_replication(self, ctxt, group):
cctxt = self._get_cctxt(group.host, version='3.14')
cctxt.cast(ctxt, 'disable_replication',
group=group)
@rpc.assert_min_rpc_version('3.14')
def failover_replication(self, ctxt, group, allow_attached_volume=False,
secondary_backend_id=None):
cctxt = self._get_cctxt(group.host, version='3.14')
cctxt.cast(ctxt, 'failover_replication',
group=group, allow_attached_volume=allow_attached_volume,
secondary_backend_id=secondary_backend_id)
@rpc.assert_min_rpc_version('3.14')
def list_replication_targets(self, ctxt, group):
cctxt = self._get_cctxt(group.host, version='3.14')
return cctxt.call(ctxt, 'list_replication_targets',
group=group)

View File

@ -926,3 +926,38 @@ def is_group_a_cg_snapshot_type(group_or_snap):
)
return spec == "<is> True"
return False
def is_group_a_type(group, key):
if group.group_type_id is not None:
spec = group_types.get_group_type_specs(
group.group_type_id, key=key
)
return spec == "<is> True"
return False
def is_group_a_non_consistent_replication_group_type(group):
return is_group_a_type(group, "group_replication_enabled")
def is_group_a_consistent_replication_group_type(group):
return is_group_a_type(group, "consistent_group_replication_enabled")
def is_group_a_replication_group_type(group):
if (is_group_a_non_consistent_replication_group_type(group) or
is_group_a_consistent_replication_group_type(group)):
return True
return False
def get_replication_groups_by_host(ctxt, host):
groups = []
filters = {'host': host, 'backend_match_level': 'backend'}
grps = objects.GroupList.get_all(ctxt, filters=filters)
for grp in grps:
if is_group_a_replication_group_type(grp):
groups.append(grp)
return groups

View File

@ -132,6 +132,11 @@
"group:reset_group_snapshot_status":"rule:admin_api",
"group:reset_status":"rule:admin_api",
"group:enable_replication": "rule:admin_or_owner",
"group:disable_replication": "rule:admin_or_owner",
"group:failover_replication": "rule:admin_or_owner",
"group:list_replication_targets": "rule:admin_or_owner",
"scheduler_extension:scheduler_stats:get_pools" : "rule:admin_api",
"message:delete": "rule:admin_or_owner",
"message:get": "rule:admin_or_owner",

View File

@ -0,0 +1,6 @@
---
features:
- |
Introduced replication group support and added group action APIs
enable_replication, disable_replication, failover_replication and
list_replication_targets.