Redis Cluster Initial Implementation

Implements support for clustering for the Redis database,
including a clustering strategy and guest agent APIs.
Includes support for the new cluster scaling methods.

Includes unit tests and int-tests (run cluster specific
int-tests with "redstack int-tests --group=dbaas.api.redis").

Change-Id: Id701a75b5a8cf1fa2c69f0111ee820021bd16c09
Depends-On: Icc9b4eea8ed7db1455692823d29586088cfc9434
Implements: blueprint redis-cluster
This commit is contained in:
Morgan Jones 2015-07-20 11:36:57 -04:00 committed by Craig Vyvial
parent f8f3183527
commit b3f62412c2
20 changed files with 1634 additions and 23 deletions

View File

@ -20,6 +20,7 @@ from trove.cluster.tasks import ClusterTasks
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common import remote
from trove.common.strategies.cluster import strategy
from trove.datastore import models as datastore_models
from trove.db import models as dbmodels
@ -84,6 +85,12 @@ class Cluster(object):
self.ds = (datastore_models.Datastore.
load(self.ds_version.datastore_id))
@classmethod
def get_guest(cls, instance):
    """Build a guest-agent RPC client for the given cluster instance."""
    manager = instance.datastore_version.manager
    return remote.create_guest_client(instance.context,
                                      instance.db_info.id,
                                      manager)
@classmethod
def load_all(cls, context, tenant_id):
db_infos = DBCluster.find_all(tenant_id=tenant_id,
@ -186,16 +193,19 @@ class Cluster(object):
return api_strategy.cluster_class.create(context, name, datastore,
datastore_version, instances)
def delete(self):
if self.db_info.task_status not in (ClusterTasks.NONE,
ClusterTasks.DELETING):
current_task = self.db_info.task_status.name
msg = _("This action cannot be performed on the cluster while "
"the current cluster task is '%s'.") % current_task
def validate_cluster_available(self, valid_states=None):
    """Raise unless the cluster's current task is in valid_states.

    :param valid_states: iterable of acceptable ClusterTask states;
                         defaults to [ClusterTasks.NONE].
    :raises exception.UnprocessableEntity: if another task is running.
    """
    # Use None as the default instead of a mutable list literal; a
    # shared default list could be mutated across calls.
    if valid_states is None:
        valid_states = [ClusterTasks.NONE]
    if self.db_info.task_status not in valid_states:
        msg = (_("This action cannot be performed on the cluster while "
                 "the current cluster task is '%s'.") %
               self.db_info.task_status.name)
        LOG.error(msg)
        raise exception.UnprocessableEntity(msg)
def delete(self):
self.validate_cluster_available([ClusterTasks.NONE,
ClusterTasks.DELETING])
db_insts = inst_models.DBInstance.find_all(cluster_id=self.id,
deleted=False).all()
@ -223,7 +233,8 @@ class Cluster(object):
def is_cluster_deleting(context, cluster_id):
    """Return True while the cluster is being deleted or shrunk."""
    cluster = Cluster.load(context, cluster_id)
    # Shrinking also tears member instances down, so callers gating
    # concurrent operations treat it like a delete in progress.
    return cluster.db_info.task_status in (ClusterTasks.DELETING,
                                           ClusterTasks.SHRINKING_CLUSTER)
def validate_volume_size(size):

View File

@ -72,7 +72,13 @@ class ClusterController(wsgi.Controller):
"by strategy for manager '%(manager)s'") % (
{'action': key, 'manager': manager})
raise exception.TroveError(message)
return selected_action(cluster, body)
cluster = selected_action(cluster, body)
if cluster:
view = views.load_view(cluster, req=req, load_servers=False)
wsgi_result = wsgi.Result(view.data(), 202)
else:
wsgi_result = wsgi.Result(None, 202)
return wsgi_result
def show(self, req, tenant_id, id):
"""Return a single cluster."""

View File

@ -65,6 +65,10 @@ class ClusterTasks(object):
DELETING = ClusterTask(0x03, 'DELETING', 'Deleting the cluster.')
ADDING_SHARD = ClusterTask(
0x04, 'ADDING_SHARD', 'Adding a shard to the cluster.')
GROWING_CLUSTER = ClusterTask(
0x05, 'GROWING_CLUSTER', 'Increasing the size of the cluster.')
SHRINKING_CLUSTER = ClusterTask(
0x06, 'SHRINKING_CLUSTER', 'Decreasing the size of the cluster.')
# Dissuade further additions at run-time.
ClusterTask.__init__ = None

View File

@ -255,6 +255,46 @@ cluster = {
"type": "object"
}
}
},
"grow": {
"type": "object",
"required": ["grow"],
"additionalProperties": True,
"properties": {
"grow": {
"type": "array",
"items": {
"type": "object",
"required": ["flavorRef"],
"additionalProperties": True,
"properties": {
"name": non_empty_string,
"flavorRef": flavorref,
"volume": volume,
"nics": nics,
"availability_zone": non_empty_string
}
}
}
}
},
"shrink": {
"type": "object",
"required": ["shrink"],
"additionalProperties": True,
"properties": {
"shrink": {
"type": "array",
"items": {
"type": "object",
"required": ["id"],
"additionalProperties": True,
"properties": {
"id": uuid
}
}
}
}
}
}

View File

@ -546,7 +546,7 @@ redis_group = cfg.OptGroup(
'redis', title='Redis options',
help="Oslo option group designed for Redis datastore")
redis_opts = [
cfg.ListOpt('tcp_ports', default=["6379"],
cfg.ListOpt('tcp_ports', default=["6379", "16379"],
help='List of TCP ports and/or port ranges to open '
'in the security group (only applicable '
'if trove_security_groups_support is True).'),
@ -569,7 +569,7 @@ redis_opts = [
cfg.StrOpt('mount_point', default='/var/lib/redis',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
cfg.BoolOpt('volume_support', default=False,
cfg.BoolOpt('volume_support', default=True,
help='Whether to provision a Cinder volume for datadir.'),
cfg.StrOpt('device_path', default=None,
help='Device path for volume if volume support is enabled.'),
@ -585,6 +585,22 @@ redis_opts = [
help='Namespace to load restore strategies from.',
deprecated_name='restore_namespace',
deprecated_group='DEFAULT'),
cfg.BoolOpt('cluster_support', default=True,
help='Enable clusters to be created and managed.'),
cfg.StrOpt('api_strategy',
default='trove.common.strategies.cluster.experimental.'
'redis.api.RedisAPIStrategy',
help='Class that implements datastore-specific API logic.'),
cfg.StrOpt('taskmanager_strategy',
default='trove.common.strategies.cluster.experimental.redis.'
'taskmanager.RedisTaskManagerStrategy',
help='Class that implements datastore-specific task manager '
'logic.'),
cfg.StrOpt('guestagent_strategy',
default='trove.common.strategies.cluster.experimental.'
'redis.guestagent.RedisGuestAgentStrategy',
help='Class that implements datastore-specific Guest Agent API '
'logic.'),
]
# Cassandra

View File

@ -25,7 +25,6 @@ from trove.common.i18n import _
from trove.common import remote
from trove.common.strategies.cluster import base
from trove.common import utils
from trove.common import wsgi
from trove.datastore import models as datastore_models
from trove.extensions.mgmt.clusters.views import MgmtClusterView
from trove.instance import models as inst_models
@ -49,7 +48,6 @@ class MongoDbAPIStrategy(base.BaseAPIStrategy):
def _action_add_shard(self, cluster, body):
cluster.add_shard()
return wsgi.Result(None, 202)
@property
def cluster_view_class(self):

View File

@ -0,0 +1,222 @@
# Copyright [2015] Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from novaclient import exceptions as nova_exceptions
from oslo_log import log as logging
from trove.cluster import models
from trove.cluster.models import Cluster
from trove.cluster.tasks import ClusterTasks
from trove.cluster.views import ClusterView
from trove.common import cfg
from trove.common import exception
from trove.common.exception import TroveError
from trove.common import remote
from trove.common.strategies.cluster import base
from trove.common import utils
from trove.extensions.mgmt.clusters.views import MgmtClusterView
from trove.instance import models as inst_models
from trove.quota.quota import check_quotas
from trove.taskmanager import api as task_api
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class RedisAPIStrategy(base.BaseAPIStrategy):
    """Datastore-specific API plumbing for Redis clusters."""

    @property
    def cluster_class(self):
        return RedisCluster

    @property
    def cluster_controller_actions(self):
        # Map request-body action names to their handlers.
        return {
            'grow': self._action_grow_cluster,
            'shrink': self._action_shrink_cluster
        }

    def _action_grow_cluster(self, cluster, body):
        """Translate a 'grow' request body into instance dicts."""
        instances = []
        for node in body['grow']:
            member = {
                'flavor_id': utils.get_id_from_href(node['flavorRef'])
            }
            if 'name' in node:
                member['name'] = node['name']
            if 'volume' in node:
                member['volume_size'] = int(node['volume']['size'])
            instances.append(member)
        return cluster.grow(instances)

    def _action_shrink_cluster(self, cluster, body):
        """Translate a 'shrink' request body into a list of instance ids."""
        removal_ids = [node['id'] for node in body['shrink']]
        return cluster.shrink(removal_ids)

    @property
    def cluster_view_class(self):
        return RedisClusterView

    @property
    def mgmt_cluster_view_class(self):
        return RedisMgmtClusterView
class RedisCluster(models.Cluster):
    """Trove cluster model for the Redis datastore."""

    @staticmethod
    def _create_instances(context, db_info, datastore, datastore_version,
                          instances):
        """Validate requested members and create one instance per entry.

        Checks flavors against Nova, validates volume sizes and quotas,
        then creates the member instances.

        :returns: list of created Instance objects.
        :raises exception.FlavorNotFound: unknown flavor id.
        :raises exception.VolumeNotSupported: volume given but unsupported.
        :raises exception.LocalStorageNotSpecified: ephemeral required.
        """
        redis_conf = CONF.get(datastore_version.manager)
        num_instances = len(instances)
        total_volume_allocation = 0

        # Validate and cache flavors: fetch each unique flavor from Nova
        # exactly once.
        nova_client = remote.create_nova_client(context)
        flavor_cache = {}
        for fid in set(inst['flavor_id'] for inst in instances):
            try:
                flavor_cache[fid] = nova_client.flavors.get(fid)
            except nova_exceptions.NotFound:
                raise exception.FlavorNotFound(uuid=fid)

        # Check volumes and assign default names where missing.
        name_index = 1
        for instance in instances:
            if not instance.get('name'):
                instance['name'] = "%s-member-%s" % (db_info.name, name_index)
                name_index += 1
            volume_size = instance.get('volume_size')
            if redis_conf.volume_support:
                models.validate_volume_size(volume_size)
                total_volume_allocation += volume_size
            else:
                if volume_size:
                    raise exception.VolumeNotSupported()
                ephemeral_support = redis_conf.device_path
                flavor_id = instance['flavor_id']
                flavor = flavor_cache[flavor_id]
                if ephemeral_support and flavor.ephemeral == 0:
                    raise exception.LocalStorageNotSpecified(flavor=flavor_id)

        # Check quotas before creating anything.
        quota_request = {'instances': num_instances,
                         'volumes': total_volume_allocation}
        check_quotas(context.tenant, quota_request)

        # Create the member instances.  Use a list comprehension rather
        # than map() so creation is eager under Python 3 as well (a lazy
        # map object would defer — or skip — the Instance.create calls).
        return [inst_models.Instance.create(
                    context,
                    instance['name'],
                    instance['flavor_id'],
                    datastore_version.image_id,
                    [], [],
                    datastore, datastore_version,
                    instance.get('volume_size'),
                    None,
                    instance.get('availability_zone', None),
                    instance.get('nics', None),
                    configuration_id=None,
                    cluster_config={"id": db_info.id,
                                    "instance_type": "member"})
                for instance in instances]

    @classmethod
    def create(cls, context, name, datastore, datastore_version, instances):
        """Create a new Redis cluster and hand off setup to taskmanager."""
        LOG.debug("Initiating cluster creation.")

        # Record the cluster with its initial build task.
        db_info = models.DBCluster.create(
            name=name, tenant_id=context.tenant,
            datastore_version_id=datastore_version.id,
            task_status=ClusterTasks.BUILDING_INITIAL)

        cls._create_instances(context, db_info, datastore, datastore_version,
                              instances)

        # Calling taskmanager to further proceed for cluster-configuration.
        task_api.load(context, datastore_version.manager).create_cluster(
            db_info.id)

        return RedisCluster(context, db_info, datastore, datastore_version)

    def grow(self, instances):
        """Add new member instances to the cluster."""
        LOG.debug("Growing cluster.")
        self.validate_cluster_available()

        context = self.context
        db_info = self.db_info
        datastore = self.ds
        datastore_version = self.ds_version

        db_info.update(task_status=ClusterTasks.GROWING_CLUSTER)

        new_instances = self._create_instances(context, db_info,
                                               datastore, datastore_version,
                                               instances)

        task_api.load(context, datastore_version.manager).grow_cluster(
            db_info.id, [instance.id for instance in new_instances])

        return RedisCluster(context, db_info, datastore, datastore_version)

    def shrink(self, removal_ids):
        """Remove the given member instances from the cluster.

        Verifies each node is removable (serves no hash slots), tells the
        remaining nodes to forget them, then deletes the instances.

        :raises TroveError: if any node still holds hash slots.
        """
        LOG.debug("Shrinking cluster %s.", self.id)
        self.validate_cluster_available()
        cluster_info = self.db_info
        cluster_info.update(task_status=ClusterTasks.SHRINKING_CLUSTER)
        try:
            removal_insts = [inst_models.Instance.load(self.context, inst_id)
                             for inst_id in removal_ids]
            node_ids = [Cluster.get_guest(instance).get_node_id_for_removal()
                        for instance in removal_insts]
            if None in node_ids:
                raise TroveError(_("Some nodes cannot be removed (check slots)"
                                   ))

            all_instances = (
                inst_models.DBInstance.find_all(cluster_id=self.id,
                                                deleted=False).all())
            remain_insts = [inst_models.Instance.load(self.context, inst.id)
                            for inst in all_instances
                            if inst.id not in removal_ids]

            # Use explicit loops, not map(): map() is lazy under Python 3,
            # so these side effects would silently never run there.
            for instance in remain_insts:
                Cluster.get_guest(instance).remove_nodes(node_ids)
            for instance in removal_insts:
                instance.update_db(cluster_id=None)
            for instance in removal_insts:
                inst_models.Instance.delete(instance)

            return RedisCluster(self.context, cluster_info,
                                self.ds, self.ds_version)
        finally:
            # Always release the task lock, on success or failure.
            cluster_info.update(task_status=ClusterTasks.NONE)
class RedisClusterView(ClusterView):
    """User-facing cluster view; Redis clusters have only 'member' nodes."""

    def build_instances(self):
        # Expose IPs for 'member' nodes; all instances are of type 'member'.
        return self._build_instances(['member'], ['member'])
class RedisMgmtClusterView(MgmtClusterView):
    """Admin (mgmt) cluster view; mirrors the user view for Redis."""

    def build_instances(self):
        return self._build_instances(['member'], ['member'])

View File

@ -0,0 +1,63 @@
# Copyright [2015] Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from trove.common import cfg
from trove.common.strategies.cluster import base
from trove.guestagent import api as guest_api
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class RedisGuestAgentStrategy(base.BaseGuestAgentStrategy):
    """Selects the Redis-specific guest agent RPC client class."""

    @property
    def guest_client_class(self):
        return RedisGuestAgentAPI
class RedisGuestAgentAPI(guest_api.API):
    """Guest-agent RPC client adding Redis cluster-specific calls.

    Each method is a synchronous RPC (_call) using the high agent timeout.
    """

    def get_node_ip(self):
        """Ask the guest for this node's address info."""
        LOG.debug("Retrieve ip info from node.")
        return self._call("get_node_ip",
                          guest_api.AGENT_HIGH_TIMEOUT, self.version_cap)

    def get_node_id_for_removal(self):
        """Ask the guest whether this node may be removed from the cluster."""
        LOG.debug("Validating cluster node removal.")
        return self._call("get_node_id_for_removal",
                          guest_api.AGENT_HIGH_TIMEOUT, self.version_cap)

    def remove_nodes(self, node_ids):
        """Tell the guest to forget the given cluster node ids."""
        LOG.debug("Removing nodes from cluster.")
        return self._call("remove_nodes", guest_api.AGENT_HIGH_TIMEOUT,
                          self.version_cap, node_ids=node_ids)

    def cluster_meet(self, ip, port):
        """Tell the guest to join the cluster member at ip:port."""
        LOG.debug("Joining node to cluster.")
        return self._call("cluster_meet", guest_api.AGENT_HIGH_TIMEOUT,
                          self.version_cap, ip=ip, port=port)

    def cluster_addslots(self, first_slot, last_slot):
        """Tell the guest to claim hash slots first_slot..last_slot."""
        LOG.debug("Adding slots %s-%s to cluster.", first_slot, last_slot)
        return self._call("cluster_addslots",
                          guest_api.AGENT_HIGH_TIMEOUT, self.version_cap,
                          first_slot=first_slot, last_slot=last_slot)

    def cluster_complete(self):
        """Notify the guest that cluster setup has finished."""
        LOG.debug("Notifying cluster install completion.")
        return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
                          self.version_cap)

View File

@ -0,0 +1,157 @@
# Copyright [2015] Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet.timeout import Timeout
from oslo_log import log as logging
from trove.common import cfg
from trove.common.exception import TroveError
from trove.common.i18n import _
from trove.common.strategies.cluster import base
from trove.instance.models import DBInstance
from trove.instance.models import Instance
from trove.taskmanager import api as task_api
import trove.taskmanager.models as task_models
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
USAGE_SLEEP_TIME = CONF.usage_sleep_time # seconds.
class RedisTaskManagerStrategy(base.BaseTaskManagerStrategy):
    """Selects Redis-specific taskmanager API and cluster-task classes."""

    @property
    def task_manager_api_class(self):
        return RedisTaskManagerAPI

    @property
    def task_manager_cluster_tasks_class(self):
        return RedisClusterTasks
class RedisClusterTasks(task_models.ClusterTasks):
    """Taskmanager-side orchestration of Redis cluster operations."""

    def create_cluster(self, context, cluster_id):
        """Wire freshly built members into a single Redis cluster.

        Waits for every member to become ready, joins them via
        'cluster meet', distributes the 16384 hash slots across nodes,
        then signals completion to each guest.
        """
        LOG.debug("Begin create_cluster for id: %s." % cluster_id)

        def _create_cluster():
            # Fetch instances by cluster_id against instances table.
            db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
            instance_ids = [db_instance.id for db_instance in db_instances]

            # Wait for cluster members to get to cluster-ready status.
            if not self._all_instances_ready(instance_ids, cluster_id):
                return

            LOG.debug("All members ready, proceeding for cluster setup.")
            instances = [Instance.load(context, instance_id)
                         for instance_id in instance_ids]

            # Connect nodes to the first node.
            guests = [self.get_guest(instance) for instance in instances]
            try:
                cluster_head = instances[0]
                cluster_head_port = '6379'
                cluster_head_ip = self.get_ip(cluster_head)
                for guest in guests[1:]:
                    guest.cluster_meet(cluster_head_ip, cluster_head_port)

                # Split slots as evenly as possible; the first
                # `leftover_slots` nodes get one extra slot each.
                num_nodes = len(instances)
                total_slots = 16384
                # Floor division keeps the result an int on Python 3
                # (plain '/' would produce a float slot index there).
                slots_per_node = total_slots // num_nodes
                leftover_slots = total_slots % num_nodes
                first_slot = 0
                for guest in guests:
                    last_slot = first_slot + slots_per_node
                    if leftover_slots > 0:
                        leftover_slots -= 1
                    else:
                        last_slot -= 1
                    guest.cluster_addslots(first_slot, last_slot)
                    first_slot = last_slot + 1

                for guest in guests:
                    guest.cluster_complete()
            except Exception:
                LOG.exception(_("Error creating cluster."))
                self.update_statuses_on_failure(cluster_id)

        timeout = Timeout(CONF.cluster_usage_timeout)
        try:
            _create_cluster()
            self.reset_task()
        except Timeout as t:
            if t is not timeout:
                raise  # not my timeout
            LOG.exception(_("Timeout for building cluster."))
            self.update_statuses_on_failure(cluster_id)
        finally:
            timeout.cancel()

        LOG.debug("End create_cluster for id: %s." % cluster_id)

    def grow_cluster(self, context, cluster_id, new_instance_ids):
        """Join newly created members to the existing cluster."""
        LOG.debug("Begin grow_cluster for id: %s." % cluster_id)

        def _grow_cluster():
            db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
            # Pick any pre-existing member to introduce the new nodes to.
            # Supply a default so that "no existing member" raises the
            # intended TroveError below instead of StopIteration.
            cluster_head = next((Instance.load(context, db_inst.id)
                                 for db_inst in db_instances
                                 if db_inst.id not in new_instance_ids),
                                None)
            if not cluster_head:
                raise TroveError("Unable to determine existing Redis cluster "
                                 "member")
            (cluster_head_ip, cluster_head_port) = (
                self.get_guest(cluster_head).get_node_ip())

            # Wait for cluster members to get to cluster-ready status.
            if not self._all_instances_ready(new_instance_ids, cluster_id):
                return

            LOG.debug("All members ready, proceeding for cluster setup.")
            new_insts = [Instance.load(context, instance_id)
                         for instance_id in new_instance_ids]
            # Materialize the guest list: a lazy map() would be exhausted
            # after the first loop under Python 3, silently skipping
            # cluster_complete for every new node.
            new_guests = [self.get_guest(instance) for instance in new_insts]

            # Connect nodes to the cluster head.
            for guest in new_guests:
                guest.cluster_meet(cluster_head_ip, cluster_head_port)

            for guest in new_guests:
                guest.cluster_complete()

        timeout = Timeout(CONF.cluster_usage_timeout)
        try:
            _grow_cluster()
            self.reset_task()
        except Timeout as t:
            if t is not timeout:
                raise  # not my timeout
            LOG.exception(_("Timeout for growing cluster."))
            self.update_statuses_on_failure(cluster_id)
        except Exception:
            LOG.exception(_("Error growing cluster %s.") % cluster_id)
            self.update_statuses_on_failure(cluster_id)
        finally:
            timeout.cancel()

        LOG.debug("End grow_cluster for id: %s." % cluster_id)
class RedisTaskManagerAPI(task_api.API):
    """Redis taskmanager RPC API; inherits all behavior from the base."""
    pass

View File

@ -106,20 +106,27 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug('Mounted the volume.')
self._app.install_if_needed(packages)
LOG.info(_('Writing redis configuration.'))
if cluster_config:
config_contents = (config_contents + "\n"
+ "cluster-enabled yes\n"
+ "cluster-config-file cluster.conf\n")
self._app.configuration_manager.save_configuration(config_contents)
self._app.apply_initial_guestagent_configuration()
if backup_info:
persistence_dir = self._app.get_working_dir()
self._perform_restore(backup_info, context, persistence_dir,
self._app)
self._app.status.end_install_or_restart()
self._app.restart()
if cluster_config:
self._app.status.set_status(
rd_instance.ServiceStatuses.BUILD_PENDING)
else:
self._app.restart()
self._app.complete_install_or_restart()
LOG.info(_('Redis instance has been setup and configured.'))
except Exception:
LOG.exception(_("Error setting up Redis instance."))
self._app.status.set_status(rd_instance.ServiceStatuses.FAILED)
raise RuntimeError("prepare call has failed.")
raise
def restart(self, context):
"""
@ -297,3 +304,28 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Demoting replica source.")
raise exception.DatastoreOperationNotSupported(
operation='demote_replication_master', datastore=MANAGER)
def cluster_meet(self, context, ip, port):
    """Join this node to the cluster member at ip:port."""
    LOG.debug("Executing cluster_meet to join node to cluster.")
    self._app.cluster_meet(ip, port)
def get_node_ip(self, context):
    """Return this node's address info (delegated to the app layer)."""
    LOG.debug("Retrieving cluster node ip address.")
    return self._app.get_node_ip()
def get_node_id_for_removal(self, context):
    """Return the node id if this node may be removed, else None."""
    LOG.debug("Validating removal of node from cluster.")
    return self._app.get_node_id_for_removal()
def remove_nodes(self, context, node_ids):
    """Make this node forget the given cluster node ids."""
    LOG.debug("Removing nodes from cluster.")
    self._app.remove_nodes(node_ids)
def cluster_addslots(self, context, first_slot, last_slot):
    """Assign hash slots first_slot..last_slot to this node."""
    LOG.debug("Executing cluster_addslots to assign hash slots %s-%s.",
              first_slot, last_slot)
    self._app.cluster_addslots(first_slot, last_slot)
def cluster_complete(self, context):
    """Mark cluster setup finished and resume normal status reporting."""
    LOG.debug("Cluster creation complete, starting status checks.")
    self._app.complete_install_or_restart()

View File

@ -387,6 +387,73 @@ class RedisApp(object):
return utils.unpack_singleton(
self.configuration_manager.get_value(name, default))
def cluster_meet(self, ip, port):
    """Introduce this node to the cluster member at ip:port via redis-cli."""
    try:
        utils.execute_with_timeout('redis-cli', 'cluster', 'meet',
                                   ip, port)
    except exception.ProcessExecutionError:
        LOG.exception(_('Error joining node to cluster at %s.'), ip)
        raise
def cluster_addslots(self, first_slot, last_slot):
    """Assign hash slots [first_slot, last_slot] to this node.

    The slots are claimed in batches of 200 per redis-cli invocation to
    keep the command line a manageable length.

    :raises RuntimeError: if redis-cli does not report OK for a batch.
    """
    try:
        # Build a real list: a lazy Python 3 map() is not sliceable,
        # does not support del, and is always truthy, which would make
        # the batching loop below fail or never terminate.
        slots = [str(slot) for slot in range(first_slot, last_slot + 1)]
        group_size = 200
        while slots:
            cmd = ([system.REDIS_CLI, 'cluster', 'addslots']
                   + slots[0:group_size])
            out, err = utils.execute_with_timeout(*cmd, run_as_root=True,
                                                  root_helper='sudo')
            if 'OK' not in out:
                raise RuntimeError(_('Error executing addslots: %s')
                                   % out)
            del slots[0:group_size]
    except exception.ProcessExecutionError:
        LOG.exception(_('Error adding slots %(first_slot)s-%(last_slot)s'
                        ' to cluster.'),
                      {'first_slot': first_slot, 'last_slot': last_slot})
        raise
def _get_node_info(self):
    """Return 'cluster nodes' output parsed into per-node field lists."""
    try:
        # Unpack into 'err' — unpacking into '_' would shadow the i18n
        # translation function, making the except clause below raise
        # UnboundLocalError when the command fails before assignment.
        out, err = utils.execute_with_timeout('redis-cli', '--csv',
                                              'cluster', 'nodes')
        return [line.split(' ') for line in out.splitlines()]
    except exception.ProcessExecutionError:
        LOG.exception(_('Error getting node info.'))
        raise
def _get_node_details(self):
for node_details in self._get_node_info():
if 'myself' in node_details[2]:
return node_details
raise exception.TroveError(_("Unable to determine node details"))
def get_node_ip(self):
    """Return [ip, port] for this node; both elements are strings."""
    address = self._get_node_details()[1]
    return address.split(':')
def get_node_id_for_removal(self):
    """Return this node's id if it is safe to remove, else None.

    A node may only be removed when it serves no hash slots: if its IP
    still appears in 'cluster slots' output, None is returned.
    """
    node_details = self._get_node_details()
    node_id = node_details[0]
    my_ip = node_details[1].split(':')[0]
    try:
        # Unpack into 'err', not '_': shadowing the i18n function would
        # make the except clause raise UnboundLocalError on failure.
        slots, err = utils.execute_with_timeout('redis-cli', '--csv',
                                                'cluster', 'slots')
        return node_id if my_ip not in slots else None
    except exception.ProcessExecutionError:
        LOG.exception(_('Error validating node for removal.'))
        raise
def remove_nodes(self, node_ids):
    """Make this node forget each of the given cluster node ids."""
    try:
        for node_id in node_ids:
            utils.execute_with_timeout('redis-cli', 'cluster',
                                       'forget', node_id)
    except exception.ProcessExecutionError:
        LOG.exception(_('Error removing node from cluster.'))
        raise
class RedisAdmin(object):
"""Handles administrative tasks on the Redis database.

View File

@ -181,6 +181,20 @@ class API(object):
cctxt.cast(self.context, "create_cluster",
cluster_id=cluster_id)
def grow_cluster(self, cluster_id, new_instance_ids):
    """Cast an async grow_cluster request to the taskmanager."""
    LOG.debug("Making async call to grow cluster %s " % cluster_id)
    cctxt = self.client.prepare(version=self.version_cap)
    cctxt.cast(self.context, "grow_cluster",
               cluster_id=cluster_id, new_instance_ids=new_instance_ids)
def shrink_cluster(self, cluster_id, instance_ids):
    """Cast an async shrink_cluster request to the taskmanager."""
    LOG.debug("Making async call to shrink cluster %s " % cluster_id)
    cctxt = self.client.prepare(version=self.version_cap)
    cctxt.cast(self.context, "shrink_cluster",
               cluster_id=cluster_id, instance_ids=instance_ids)
def delete_cluster(self, cluster_id):
LOG.debug("Making async call to delete cluster %s " % cluster_id)

View File

@ -342,6 +342,14 @@ class Manager(periodic_task.PeriodicTasks):
cluster_tasks = models.load_cluster_tasks(context, cluster_id)
cluster_tasks.create_cluster(context, cluster_id)
def grow_cluster(self, context, cluster_id, new_instance_ids):
    """Delegate cluster grow to the datastore's cluster-tasks strategy."""
    cluster_tasks = models.load_cluster_tasks(context, cluster_id)
    cluster_tasks.grow_cluster(context, cluster_id, new_instance_ids)
def shrink_cluster(self, context, cluster_id, instance_ids):
    """Delegate cluster shrink to the datastore's cluster-tasks strategy."""
    cluster_tasks = models.load_cluster_tasks(context, cluster_id)
    cluster_tasks.shrink_cluster(context, cluster_id, instance_ids)
def delete_cluster(self, context, cluster_id):
    """Delegate cluster deletion to the datastore's cluster-tasks strategy."""
    cluster_tasks = models.load_cluster_tasks(context, cluster_id)
    cluster_tasks.delete_cluster(context, cluster_id)

View File

@ -200,12 +200,6 @@ class ClusterTasks(Cluster):
def get_ip(cls, instance):
return instance.get_visible_ip_addresses()[0]
@classmethod
def get_guest(cls, instance):
return remote.create_guest_client(instance.context,
instance.db_info.id,
instance.datastore_version.manager)
def _all_instances_ready(self, instance_ids, cluster_id,
shard_id=None):

318
trove/tests/api/redis.py Normal file
View File

@ -0,0 +1,318 @@
# Copyright [2015] Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Integration tests for Redis datastore.
APIs tested for Redis are:
1. create
2. restart
3. resize-volume
4. resize-instance
5. delete
6. cluster-create
7. cluster-delete
"""
from proboscis import asserts
from proboscis.decorators import before_class
from proboscis import SkipTest
from proboscis import test
from troveclient.compat import exceptions
from trove.common import cfg
from trove.common.utils import poll_until
from trove.tests.api.instances import EPHEMERAL_SUPPORT
from trove.tests.api.instances import GROUP_START_SIMPLE
from trove.tests.api.instances import instance_info
from trove.tests.api.instances import WaitForGuestInstallationToFinish
from trove.tests.config import CONFIG
from trove.tests.util.check import TypeCheck
from trove.tests.util import create_dbaas_client
CONF = cfg.CONF
REDIS_GROUP = "dbaas.api.redis"
TIMEOUT = 2300
SLEEP_TIME = 60
@test(depends_on_groups=[GROUP_START_SIMPLE], groups=[REDIS_GROUP],
runs_after=[WaitForGuestInstallationToFinish])
class RedisTest(object):
"""Tests Redis Datastore Features."""
@before_class
def setUp(self):
    """Create the client and report helpers shared by all tests."""
    self.instance = instance_info
    self.rd_client = create_dbaas_client(self.instance.user)
    self.report = CONFIG.get_report()
def _find_status(self, rd_client, instance_id, expected_status):
"""Tracks instance status, until it gets to expected_status."""
instance = rd_client.instances.get(instance_id)
self.report.log("Instance info %s." % instance._info)
if instance.status == expected_status:
self.report.log("Instance: %s is ready." % instance_id)
return True
else:
return False
@test
def test_instance_restart(self):
    """Tests the restart API."""
    if not getattr(self, 'instance', None):
        raise SkipTest(
            "Skipping this test since instance is not available.")

    self.rd_client = create_dbaas_client(self.instance.user)
    self.rd_client.instances.restart(self.instance.id)

    asserts.assert_equal(202, self.rd_client.last_http_code)
    test_instance = self.rd_client.instances.get(self.instance.id)
    asserts.assert_equal("REBOOT", test_instance.status)

    # Wait for the instance to come back to ACTIVE after the reboot.
    poll_until(lambda: self._find_status(self.rd_client,
                                         self.instance.id, "ACTIVE"),
               sleep_time=SLEEP_TIME, time_out=TIMEOUT)

    self.report.log("Restarted Instance: %s." % self.instance.id)
@test(depends_on=[test_instance_restart], enabled=False)
def test_instance_resize_volume(self):
    """Tests the resize volume API."""
    # NOTE(review): disabled via enabled=False — confirm whether volume
    # resize is supported for Redis before re-enabling.
    old_volume_size = int(instance_info.volume['size'])
    new_volume_size = old_volume_size + 1
    if not getattr(self, 'instance', None):
        raise SkipTest(
            "Skipping this test since instance is not available.")

    self.rd_client = create_dbaas_client(self.instance.user)
    self.rd_client.instances.resize_volume(self.instance.id,
                                           new_volume_size)

    asserts.assert_equal(202, self.rd_client.last_http_code)
    test_instance = self.rd_client.instances.get(self.instance.id)
    asserts.assert_equal("RESIZE", test_instance.status)

    # Wait for the resize to finish, then verify the new size took.
    poll_until(lambda: self._find_status(self.rd_client,
                                         self.instance.id, "ACTIVE"),
               sleep_time=SLEEP_TIME, time_out=TIMEOUT)

    instance = self.rd_client.instances.get(self.instance.id)
    asserts.assert_equal(instance.volume['size'], new_volume_size)
    self.report.log("Resized Volume for Instance ID: %s to %s." % (
        self.instance.id, new_volume_size))
@test(depends_on=[test_instance_resize_volume])
def test_instance_resize_flavor(self):
    """Tests the resize instance/flavor API."""
    # NOTE(review): depends on test_instance_resize_volume, which is
    # declared with enabled=False — verify the dependency still runs.
    if EPHEMERAL_SUPPORT:
        flavor_name = CONFIG.values.get('instance_bigger_eph_flavor_name',
                                        'eph.rd-smaller')
    else:
        flavor_name = CONFIG.values.get('instance_bigger_flavor_name',
                                        'm1.small')

    flavors = self.instance.dbaas.find_flavors_by_name(flavor_name)
    new_flavor = flavors[0]

    asserts.assert_true(new_flavor is not None,
                        "Flavor '%s' not found!" % flavor_name)

    if not getattr(self, 'instance', None):
        raise SkipTest(
            "Skipping this test since instance is not available.")

    self.rd_client = create_dbaas_client(self.instance.user)
    self.rd_client.instances.resize_instance(self.instance.id,
                                             new_flavor.id)

    asserts.assert_equal(202, self.rd_client.last_http_code)
    test_instance = self.rd_client.instances.get(self.instance.id)
    asserts.assert_equal("RESIZE", test_instance.status)

    # Wait for the resize, then confirm the instance reports the
    # requested flavor.
    poll_until(lambda: self._find_status(self.rd_client,
                                         self.instance.id, "ACTIVE"),
               sleep_time=SLEEP_TIME, time_out=TIMEOUT)

    test_instance = self.rd_client.instances.get(self.instance.id)
    asserts.assert_equal(int(test_instance.flavor['id']), new_flavor.id)
    self.report.log("Resized Flavor for Instance ID: %s to %s." % (
        self.instance.id, new_flavor.id))
@test(depends_on=[test_instance_resize_flavor])
def test_instance_delete(self):
    """Tests the instance delete."""
    if not getattr(self, 'instance', None):
        raise SkipTest(
            "Skipping this test since instance is not available.")

    self.rd_client = create_dbaas_client(self.instance.user)
    self.rd_client.instances.delete(self.instance.id)

    asserts.assert_equal(202, self.rd_client.last_http_code)
    test_instance = self.rd_client.instances.get(self.instance.id)
    asserts.assert_equal("SHUTDOWN", test_instance.status)

    def _poll():
        # Poll until the instance 404s; while it still exists it must
        # report SHUTDOWN.
        try:
            instance = self.rd_client.instances.get(self.instance.id)
            self.report.log("Instance info %s" % instance._info)
            asserts.assert_equal("SHUTDOWN", instance.status)
            return False
        except exceptions.NotFound:
            self.report.log("Instance has gone.")
            asserts.assert_equal(404, self.rd_client.last_http_code)
            return True

    poll_until(_poll, sleep_time=SLEEP_TIME, time_out=TIMEOUT)
    self.report.log("Deleted Instance ID: %s " % self.instance.id)
@test(depends_on=[test_instance_delete])
def test_create_cluster_successfuly(self):
    """Create a two-member Redis cluster and validate the response shape."""
    # NOTE: basestring/unicode below are Python 2 only.
    valid_request_body = [{"flavorRef": self.instance.dbaas_flavor_href,
                           'volume': {'size': 1}}] * 2

    self.cluster = self.rd_client.clusters.create(
        "test_cluster", self.instance.dbaas_datastore,
        self.instance.dbaas_datastore_version,
        instances=valid_request_body)

    with TypeCheck('Cluster', self.cluster) as check:
        check.has_field("id", basestring)
        check.has_field("name", basestring)
        check.has_field("datastore", dict)
        check.has_field("instances", list)
        check.has_field("links", list)
        check.has_field("created", unicode)
        check.has_field("updated", unicode)
        for instance in self.cluster.instances:
            isinstance(instance, dict)
            asserts.assert_is_not_none(instance['id'])
            asserts.assert_is_not_none(instance['links'])
            asserts.assert_is_not_none(instance['name'])
    asserts.assert_equal(200, self.rd_client.last_http_code)
def _cluster_is_active(self):
cluster = self.rd_client.clusters.get(self.cluster.id)
cluster_instances = [
self.rd_client.instances.get(instance['id'])
for instance in cluster.instances]
self.report.log("Cluster info %s." % cluster._info)
self.report.log("Cluster instances info %s." % cluster_instances)
if cluster.task['name'] == "NONE":
if ["ERROR"] * len(cluster_instances) == [
str(instance.status) for instance in cluster_instances]:
self.report.log("Cluster provisioning failed.")
asserts.fail("Cluster provisioning failed.")
if ["ACTIVE"] * len(cluster_instances) == [
str(instance.status) for instance in cluster_instances]:
self.report.log("Cluster is ready.")
return True
else:
asserts.assert_not_equal(
["ERROR"] * len(cluster_instances),
[instance.status
for instance in cluster_instances])
self.report.log("Continue polling, cluster is not ready yet.")
@test(depends_on=[test_create_cluster_successfuly])
def test_wait_until_cluster_is_active(self):
if not getattr(self, 'cluster', None):
raise SkipTest(
"Skipping this test since cluster is not available.")
poll_until(self._cluster_is_active,
sleep_time=SLEEP_TIME, time_out=TIMEOUT)
self.report.log("Created cluster, ID = %s." % self.cluster.id)
@test(depends_on=[test_wait_until_cluster_is_active])
def test_cluster_grow(self):
if not getattr(self, 'cluster', None):
raise SkipTest(
"Skipping this test since cluster is not available.")
beginning_instance_count = len(self.cluster.instances)
valid_request_body = [
{"name": "foo", "flavorRef": self.instance.dbaas_flavor_href,
'volume': {'size': 1}},
{"name": "bar", "flavorRef": self.instance.dbaas_flavor_href,
'volume': {'size': 1}}]
self.cluster = self.rd_client.clusters.grow(self.cluster.id,
valid_request_body)
asserts.assert_equal(2, len(self.cluster.instances)
- beginning_instance_count)
asserts.assert_equal(202, self.rd_client.last_http_code)
poll_until(self._cluster_is_active,
sleep_time=SLEEP_TIME, time_out=TIMEOUT)
@test(depends_on=[test_cluster_grow])
def test_cluster_shrink(self):
if not getattr(self, 'cluster', None):
raise SkipTest(
"Skipping this test since cluster is not available.")
foo_instance = None
for instance in self.cluster.instances:
if instance['name'] == 'foo':
foo_instance = instance
break
asserts.assert_is_not_none(foo_instance, "Could not find foo instance")
beginning_instance_count = len(self.cluster.instances)
valid_request_body = [{"id": foo_instance['id']}]
self.cluster = self.rd_client.clusters.shrink(self.cluster.id,
valid_request_body)
asserts.assert_equal(-1, len(self.cluster.instances)
- beginning_instance_count)
asserts.assert_equal(202, self.rd_client.last_http_code)
poll_until(self._cluster_is_active,
sleep_time=SLEEP_TIME, time_out=TIMEOUT)
@test(depends_on=[test_create_cluster_successfuly],
runs_after=[test_cluster_shrink])
def test_cluster_delete(self):
if not getattr(self, 'cluster', None):
raise SkipTest(
"Skipping this test since cluster is not available.")
self.rd_client.clusters.delete(self.cluster.id)
asserts.assert_equal(202, self.rd_client.last_http_code)
def _poll():
try:
cluster = self.rd_client.clusters.get(
self.cluster.id)
self.report.log("Cluster info %s" % cluster._info)
asserts.assert_equal("DELETING", cluster.task['name'])
return False
except exceptions.NotFound:
self.report.log("Cluster is not available.")
asserts.assert_equal(404, self.rd_client.last_http_code)
return True
poll_until(_poll, sleep_time=SLEEP_TIME, time_out=TIMEOUT)
self.report.log("Deleted cluster: %s." % self.cluster.id)

View File

@ -27,6 +27,7 @@ from trove.tests.api.mgmt import datastore_versions
from trove.tests.api.mgmt import hosts
from trove.tests.api.mgmt import instances as mgmt_instances
from trove.tests.api.mgmt import storage
from trove.tests.api import redis
from trove.tests.api import replication
from trove.tests.api import root
from trove.tests.api import user_access
@ -151,3 +152,15 @@ register(["mysql_group"], backup_groups, instance_actions_groups,
replication_groups)
register(["redis_group"], backup_groups, instance_actions_groups)
register(["vertica_group"], cluster_actions_groups, instance_actions_groups)
# Redis int-tests
# Groups that must run (in dependency order) for the Redis-specific
# integration tests: service bootstrap, flavor/version discovery, basic
# instance lifecycle and quota checks, then the Redis cluster tests.
redis_group = [
    GROUP_SERVICES_INITIALIZE,
    flavors.GROUP,
    versions.GROUP,
    instances.GROUP_START_SIMPLE,
    instances.GROUP_QUOTAS,
    redis.REDIS_GROUP,
]
# Register the top-level "redis" proboscis group (run via
# "redstack int-tests --group=redis").
proboscis.register(groups=["redis"],
                   depends_on_groups=redis_group)

View File

@ -0,0 +1,376 @@
# Copyright [2015] Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import jsonschema
from mock import MagicMock
from mock import Mock
from mock import patch
from testtools.matchers import Is, Equals
from trove.cluster import models
from trove.cluster.models import Cluster
from trove.cluster.service import ClusterController
from trove.cluster import views
import trove.common.cfg as cfg
from trove.common import exception
from trove.common.strategies.cluster import strategy
from trove.common import utils
from trove.datastore import models as datastore_models
from trove.tests.unittests import trove_testtools
class TestClusterController(trove_testtools.TestCase):
    """Unit tests for ClusterController schema validation and routing."""

    def setUp(self):
        super(TestClusterController, self).setUp()
        self.controller = ClusterController()
        # Three-member request body matching the cluster-create schema.
        instances = [
            {
                "volume_size": None,
                "flavorRef": "7",
                "availability_zone": "az",
                "nics": [
                    {"net-id": "e89aa5fd-6b0a-436d-a75c-1545d34d5331"}
                ]
            },
            {
                "volume_size": None,
                "flavorRef": "8",
                "availability_zone": "az",
                "nics": [
                    {"net-id": "e89aa5fd-6b0a-436d-a75c-1545d34d5331"}
                ]
            },
            {
                "volume_size": None,
                "flavorRef": "7",
                "availability_zone": "az",
                "nics": [
                    {"net-id": "e89aa5fd-6b0a-436d-a75c-1545d34d5331"}
                ]
            }
        ]
        self.cluster = {
            "cluster": {
                "name": "products",
                "datastore": {
                    "type": "redis",
                    "version": "3.0"
                },
                "instances": instances
            }
        }

    def test_get_schema_create(self):
        """The controller exposes a create schema rooted at 'cluster'."""
        schema = self.controller.get_schema('create', self.cluster)
        self.assertIsNotNone(schema)
        self.assertTrue('cluster' in schema['properties'])
        # NOTE(review): this asserts a non-empty string literal, which is
        # always true — it checks nothing.  Possibly meant to assert on
        # schema contents; confirm intent before removing.
        self.assertTrue('cluster')

    def test_validate_create(self):
        """A well-formed create body passes Draft4 schema validation."""
        body = self.cluster
        schema = self.controller.get_schema('create', body)
        validator = jsonschema.Draft4Validator(schema)
        self.assertTrue(validator.is_valid(body))

    def test_validate_create_blankname(self):
        """A whitespace-only cluster name is rejected by the schema."""
        body = self.cluster
        body['cluster']['name'] = " "
        schema = self.controller.get_schema('create', body)
        validator = jsonschema.Draft4Validator(schema)
        self.assertFalse(validator.is_valid(body))
        errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
        self.assertThat(len(errors), Is(1))
        self.assertThat(errors[0].message,
                        Equals("' ' does not match '^.*[0-9a-zA-Z]+.*$'"))

    def test_validate_create_blank_datastore(self):
        """An empty datastore type fails both length and pattern checks."""
        body = self.cluster
        body['cluster']['datastore']['type'] = ""
        schema = self.controller.get_schema('create', body)
        validator = jsonschema.Draft4Validator(schema)
        self.assertFalse(validator.is_valid(body))
        errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
        error_messages = [error.message for error in errors]
        error_paths = [error.path.pop() for error in errors]
        self.assertThat(len(errors), Is(2))
        self.assertIn("'' is too short", error_messages)
        self.assertIn("'' does not match '^.*[0-9a-zA-Z]+.*$'", error_messages)
        self.assertIn("type", error_paths)

    @patch.object(Cluster, 'create')
    @patch.object(datastore_models, 'get_datastore_version')
    def test_create_clusters_disabled(self,
                                      mock_get_datastore_version,
                                      mock_cluster_create):
        """Create is rejected for a datastore with no cluster strategy.

        The 'mysql' manager has no cluster support, so the controller
        should raise ClusterDatastoreNotSupported.
        """
        body = self.cluster
        tenant_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        datastore_version = Mock()
        datastore_version.manager = 'mysql'
        mock_get_datastore_version.return_value = (Mock(), datastore_version)
        self.assertRaises(exception.ClusterDatastoreNotSupported,
                          self.controller.create,
                          req,
                          body,
                          tenant_id)

    @patch.object(Cluster, 'create')
    @patch.object(utils, 'get_id_from_href')
    @patch.object(datastore_models, 'get_datastore_version')
    def test_create_clusters(self,
                             mock_get_datastore_version,
                             mock_id_from_href,
                             mock_cluster_create):
        """A valid redis create call reaches Cluster.create with the
        flavorRefs resolved to flavor_ids."""
        body = self.cluster
        tenant_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = Mock()
        req.environ.__getitem__ = Mock(return_value=context)
        datastore_version = Mock()
        datastore_version.manager = 'redis'
        datastore = Mock()
        mock_get_datastore_version.return_value = (datastore,
                                                   datastore_version)
        # Expected instance definitions after the controller translates
        # each "flavorRef" href into a "flavor_id".
        instances = [
            {
                "volume_size": None,
                "flavor_id": "1234",
                "availability_zone": "az",
                "nics": [
                    {"net-id": "e89aa5fd-6b0a-436d-a75c-1545d34d5331"}
                ]
            },
            {
                "volume_size": None,
                "flavor_id": "1234",
                "availability_zone": "az",
                "nics": [
                    {"net-id": "e89aa5fd-6b0a-436d-a75c-1545d34d5331"}
                ]
            },
            {
                "volume_size": None,
                "flavor_id": "1234",
                "availability_zone": "az",
                "nics": [
                    {"net-id": "e89aa5fd-6b0a-436d-a75c-1545d34d5331"}
                ]
            }
        ]
        mock_id_from_href.return_value = '1234'
        mock_cluster = Mock()
        mock_cluster.instances = []
        mock_cluster.instances_without_server = []
        mock_cluster.datastore_version.manager = 'redis'
        mock_cluster_create.return_value = mock_cluster
        self.controller.create(req, body, tenant_id)
        mock_cluster_create.assert_called_with(context, 'products',
                                               datastore, datastore_version,
                                               instances)

    @patch.object(Cluster, 'load')
    def test_show_cluster(self,
                          mock_cluster_load):
        """Show loads the cluster by id from the request context."""
        tenant_id = Mock()
        id = Mock()
        context = Mock()
        req = Mock()
        req.environ = Mock()
        req.environ.__getitem__ = Mock(return_value=context)
        mock_cluster = Mock()
        mock_cluster.instances = []
        mock_cluster.instances_without_server = []
        mock_cluster.datastore_version.manager = 'redis'
        mock_cluster_load.return_value = mock_cluster
        self.controller.show(req, tenant_id, id)
        mock_cluster_load.assert_called_with(context, id)

    @patch.object(Cluster, 'load')
    @patch.object(Cluster, 'load_instance')
    def test_show_cluster_instance(self,
                                   mock_cluster_load_instance,
                                   mock_cluster_load):
        """Show-instance resolves the cluster first, then its member."""
        tenant_id = Mock()
        cluster_id = Mock()
        instance_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = Mock()
        req.environ.__getitem__ = Mock(return_value=context)
        cluster = Mock()
        mock_cluster_load.return_value = cluster
        cluster.id = cluster_id
        self.controller.show_instance(req, tenant_id, cluster_id, instance_id)
        mock_cluster_load_instance.assert_called_with(context, cluster.id,
                                                      instance_id)

    @patch.object(Cluster, 'load')
    def test_delete_cluster(self, mock_cluster_load):
        """Delete dispatches to the loaded cluster's delete()."""
        tenant_id = Mock()
        cluster_id = Mock()
        req = MagicMock()
        cluster = Mock()
        mock_cluster_load.return_value = cluster
        self.controller.delete(req, tenant_id, cluster_id)
        cluster.delete.assert_called_with()
class TestClusterControllerWithStrategy(trove_testtools.TestCase):
    """ClusterController tests that exercise the datastore API strategy
    (cluster_support flag and per-datastore controller actions)."""

    def setUp(self):
        super(TestClusterControllerWithStrategy, self).setUp()
        self.controller = ClusterController()
        self.cluster = {
            "cluster": {
                "name": "products",
                "datastore": {
                    "type": "redis",
                    "version": "3.0"
                },
                "instances": [
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                ]
            }
        }

    def tearDown(self):
        super(TestClusterControllerWithStrategy, self).tearDown()
        # Undo any per-test config overrides so they do not leak between
        # tests.
        cfg.CONF.clear_override('cluster_support', group='redis')
        cfg.CONF.clear_override('api_strategy', group='redis')

    @patch.object(datastore_models, 'get_datastore_version')
    @patch.object(models.Cluster, 'create')
    def test_create_clusters_disabled(self,
                                      mock_cluster_create,
                                      mock_get_datastore_version):
        """Create raises when cluster_support is disabled for redis."""
        cfg.CONF.set_override('cluster_support', False, group='redis')
        body = self.cluster
        tenant_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        datastore_version = Mock()
        datastore_version.manager = 'redis'
        mock_get_datastore_version.return_value = (Mock(), datastore_version)
        self.assertRaises(exception.TroveError, self.controller.create, req,
                          body, tenant_id)

    @patch.object(views.ClusterView, 'data', return_value={})
    @patch.object(datastore_models, 'get_datastore_version')
    @patch.object(models.Cluster, 'create')
    def test_create_clusters_enabled(self,
                                     mock_cluster_create,
                                     mock_get_datastore_version,
                                     mock_cluster_view_data):
        """Create succeeds when cluster_support is enabled for redis."""
        cfg.CONF.set_override('cluster_support', True, group='redis')
        body = self.cluster
        tenant_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        datastore_version = Mock()
        datastore_version.manager = 'redis'
        mock_get_datastore_version.return_value = (Mock(), datastore_version)
        mock_cluster = Mock()
        mock_cluster.datastore_version.manager = 'redis'
        mock_cluster_create.return_value = mock_cluster
        self.controller.create(req, body, tenant_id)

    @patch.object(models.Cluster, 'load')
    def test_controller_action_no_strategy(self,
                                           mock_cluster_load):
        """An action name unknown to the strategy raises TroveError."""
        body = {'do_stuff2': {}}
        tenant_id = Mock()
        context = Mock()
        id = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        cluster = Mock()
        cluster.datastore_version.manager = 'redis'
        mock_cluster_load.return_value = cluster
        self.assertRaises(exception.TroveError, self.controller.action, req,
                          body, tenant_id, id)

    @patch.object(strategy, 'load_api_strategy')
    @patch.object(models.Cluster, 'load')
    def test_controller_action_found(self,
                                     mock_cluster_load,
                                     mock_cluster_api_strategy):
        """An action registered in the strategy's action table is invoked
        exactly once."""
        body = {'do_stuff': {}}
        tenant_id = Mock()
        context = Mock()
        id = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        cluster = Mock()
        cluster.datastore_version.manager = 'redis'
        mock_cluster_load.return_value = cluster
        strat = Mock()
        do_stuff_func = Mock()
        strat.cluster_controller_actions = \
            {'do_stuff': do_stuff_func}
        mock_cluster_api_strategy.return_value = strat
        self.controller.action(req, body, tenant_id, id)
        self.assertEqual(1, do_stuff_func.call_count)

View File

@ -0,0 +1,271 @@
# Copyright [2015] Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from mock import Mock
from mock import patch
from novaclient import exceptions as nova_exceptions
from trove.cluster.models import Cluster
from trove.cluster.models import ClusterTasks
from trove.cluster.models import DBCluster
from trove.common import cfg
from trove.common import exception
from trove.common import remote
from trove.common.strategies.cluster.experimental.redis import api as redis_api
from trove.instance import models as inst_models
from trove.instance.models import DBInstance
from trove.instance.models import InstanceTasks
from trove.quota.quota import QUOTAS
from trove.taskmanager import api as task_api
from trove.tests.unittests import trove_testtools
CONF = cfg.CONF
class FakeOptGroup(object):
    """Minimal stand-in for the per-datastore config option group read by
    the Redis cluster strategy (member count, volume support, device)."""

    def __init__(self, cluster_member_count=3, volume_support=True,
                 device_path='/dev/vdb'):
        self.cluster_member_count = cluster_member_count
        self.volume_support = volume_support
        self.device_path = device_path
class ClusterTest(trove_testtools.TestCase):
    """Unit tests for the Redis cluster strategy (RedisCluster) model."""

    def setUp(self):
        super(ClusterTest, self).setUp()
        self.cluster_id = str(uuid.uuid4())
        self.cluster_name = "Cluster" + self.cluster_id
        self.tenant_id = "23423432"
        self.dv_id = "1"
        self.db_info = DBCluster(ClusterTasks.NONE,
                                 id=self.cluster_id,
                                 name=self.cluster_name,
                                 tenant_id=self.tenant_id,
                                 datastore_version_id=self.dv_id,
                                 task_id=ClusterTasks.NONE._code)
        # Stub out the RPC client so no messaging setup is required.
        self.get_client_patch = patch.object(task_api.API, 'get_client')
        self.get_client_mock = self.get_client_patch.start()
        self.addCleanup(self.get_client_patch.stop)
        self.dbcreate_patch = patch.object(DBCluster, 'create',
                                           return_value=self.db_info)
        self.dbcreate_mock = self.dbcreate_patch.start()
        self.addCleanup(self.dbcreate_patch.stop)
        self.context = Mock()
        self.datastore = Mock()
        self.dv = Mock()
        self.dv.manager = "redis"
        self.datastore_version = self.dv
        self.cluster = redis_api.RedisCluster(self.context, self.db_info,
                                              self.datastore,
                                              self.datastore_version)
        self.instances_w_volumes = [{'volume_size': 1,
                                     'flavor_id': '1234'}] * 3
        self.instances_no_volumes = [{'flavor_id': '1234'}] * 3

    def tearDown(self):
        super(ClusterTest, self).tearDown()

    @staticmethod
    def _fake_flavor(flavor_id, ephemeral):
        """Build a minimal nova-flavor stand-in.

        Bug fix: the original inline FakeFlavor classes implemented the
        ``id`` property as ``return self.flavor.id``, but no ``flavor``
        attribute exists — accessing ``.id`` would raise AttributeError.
        It now returns the stored flavor_id.
        """
        class FakeFlavor(object):
            def __init__(self, flavor_id, ephemeral):
                self.flavor_id = flavor_id
                self._ephemeral = ephemeral

            @property
            def id(self):
                return self.flavor_id

            @property
            def ephemeral(self):
                return self._ephemeral

        return FakeFlavor(flavor_id, ephemeral)

    @patch.object(remote, 'create_nova_client')
    def test_create_invalid_flavor_specified(self,
                                             mock_client):
        """Cluster.create raises FlavorNotFound for an unknown flavor."""
        (mock_client.return_value.flavors.get) = Mock(
            side_effect=nova_exceptions.NotFound(
                404, "Flavor id not found %s" % id))
        self.assertRaises(exception.FlavorNotFound,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          self.instances_w_volumes)

    @patch.object(remote, 'create_nova_client')
    @patch.object(redis_api, 'CONF')
    def test_create_volume_no_specified(self, mock_conf, mock_client):
        """Volume support enabled but no volume size -> error."""
        mock_conf.get = Mock(
            return_value=FakeOptGroup(volume_support=True))
        self.assertRaises(exception.VolumeSizeNotSpecified,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          self.instances_no_volumes)

    @patch.object(remote, 'create_nova_client')
    @patch.object(redis_api, 'CONF')
    def test_create_storage_specified_with_no_volume_support(self,
                                                             mock_conf,
                                                             mock_client):
        """Volume sizes given while volume support is off -> error."""
        mock_conf.get = Mock(
            return_value=FakeOptGroup(volume_support=False))
        mock_client.return_value.flavors = Mock()
        self.assertRaises(exception.VolumeNotSupported,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          self.instances_w_volumes)

    @patch.object(remote, 'create_nova_client')
    @patch.object(redis_api, 'CONF')
    def test_create_storage_not_specified_and_no_ephemeral_flavor(self,
                                                                  mock_conf,
                                                                  mock_client):
        """No volume support and a flavor without ephemeral disk -> error."""
        mock_conf.get = Mock(
            return_value=FakeOptGroup(volume_support=False))
        (mock_client.return_value.
         flavors.get.return_value) = self._fake_flavor('1234', ephemeral=0)
        self.assertRaises(exception.LocalStorageNotSpecified,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          self.instances_no_volumes)

    @patch.object(redis_api, 'CONF')
    @patch.object(inst_models.Instance, 'create')
    @patch.object(task_api, 'load')
    @patch.object(QUOTAS, 'check_quotas')
    @patch.object(remote, 'create_nova_client')
    def test_create(self, mock_client, mock_check_quotas, mock_task_api,
                    mock_ins_create, mock_conf):
        """A valid create provisions one instance per member and kicks
        off the cluster-create task."""
        mock_conf.get = Mock(
            return_value=FakeOptGroup(volume_support=True))
        mock_client.return_value.flavors = Mock()
        self.cluster.create(Mock(),
                            self.cluster_name,
                            self.datastore,
                            self.datastore_version,
                            self.instances_w_volumes)
        mock_task_api.return_value.create_cluster.assert_called_with(
            self.dbcreate_mock.return_value.id)
        self.assertEqual(3, mock_ins_create.call_count)

    @patch.object(redis_api, 'CONF')
    @patch.object(inst_models.Instance, 'create')
    @patch.object(task_api, 'load')
    @patch.object(QUOTAS, 'check_quotas')
    @patch.object(remote, 'create_nova_client')
    def test_create_with_ephemeral_flavor(self, mock_client, mock_check_quotas,
                                          mock_task_api, mock_ins_create,
                                          mock_conf):
        """Creation works without volumes when the flavor has ephemeral
        storage."""
        mock_conf.get = Mock(
            return_value=FakeOptGroup(volume_support=False))
        (mock_client.return_value.
         flavors.get.return_value) = self._fake_flavor('1234', ephemeral=1)
        self.cluster.create(Mock(),
                            self.cluster_name,
                            self.datastore,
                            self.datastore_version,
                            self.instances_no_volumes)
        mock_task_api.return_value.create_cluster.assert_called_with(
            self.dbcreate_mock.return_value.id)
        self.assertEqual(3, mock_ins_create.call_count)

    @patch.object(DBCluster, 'update')
    @patch.object(redis_api, 'CONF')
    @patch.object(inst_models.Instance, 'create')
    @patch.object(task_api, 'load')
    @patch.object(QUOTAS, 'check_quotas')
    @patch.object(remote, 'create_nova_client')
    def test_grow(self, mock_client, mock_check_quotas, mock_task_api,
                  mock_ins_create, mock_conf, mock_update):
        """Grow creates the new members and passes their ids to the
        grow-cluster task."""
        mock_conf.get = Mock(
            return_value=FakeOptGroup(volume_support=True))
        mock_client.return_value.flavors = Mock()
        self.cluster.grow(self.instances_w_volumes)
        mock_task_api.return_value.grow_cluster.assert_called_with(
            self.dbcreate_mock.return_value.id,
            [mock_ins_create.return_value.id] * 3)
        self.assertEqual(3, mock_ins_create.call_count)

    @patch.object(DBInstance, 'find_all')
    @patch.object(Cluster, 'get_guest')
    @patch.object(DBCluster, 'update')
    @patch.object(inst_models.Instance, 'load')
    @patch.object(inst_models.Instance, 'delete')
    def test_shrink(self,
                    mock_ins_delete, mock_ins_load, mock_update,
                    mock_guest, mock_find_all):
        """Shrink deletes each removed member instance."""
        mock_find_all.return_value.all.return_value = [
            DBInstance(InstanceTasks.NONE, id="1", name="member1",
                       compute_instance_id="compute-1",
                       task_id=InstanceTasks.NONE._code,
                       task_description=InstanceTasks.NONE._db_text,
                       volume_id="volume-1",
                       datastore_version_id="1",
                       cluster_id=self.cluster_id,
                       type="member")]
        self.cluster.shrink(['id1'])
        self.assertEqual(1, mock_ins_delete.call_count)

    def test_delete_bad_task_status(self):
        """Delete is refused while a cluster task is in progress."""
        self.cluster.db_info.task_status = ClusterTasks.BUILDING_INITIAL
        self.assertRaises(exception.UnprocessableEntity,
                          self.cluster.delete)

    @patch.object(task_api.API, 'delete_cluster')
    @patch.object(Cluster, 'update_db')
    @patch.object(inst_models.DBInstance, 'find_all')
    def test_delete_task_status_none(self,
                                     mock_find_all,
                                     mock_update_db,
                                     mock_delete_cluster):
        """Delete from the idle (NONE) state marks the cluster DELETING."""
        self.cluster.db_info.task_status = ClusterTasks.NONE
        self.cluster.delete()
        mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING)

    @patch.object(task_api.API, 'delete_cluster')
    @patch.object(Cluster, 'update_db')
    @patch.object(inst_models.DBInstance, 'find_all')
    def test_delete_task_status_deleting(self,
                                         mock_find_all,
                                         mock_update_db,
                                         mock_delete_cluster):
        """Delete is idempotent while already in the DELETING state."""
        self.cluster.db_info.task_status = ClusterTasks.DELETING
        self.cluster.delete()
        mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING)

View File

@ -98,9 +98,10 @@ class RedisGuestAgentManagerTest(testtools.TestCase):
mock_status = MagicMock()
self.manager._app.status = mock_status
self.manager._build_admin_client = MagicMock(return_value=MagicMock())
redis_service.RedisApp.start_redis = MagicMock(return_value=None)
redis_service.RedisApp.install_if_needed = MagicMock(return_value=None)
operating_system.chown = MagicMock(return_value=None)
redis_service.RedisApp.stop_db = MagicMock(return_value=None)
redis_service.RedisApp.start_redis = MagicMock(return_value=None)
redis_service.RedisApp.restart = MagicMock(return_value=None)
mock_status.begin_install = MagicMock(return_value=None)
VolumeDevice.format = MagicMock(return_value=None)