Implement clustering for Vertica datastore

A specification for this change was submitted for review in
https://review.openstack.org/#/c/151279

- HP Vertica Community Edition supports up to a 3-node cluster.
- HP Vertica requires a minimum of 3 nodes to achieve fault tolerance.
- This patchset provides the ability to launch a 3-node HP Vertica
  cluster (an example request is sketched below).
- The cluster-show API also lists the IPs of the underlying instances.
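
For illustration, a cluster-create request for this datastore has the
following shape (a sketch mirroring the unit-test fixture in this
patchset; the flavor reference, volume size, and datastore version are
deployment-specific):

    POST /v1.0/{tenant_id}/clusters
    {
        "cluster": {
            "name": "products",
            "datastore": {"type": "vertica", "version": "7.1"},
            "instances": [
                {"flavorRef": "7", "volume": {"size": 1}},
                {"flavorRef": "7", "volume": {"size": 1}},
                {"flavorRef": "7", "volume": {"size": 1}}
            ]
        }
    }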

Code Added:
- Added API strategy, taskmanager strategy, and guestagent strategy.
- Included unit tests.

Workflow for building a Vertica cluster is as follows:
- Guest instances are booted using the new API strategy, which then
  hands control to the taskmanager strategy for further communication
  and guestagent API execution.
- Once the guest instances are active in nova,
  they receive the "prepare" message and the following steps are performed:
    - Mount the data disk on device_path.
    - Install the Vertica packages if missing, install_if_needed().
    - Run the Vertica pre-install test, prepare_for_install_vertica().
    - Move to the BUILD_PENDING status.
- The cluster-taskmanager strategy waits for all the instances
  in the cluster to reach the BUILD_PENDING state.
- Once all instances in the cluster reach the BUILD_PENDING state,
  the taskmanager first configures passwordless SSH for the OS users
  (root, dbadmin) with the help of the guestagent APIs get_public_keys
  and authorize_public_keys (see the sketch after this list).
- Once passwordless SSH has been configured, the taskmanager calls the
  install_cluster guestagent API, which installs the cluster on the
  member instances and creates a database on the cluster.
- Once this call finishes, the taskmanager calls another guestagent
  API, cluster_complete, to notify each cluster member that cluster
  creation is complete.
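
A minimal sketch of the key-exchange step (mirroring
VerticaClusterTasks.create_cluster in this patchset; "guests" is the
list of guestagent API clients for the cluster members):

    # Sketch only: see the taskmanager strategy code below.
    for user in ['root', 'dbadmin']:
        # Collect each member's public key (generated on demand)...
        pub_keys = [guest.get_public_keys(user) for guest in guests]
        # ...then push the combined set back to every member, which
        # appends the keys to that user's authorized_keys file.
        for guest in guests:
            guest.authorize_public_keys(user, pub_keys)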

New Files:
- A new directory, vertica, has been created for the api, taskmanager,
  and guestagent strategies under
  trove/common/strategies/cluster/experimental.
- Unit tests for the cluster-controller, api, and taskmanager code.

DocImpact
Change-Id: Ide30d1d2a136c7e638532a115db5ff5ab2a75e72
Implements: blueprint implement-vertica-cluster
Author: Sushil Kumar
Date:   2015-03-18 13:59:08 +00:00
Parent: fbbb80ede3
Commit: c61a3bf5f9
16 changed files with 1328 additions and 5 deletions

@@ -125,3 +125,9 @@ log_file = logfile.txt
# replication_namespace = trove.guestagent.strategies.replication.mysql_binlog
# replication_user = slave_user
# replication_password = slave_password

[vertica]
# For vertica, following are the defaults needed:
# mount_point = /var/lib/vertica
# readahead_size = 2048
# guestagent_strategy = trove.common.strategies.cluster.experimental.vertica.guestagent.VerticaGuestAgentStrategy

@@ -243,3 +243,11 @@ device_path = /dev/vdb
[mongodb]
volume_support = True
device_path = /dev/vdb

[vertica]
tcp_ports = 5433, 5434, 22, 5444, 5450, 4803
udp_ports = 5433, 4803, 4804, 6453
volume_support = True
device_path = /dev/vdb
mount_point = /var/lib/vertica
taskmanager_strategy = trove.common.strategies.cluster.experimental.vertica.taskmanager.VerticaTaskManagerStrategy

@@ -241,3 +241,12 @@ volume_support = True
device_path = /dev/vdb
num_config_servers_per_cluster = 1
num_query_routers_per_cluster = 1

[vertica]
tcp_ports = 5433, 5434, 22, 5444, 5450, 4803
udp_ports = 5433, 4803, 4804, 6453
volume_support = True
device_path = /dev/vdb
cluster_support = True
cluster_member_count = 3
api_strategy = trove.common.strategies.cluster.experimental.vertica.api.VerticaAPIStrategy

@@ -822,11 +822,13 @@ vertica_group = cfg.OptGroup(
     'vertica', title='Vertica options',
     help="Oslo option group designed for Vertica datastore")
 vertica_opts = [
-    cfg.ListOpt('tcp_ports', default=["5433"],
+    cfg.ListOpt('tcp_ports',
+                default=["5433", "5434", "22", "5444", "5450", "4803"],
                 help='List of TCP ports and/or port ranges to open '
                      'in the security group (only applicable '
                      'if trove_security_groups_support is True).'),
-    cfg.ListOpt('udp_ports', default=["5433"],
+    cfg.ListOpt('udp_ports',
+                default=["5433", "4803", "4804", "6453"],
                 help='List of UDP ports and/or port ranges to open '
                      'in the security group (only applicable '
                      'if trove_security_groups_support is True).'),
@@ -851,6 +853,24 @@ vertica_opts = [
                help='Namespace to load restore strategies from.'),
     cfg.IntOpt('readahead_size', default=2048,
                help='Size(MB) to be set as readahead_size for data volume'),
+    cfg.BoolOpt('cluster_support', default=True,
+                help='Enable clusters to be created and managed.'),
+    cfg.IntOpt('cluster_member_count', default=3,
+               help='Number of members in Vertica cluster.'),
+    cfg.StrOpt('api_strategy',
+               default='trove.common.strategies.cluster.experimental.vertica.'
+                       'api.VerticaAPIStrategy',
+               help='Class that implements datastore-specific API logic.'),
+    cfg.StrOpt('taskmanager_strategy',
+               default='trove.common.strategies.cluster.experimental.vertica.'
+                       'taskmanager.VerticaTaskManagerStrategy',
+               help='Class that implements datastore-specific task manager '
+                    'logic.'),
+    cfg.StrOpt('guestagent_strategy',
+               default='trove.common.strategies.cluster.experimental.vertica.'
+                       'guestagent.VerticaGuestAgentStrategy',
+               help='Class that implements datastore-specific Guest Agent API '
+                    'logic.'),
 ]

 # RPC version groups
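
At runtime these options are resolved through the cluster strategy
loader. A sketch, assuming the load_api_strategy() helper in
trove.common.strategies.cluster.strategy (the helper patched by the
unit tests below) takes the datastore manager name:

    from trove.common.strategies.cluster import strategy

    # Resolves the class path stored in CONF.vertica.api_strategy
    # into the strategy object whose properties are defined below.
    api_strategy = strategy.load_api_strategy('vertica')
    cluster_class = api_strategy.cluster_class  # VerticaCluster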

@@ -0,0 +1,189 @@
#Copyright [2015] Hewlett-Packard Development Company, L.P.
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
from novaclient import exceptions as nova_exceptions

from trove.cluster import models
from trove.cluster.tasks import ClusterTasks
from trove.cluster.views import ClusterView
from trove.common import cfg
from trove.common import exception
from trove.common import remote
from trove.common.strategies.cluster import base
from trove.common.views import create_links
from trove.extensions.mgmt.clusters.views import MgmtClusterView
from trove.instance import models as inst_models
from trove.openstack.common import log as logging
from trove.quota.quota import check_quotas
from trove.taskmanager import api as task_api

LOG = logging.getLogger(__name__)
CONF = cfg.CONF


class VerticaAPIStrategy(base.BaseAPIStrategy):

    @property
    def cluster_class(self):
        return VerticaCluster

    @property
    def cluster_controller_actions(self):
        return {}

    @property
    def cluster_view_class(self):
        return VerticaClusterView

    @property
    def mgmt_cluster_view_class(self):
        return VerticaMgmtClusterView


class VerticaCluster(models.Cluster):

    @classmethod
    def create(cls, context, name, datastore, datastore_version, instances):
        LOG.debug("Initiating cluster creation.")
        vertica_conf = CONF.get(datastore_version.manager)
        num_instances = len(instances)

        # Matching number of instances with configured cluster_member_count
        if num_instances != vertica_conf.cluster_member_count:
            raise exception.ClusterNumInstancesNotSupported(
                num_instances=vertica_conf.cluster_member_count)

        # Checking flavors
        flavor_ids = [instance['flavor_id'] for instance in instances]
        if len(set(flavor_ids)) != 1:
            raise exception.ClusterFlavorsNotEqual()
        flavor_id = flavor_ids[0]
        nova_client = remote.create_nova_client(context)
        try:
            flavor = nova_client.flavors.get(flavor_id)
        except nova_exceptions.NotFound:
            raise exception.FlavorNotFound(uuid=flavor_id)
        deltas = {'instances': num_instances}

        # Checking volumes
        volume_sizes = [instance['volume_size'] for instance in instances
                        if instance.get('volume_size', None)]
        volume_size = None
        if vertica_conf.volume_support:
            if len(volume_sizes) != num_instances:
                raise exception.ClusterVolumeSizeRequired()
            if len(set(volume_sizes)) != 1:
                raise exception.ClusterVolumeSizesNotEqual()
            volume_size = volume_sizes[0]
            models.validate_volume_size(volume_size)
            deltas['volumes'] = volume_size * num_instances
        else:
            if len(volume_sizes) > 0:
                raise exception.VolumeNotSupported()
            ephemeral_support = vertica_conf.device_path
            if ephemeral_support and flavor.ephemeral == 0:
                raise exception.LocalStorageNotSpecified(flavor=flavor_id)

        check_quotas(context.tenant, deltas)

        # Updating Cluster Task
        db_info = models.DBCluster.create(
            name=name, tenant_id=context.tenant,
            datastore_version_id=datastore_version.id,
            task_status=ClusterTasks.BUILDING_INITIAL)

        member_config = {"id": db_info.id,
                         "instance_type": "member"}

        # Creating member instances
        for i in range(1, num_instances + 1):
            instance_name = "%s-member-%s" % (name, str(i))
            inst_models.Instance.create(context, instance_name,
                                        flavor_id,
                                        datastore_version.image_id,
                                        [], [], datastore,
                                        datastore_version,
                                        volume_size, None,
                                        availability_zone=None,
                                        nics=None,
                                        configuration_id=None,
                                        cluster_config=member_config)

        # Calling taskmanager to further proceed for cluster-configuration
        task_api.load(context, datastore_version.manager).create_cluster(
            db_info.id)

        return VerticaCluster(context, db_info, datastore, datastore_version)


class VerticaClusterView(ClusterView):

    def build_instances(self):
        instances = []
        ip_list = []
        if self.load_servers:
            cluster_instances = self.cluster.instances
        else:
            cluster_instances = self.cluster.instances_without_server
        for instance in cluster_instances:
            if instance.type != 'member':
                continue
            instance_dict = {
                "id": instance.id,
                "name": instance.name,
                "links": create_links("instances", self.req, instance.id)
            }
            if self.load_servers:
                instance_dict["status"] = instance.status
                if CONF.get(instance.datastore_version.manager).volume_support:
                    instance_dict["volume"] = {"size": instance.volume_size}
                instance_dict["flavor"] = self._build_flavor_info(
                    instance.flavor_id)
            instance_ips = instance.get_visible_ip_addresses()
            if instance_ips:
                instance_dict["ip"] = instance_ips
                ip_list.append(instance_ips[0])
            instances.append(instance_dict)
        ip_list.sort()
        return instances, ip_list


class VerticaMgmtClusterView(MgmtClusterView):

    def build_instances(self):
        instances = []
        ip_list = []
        if self.load_servers:
            cluster_instances = self.cluster.instances
        else:
            cluster_instances = self.cluster.instances_without_server
        for instance in cluster_instances:
            instance_dict = {
                "id": instance.id,
                "name": instance.name,
                "type": instance.type,
                "links": create_links("instances", self.req, instance.id)
            }
            instance_ips = instance.get_visible_ip_addresses()
            if self.load_servers and instance_ips:
                instance_dict["ip"] = instance_ips
            if self.load_servers:
                instance_dict["status"] = instance.status
                if CONF.get(instance.datastore_version.manager).volume_support:
                    instance_dict["volume"] = {"size": instance.volume_size}
                instance_dict["flavor"] = self._build_flavor_info(
                    instance.flavor_id)
            instances.append(instance_dict)
        ip_list.sort()
        return instances, ip_list
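
Given the ip_list assembled by VerticaClusterView above, a cluster-show
response carries the member IPs roughly as follows (a sketch only; the
exact envelope is produced by the base ClusterView, which is outside
this diff):

    {
        "cluster": {
            "name": "products",
            "ip": ["10.0.0.2", "10.0.0.3", "10.0.0.4"],
            "instances": [...]
        }
    }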

@@ -0,0 +1,52 @@
#Copyright [2015] Hewlett-Packard Development Company, L.P.
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
from trove.common import cfg
from trove.common.strategies.cluster import base
from trove.guestagent import api as guest_api
from trove.openstack.common import log as logging

LOG = logging.getLogger(__name__)
CONF = cfg.CONF


class VerticaGuestAgentStrategy(base.BaseGuestAgentStrategy):

    @property
    def guest_client_class(self):
        return VerticaGuestAgentAPI


class VerticaGuestAgentAPI(guest_api.API):

    def get_public_keys(self, user):
        LOG.debug("Getting public keys for user: %s." % user)
        return self._call("get_public_keys", guest_api.AGENT_HIGH_TIMEOUT,
                          self.version_cap, user=user)

    def authorize_public_keys(self, user, public_keys):
        LOG.debug("Authorizing public keys for user: %s." % user)
        return self._call("authorize_public_keys",
                          guest_api.AGENT_HIGH_TIMEOUT, self.version_cap,
                          user=user, public_keys=public_keys)

    def install_cluster(self, members):
        LOG.debug("Installing Vertica cluster on members: %s." % members)
        return self._call("install_cluster", CONF.cluster_usage_timeout,
                          self.version_cap, members=members)

    def cluster_complete(self):
        LOG.debug("Notifying cluster install completion.")
        return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
                          self.version_cap)

@@ -0,0 +1,191 @@
#Copyright [2015] Hewlett-Packard Development Company, L.P.
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
from eventlet.timeout import Timeout

from trove.common import cfg
from trove.common.exception import PollTimeOut
from trove.common.i18n import _
from trove.common.instance import ServiceStatuses
from trove.common.remote import create_guest_client
from trove.common.strategies.cluster import base
from trove.common import utils
from trove.instance.models import DBInstance
from trove.instance.models import Instance
from trove.instance.models import InstanceServiceStatus
from trove.instance.tasks import InstanceTasks
from trove.openstack.common import log as logging
from trove.taskmanager import api as task_api
import trove.taskmanager.models as task_models

LOG = logging.getLogger(__name__)
CONF = cfg.CONF
USAGE_SLEEP_TIME = CONF.usage_sleep_time  # seconds.


class VerticaTaskManagerStrategy(base.BaseTaskManagerStrategy):

    @property
    def task_manager_api_class(self):
        return VerticaTaskManagerAPI

    @property
    def task_manager_cluster_tasks_class(self):
        return VerticaClusterTasks


class VerticaClusterTasks(task_models.ClusterTasks):

    def update_statuses_on_failure(self, cluster_id):
        if CONF.update_status_on_fail:
            db_instances = DBInstance.find_all(
                cluster_id=cluster_id, deleted=False).all()
            for db_instance in db_instances:
                db_instance.set_task_status(
                    InstanceTasks.BUILDING_ERROR_SERVER)
                db_instance.save()

    @classmethod
    def get_ip(cls, instance):
        return instance.get_visible_ip_addresses()[0]

    @classmethod
    def get_guest(cls, instance):
        return create_guest_client(instance.context, instance.db_info.id,
                                   instance.datastore_version.manager)

    def _all_instances_ready(self, instance_ids, cluster_id):

        def _all_status_ready(ids):
            LOG.debug("Checking service status of instance ids: %s." % ids)
            for instance_id in ids:
                status = InstanceServiceStatus.find_by(
                    instance_id=instance_id).get_status()
                if (status == ServiceStatuses.FAILED or
                        status == ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT):
                    # if one has failed, no need to continue polling
                    LOG.debug("Instance %s in %s, exiting polling." % (
                        instance_id, status))
                    return True
                if (status != ServiceStatuses.RUNNING and
                        status != ServiceStatuses.BUILD_PENDING):
                    # if one is not in a ready state, continue polling
                    LOG.debug("Instance %s in %s, continue polling." % (
                        instance_id, status))
                    return False
            LOG.debug("Instances are ready, exiting polling for: %s." % ids)
            return True

        def _instance_ids_with_failures(ids):
            LOG.debug("Checking for service status failures for "
                      "instance ids: %s." % ids)
            failed_instance_ids = []
            for instance_id in ids:
                status = InstanceServiceStatus.find_by(
                    instance_id=instance_id).get_status()
                if (status == ServiceStatuses.FAILED or
                        status == ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT):
                    failed_instance_ids.append(instance_id)
            return failed_instance_ids

        LOG.debug("Polling until service status is ready for "
                  "instance ids: %s." % instance_ids)
        try:
            utils.poll_until(lambda: instance_ids,
                             lambda ids: _all_status_ready(ids),
                             sleep_time=USAGE_SLEEP_TIME,
                             time_out=CONF.usage_timeout)
        except PollTimeOut:
            LOG.exception(_("Timeout for all instance service statuses "
                            "to become ready."))
            self.update_statuses_on_failure(cluster_id)
            return False

        failed_ids = _instance_ids_with_failures(instance_ids)
        if failed_ids:
            LOG.error(_("Some instances failed to become ready: %s.") %
                      failed_ids)
            self.update_statuses_on_failure(cluster_id)
            return False

        return True

    def create_cluster(self, context, cluster_id):
        LOG.debug("Begin create_cluster for id: %s." % cluster_id)

        def _create_cluster():
            # Fetch instances by cluster_id against instances table.
            db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
            instance_ids = [db_instance.id for db_instance in db_instances]
            # Wait for cluster members to get to cluster-ready status.
            if not self._all_instances_ready(instance_ids, cluster_id):
                return

            LOG.debug("All members ready, proceeding for cluster setup.")
            instances = [Instance.load(context, instance_id) for instance_id
                         in instance_ids]

            member_ips = [self.get_ip(instance) for instance in instances]
            guests = [self.get_guest(instance) for instance in instances]

            # Users to be configured for password-less SSH.
            authorized_users_without_password = ['root', 'dbadmin']

            # Configuring password-less SSH for cluster members.
            # Strategy for setting up SSH:
            # get public keys for user from member-instances in cluster,
            # combine them, finally push it back to all instances,
            # and member instances add them to authorized keys.
            LOG.debug("Configuring password-less SSH on cluster members.")
            try:
                for user in authorized_users_without_password:
                    pub_key = [guest.get_public_keys(user) for guest in guests]
                    for guest in guests:
                        guest.authorize_public_keys(user, pub_key)

                LOG.debug("Installing cluster with members: %s." % member_ips)
                guests[0].install_cluster(member_ips)

                LOG.debug("Finalizing cluster configuration.")
                for guest in guests:
                    guest.cluster_complete()
            except Exception:
                LOG.exception(_("Error creating cluster."))
                self.update_statuses_on_failure(cluster_id)

        timeout = Timeout(CONF.cluster_usage_timeout)
        try:
            _create_cluster()
            self.reset_task()
        except Timeout as t:
            if t is not timeout:
                raise  # not my timeout
            LOG.exception(_("Timeout for building cluster."))
            self.update_statuses_on_failure(cluster_id)
        finally:
            timeout.cancel()

        LOG.debug("End create_cluster for id: %s." % cluster_id)


class VerticaTaskManagerAPI(task_api.API):

    def _cast(self, method_name, version, **kwargs):
        LOG.debug("Casting %s" % method_name)
        cctxt = self.client.prepare(version=version)
        cctxt.cast(self.context, method_name, **kwargs)

@@ -62,9 +62,16 @@ class Manager(periodic_task.PeriodicTasks):
                 LOG.debug("Mounted the volume.")
             self.app.install_if_needed(packages)
             self.app.prepare_for_install_vertica()
-            self.app.install_vertica()
-            self.app.create_db()
-            self.app.complete_install_or_restart()
+            if cluster_config is None:
+                self.app.install_vertica()
+                self.app.create_db()
+                self.app.complete_install_or_restart()
+            elif cluster_config['instance_type'] == "member":
+                self.appStatus.set_status(rd_ins.ServiceStatuses.BUILD_PENDING)
+            else:
+                LOG.error(_("Bad cluster configuration; instance type "
+                            "given as %s.") % cluster_config['instance_type'])
+                raise RuntimeError("Bad cluster configuration.")
             LOG.info(_('Completed setup of Vertica database instance.'))
         except Exception:
             LOG.exception(_('Cannot prepare Vertica database instance.'))
@@ -191,3 +198,26 @@ class Manager(periodic_task.PeriodicTasks):
        LOG.debug("Starting with configuration changes.")
        raise exception.DatastoreOperationNotSupported(
            operation='start_db_with_conf_changes', datastore=MANAGER)

    def get_public_keys(self, context, user):
        LOG.debug("Retrieving public keys for %s." % user)
        return self.app.get_public_keys(user)

    def authorize_public_keys(self, context, user, public_keys):
        LOG.debug("Authorizing public keys for %s." % user)
        return self.app.authorize_public_keys(user, public_keys)

    def install_cluster(self, context, members):
        try:
            LOG.debug("Installing cluster on members: %s." % members)
            self.app.install_cluster(members)
            LOG.debug("install_cluster call has finished.")
        except Exception:
            LOG.exception(_('Cluster installation failed.'))
            self.appStatus.set_status(rd_ins.ServiceStatuses.FAILED)
            raise

    def cluster_complete(self, context):
        LOG.debug("Cluster creation complete, starting status checks.")
        status = self.appStatus._get_actual_db_status()
        self.appStatus.set_status(status)

@@ -219,3 +219,72 @@ class VerticaApp(object):
        except exception.ProcessExecutionError:
            LOG.exception(_("Failed to prepare for install_vertica."))
            raise

    def get_public_keys(self, user):
        """Generates key (if not found), and sends public key for user."""
        LOG.debug("Public keys requested for user: %s." % user)
        user_home_directory = os.path.expanduser('~' + user)
        public_key_file_name = user_home_directory + '/.ssh/id_rsa.pub'

        try:
            key_generate_command = (system.SSH_KEY_GEN % user_home_directory)
            system.shell_execute(key_generate_command, user)
        except exception.ProcessExecutionError:
            LOG.debug("Cannot generate key.")

        try:
            read_key_cmd = ("cat %(file)s" % {'file': public_key_file_name})
            out, err = system.shell_execute(read_key_cmd)
        except exception.ProcessExecutionError:
            LOG.exception(_("Cannot read public key."))
            raise
        return out.strip()

    def authorize_public_keys(self, user, public_keys):
        """Adds public key to authorized_keys for user."""
        LOG.debug("public keys to be added for user: %s." % (user))
        user_home_directory = os.path.expanduser('~' + user)
        authorized_file_name = user_home_directory + '/.ssh/authorized_keys'

        try:
            read_key_cmd = ("cat %(file)s" % {'file': authorized_file_name})
            out, err = system.shell_execute(read_key_cmd)
            public_keys.append(out.strip())
        except exception.ProcessExecutionError:
            LOG.debug("Cannot read authorized_keys.")
        all_keys = '\n'.join(public_keys) + "\n"

        try:
            with tempfile.NamedTemporaryFile(delete=False) as tempkeyfile:
                tempkeyfile.write(all_keys)
            copy_key_cmd = (("install -o %(user)s -m 600 %(source)s %(target)s"
                             ) % {'user': user, 'source': tempkeyfile.name,
                                  'target': authorized_file_name})
            system.shell_execute(copy_key_cmd)
            os.remove(tempkeyfile.name)
        except exception.ProcessExecutionError:
            LOG.exception(_("Cannot install public keys."))
            os.remove(tempkeyfile.name)
            raise

    def _export_conf_to_members(self, members):
        """This method exports conf files to other members."""
        try:
            for member in members:
                COPY_CMD = (system.SEND_CONF_TO_SERVER % (system.VERTICA_CONF,
                                                          member,
                                                          system.VERTICA_CONF))
                system.shell_execute(COPY_CMD)
        except exception.ProcessExecutionError:
            LOG.exception(_("Cannot export configuration."))
            raise

    def install_cluster(self, members):
        """Installs & configures cluster."""
        cluster_members = ','.join(members)
        LOG.debug("Installing cluster with members: %s." % cluster_members)
        self.install_vertica(cluster_members)
        self._export_conf_to_members(members)
        LOG.debug("Creating database with members: %s." % cluster_members)
        self.create_db(cluster_members)
        LOG.debug("Cluster configured on members: %s." % cluster_members)

@@ -24,6 +24,11 @@ STATUS_ACTIVE_DB = "/opt/vertica/bin/adminTools -t show_active_db"
STATUS_DB_DOWN = "/opt/vertica/bin/adminTools -t db_status -s DOWN"
SET_RESTART_POLICY = ("/opt/vertica/bin/adminTools -t set_restart_policy "
                      "-d %s -p '%s'")
SEND_CONF_TO_SERVER = ("rsync -v -e 'ssh -o "
                       "UserKnownHostsFile=/dev/null -o "
                       "StrictHostKeyChecking=no' --perms --owner --group "
                       "%s %s:%s")
SSH_KEY_GEN = "ssh-keygen -f %s/.ssh/id_rsa -t rsa -N ''"
VERTICA_CONF = "/etc/vertica.cnf"
INSTALL_TIMEOUT = 1000
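
For reference, the two templates added above interpolate as follows (a
sketch; the member address 10.0.0.3 and the dbadmin home directory are
illustrative, not part of this diff):

    SEND_CONF_TO_SERVER % (VERTICA_CONF, '10.0.0.3', VERTICA_CONF)
    # rsync -v -e 'ssh -o UserKnownHostsFile=/dev/null -o
    #   StrictHostKeyChecking=no' --perms --owner --group
    #   /etc/vertica.cnf 10.0.0.3:/etc/vertica.cnf

    SSH_KEY_GEN % '/home/dbadmin'
    # ssh-keygen -f /home/dbadmin/.ssh/id_rsa -t rsa -N ''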

@@ -0,0 +1,345 @@
#Copyright [2015] Hewlett-Packard Development Company, L.P.
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import jsonschema
from mock import MagicMock
from mock import Mock
from mock import patch
from testtools import TestCase
from testtools.matchers import Is, Equals

from trove.cluster import models
from trove.cluster.models import Cluster
from trove.cluster.service import ClusterController
from trove.cluster import views
import trove.common.cfg as cfg
from trove.common import exception
from trove.common.strategies.cluster import strategy
from trove.common import utils
from trove.datastore import models as datastore_models


class TestClusterController(TestCase):

    def setUp(self):
        super(TestClusterController, self).setUp()
        self.controller = ClusterController()
        self.cluster = {
            "cluster": {
                "name": "products",
                "datastore": {
                    "type": "vertica",
                    "version": "7.1"
                },
                "instances": [
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                ]
            }
        }

    def test_get_schema_create(self):
        schema = self.controller.get_schema('create', self.cluster)
        self.assertIsNotNone(schema)
        self.assertTrue('cluster' in schema['properties'])
        self.assertTrue('cluster')

    def test_validate_create(self):
        body = self.cluster
        schema = self.controller.get_schema('create', body)
        validator = jsonschema.Draft4Validator(schema)
        self.assertTrue(validator.is_valid(body))

    def test_validate_create_blankname(self):
        body = self.cluster
        body['cluster']['name'] = " "
        schema = self.controller.get_schema('create', body)
        validator = jsonschema.Draft4Validator(schema)
        self.assertFalse(validator.is_valid(body))
        errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
        self.assertThat(len(errors), Is(1))
        self.assertThat(errors[0].message,
                        Equals("' ' does not match '^.*[0-9a-zA-Z]+.*$'"))

    def test_validate_create_blank_datastore(self):
        body = self.cluster
        body['cluster']['datastore']['type'] = ""
        schema = self.controller.get_schema('create', body)
        validator = jsonschema.Draft4Validator(schema)
        self.assertFalse(validator.is_valid(body))
        errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
        error_messages = [error.message for error in errors]
        error_paths = [error.path.pop() for error in errors]
        self.assertThat(len(errors), Is(2))
        self.assertIn("'' is too short", error_messages)
        self.assertIn("'' does not match '^.*[0-9a-zA-Z]+.*$'", error_messages)
        self.assertIn("type", error_paths)

    @patch.object(Cluster, 'create')
    @patch.object(datastore_models, 'get_datastore_version')
    def test_create_clusters_disabled(self,
                                      mock_get_datastore_version,
                                      mock_cluster_create):
        body = self.cluster
        tenant_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        datastore_version = Mock()
        datastore_version.manager = 'mysql'
        mock_get_datastore_version.return_value = (Mock(), datastore_version)
        self.assertRaises(exception.ClusterDatastoreNotSupported,
                          self.controller.create,
                          req,
                          body,
                          tenant_id)

    @patch.object(Cluster, 'create')
    @patch.object(utils, 'get_id_from_href')
    @patch.object(datastore_models, 'get_datastore_version')
    def test_create_clusters(self,
                             mock_get_datastore_version,
                             mock_id_from_href,
                             mock_cluster_create):
        body = self.cluster
        tenant_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = Mock()
        req.environ.__getitem__ = Mock(return_value=context)
        datastore_version = Mock()
        datastore_version.manager = 'vertica'
        datastore = Mock()
        mock_get_datastore_version.return_value = (datastore,
                                                   datastore_version)
        instances = [{'volume_size': 1, 'flavor_id': '1234'},
                     {'volume_size': 1, 'flavor_id': '1234'},
                     {'volume_size': 1, 'flavor_id': '1234'}]
        mock_id_from_href.return_value = '1234'
        mock_cluster = Mock()
        mock_cluster.instances = []
        mock_cluster.instances_without_server = []
        mock_cluster.datastore_version.manager = 'vertica'
        mock_cluster_create.return_value = mock_cluster
        self.controller.create(req, body, tenant_id)
        mock_cluster_create.assert_called_with(context, 'products',
                                               datastore, datastore_version,
                                               instances)

    @patch.object(Cluster, 'load')
    def test_show_cluster(self,
                          mock_cluster_load):
        tenant_id = Mock()
        id = Mock()
        context = Mock()
        req = Mock()
        req.environ = Mock()
        req.environ.__getitem__ = Mock(return_value=context)
        mock_cluster = Mock()
        mock_cluster.instances = []
        mock_cluster.instances_without_server = []
        mock_cluster.datastore_version.manager = 'vertica'
        mock_cluster_load.return_value = mock_cluster
        self.controller.show(req, tenant_id, id)
        mock_cluster_load.assert_called_with(context, id)

    @patch.object(Cluster, 'load')
    @patch.object(Cluster, 'load_instance')
    def test_show_cluster_instance(self,
                                   mock_cluster_load_instance,
                                   mock_cluster_load):
        tenant_id = Mock()
        cluster_id = Mock()
        instance_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = Mock()
        req.environ.__getitem__ = Mock(return_value=context)
        cluster = Mock()
        mock_cluster_load.return_value = cluster
        cluster.id = cluster_id
        self.controller.show_instance(req, tenant_id, cluster_id, instance_id)
        mock_cluster_load_instance.assert_called_with(context, cluster.id,
                                                      instance_id)

    @patch.object(Cluster, 'load')
    def test_delete_cluster(self, mock_cluster_load):
        tenant_id = Mock()
        cluster_id = Mock()
        req = MagicMock()
        cluster = Mock()
        mock_cluster_load.return_value = cluster
        self.controller.delete(req, tenant_id, cluster_id)
        self.assertTrue(cluster.delete.called)


class TestClusterControllerWithStrategy(TestCase):

    def setUp(self):
        super(TestClusterControllerWithStrategy, self).setUp()
        self.controller = ClusterController()
        self.cluster = {
            "cluster": {
                "name": "products",
                "datastore": {
                    "type": "vertica",
                    "version": "7.1"
                },
                "instances": [
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                    {
                        "flavorRef": "7",
                        "volume": {
                            "size": 1
                        },
                    },
                ]
            }
        }

    def tearDown(self):
        super(TestClusterControllerWithStrategy, self).tearDown()
        cfg.CONF.clear_override('cluster_support', group='vertica')
        cfg.CONF.clear_override('api_strategy', group='vertica')

    @patch.object(datastore_models, 'get_datastore_version')
    @patch.object(models.Cluster, 'create')
    def test_create_clusters_disabled(self,
                                      mock_cluster_create,
                                      mock_get_datastore_version):
        cfg.CONF.set_override('cluster_support', False, group='vertica')
        body = self.cluster
        tenant_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        datastore_version = Mock()
        datastore_version.manager = 'vertica'
        mock_get_datastore_version.return_value = (Mock(), datastore_version)
        self.assertRaises(exception.TroveError, self.controller.create, req,
                          body, tenant_id)

    @patch.object(views.ClusterView, 'data', return_value={})
    @patch.object(datastore_models, 'get_datastore_version')
    @patch.object(models.Cluster, 'create')
    def test_create_clusters_enabled(self,
                                     mock_cluster_create,
                                     mock_get_datastore_version,
                                     mock_cluster_view_data):
        cfg.CONF.set_override('cluster_support', True, group='vertica')
        body = self.cluster
        tenant_id = Mock()
        context = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        datastore_version = Mock()
        datastore_version.manager = 'vertica'
        mock_get_datastore_version.return_value = (Mock(), datastore_version)
        mock_cluster = Mock()
        mock_cluster.datastore_version.manager = 'vertica'
        mock_cluster_create.return_value = mock_cluster
        self.controller.create(req, body, tenant_id)

    @patch.object(models.Cluster, 'load')
    def test_controller_action_no_strategy(self,
                                           mock_cluster_load):
        body = {'do_stuff2': {}}
        tenant_id = Mock()
        context = Mock()
        id = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        cluster = Mock()
        cluster.datastore_version.manager = 'vertica'
        mock_cluster_load.return_value = cluster
        self.assertRaises(exception.TroveError, self.controller.action, req,
                          body, tenant_id, id)

    @patch.object(strategy, 'load_api_strategy')
    @patch.object(models.Cluster, 'load')
    def test_controller_action_found(self,
                                     mock_cluster_load,
                                     mock_cluster_api_strategy):
        body = {'do_stuff': {}}
        tenant_id = Mock()
        context = Mock()
        id = Mock()
        req = Mock()
        req.environ = MagicMock()
        req.environ.get = Mock(return_value=context)
        cluster = Mock()
        cluster.datastore_version.manager = 'vertica'
        mock_cluster_load.return_value = cluster
        strat = Mock()
        do_stuff_func = Mock()
        strat.cluster_controller_actions = \
            {'do_stuff': do_stuff_func}
        mock_cluster_api_strategy.return_value = strat
        self.controller.action(req, body, tenant_id, id)
        self.assertEqual(1, do_stuff_func.call_count)

@@ -0,0 +1,194 @@
#Copyright [2015] Hewlett-Packard Development Company, L.P.
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import uuid

from mock import Mock
from mock import patch
from testtools import TestCase

from trove.cluster.models import Cluster
from trove.cluster.models import ClusterTasks
from trove.cluster.models import DBCluster
from trove.common import cfg
from trove.common import exception
from trove.common import remote
from trove.common.strategies.cluster.experimental.vertica import (
    api as vertica_api)
from trove.instance import models as inst_models
from trove.quota.quota import QUOTAS
from trove.taskmanager import api as task_api

CONF = cfg.CONF


class ClusterTest(TestCase):

    def setUp(self):
        super(ClusterTest, self).setUp()
        task_api.API.get_client = Mock()
        self.cluster_id = str(uuid.uuid4())
        self.cluster_name = "Cluster" + self.cluster_id
        self.tenant_id = "23423432"
        self.dv_id = "1"
        self.db_info = DBCluster(ClusterTasks.NONE,
                                 id=self.cluster_id,
                                 name=self.cluster_name,
                                 tenant_id=self.tenant_id,
                                 datastore_version_id=self.dv_id,
                                 task_id=ClusterTasks.NONE._code)
        self.context = Mock()
        self.datastore = Mock()
        self.dv = Mock()
        self.dv.manager = "vertica"
        self.datastore_version = self.dv
        self.cluster = vertica_api.VerticaCluster(self.context, self.db_info,
                                                  self.datastore,
                                                  self.datastore_version)
        self.instances = [{'volume_size': 1, 'flavor_id': '1234'},
                          {'volume_size': 1, 'flavor_id': '1234'},
                          {'volume_size': 1, 'flavor_id': '1234'}]
        self.volume_support = CONF.get(self.dv.manager).volume_support
        self.remote_nova = remote.create_nova_client

    def tearDown(self):
        super(ClusterTest, self).tearDown()
        CONF.get(self.dv.manager).volume_support = self.volume_support
        remote.create_nova_client = self.remote_nova

    def test_create_empty_instances(self):
        self.assertRaises(exception.ClusterNumInstancesNotSupported,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          [])

    def test_create_flavor_not_specified(self):
        instances = self.instances
        instances[0]['flavor_id'] = None
        self.assertRaises(exception.ClusterFlavorsNotEqual,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          instances)

    @patch.object(remote, 'create_nova_client')
    def test_create_volume_no_specified(self,
                                        mock_client):
        instances = self.instances
        instances[0]['volume_size'] = None
        flavors = Mock()
        mock_client.return_value.flavors = flavors
        self.assertRaises(exception.ClusterVolumeSizeRequired,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          instances)

    @patch.object(remote, 'create_nova_client')
    def test_create_storage_specified_with_no_volume_support(self,
                                                             mock_client):
        CONF.get(self.dv.manager).volume_support = False
        instances = self.instances
        instances[0]['volume_size'] = None
        flavors = Mock()
        mock_client.return_value.flavors = flavors
        self.assertRaises(exception.VolumeNotSupported,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          instances)

    @patch.object(remote, 'create_nova_client')
    def test_create_storage_not_specified_and_no_ephemeral_flavor(self,
                                                                  mock_client):

        class FakeFlavor:
            def __init__(self, flavor_id):
                self.flavor_id = flavor_id

            @property
            def id(self):
                return self.flavor_id

            @property
            def ephemeral(self):
                return 0

        instances = [{'flavor_id': '1234'},
                     {'flavor_id': '1234'},
                     {'flavor_id': '1234'}]
        CONF.get(self.dv.manager).volume_support = False
        (mock_client.return_value.
         flavors.get.return_value) = FakeFlavor('1234')
        self.assertRaises(exception.LocalStorageNotSpecified,
                          Cluster.create,
                          Mock(),
                          self.cluster_name,
                          self.datastore,
                          self.datastore_version,
                          instances)

    @patch.object(inst_models.Instance, 'create')
    @patch.object(DBCluster, 'create')
    @patch.object(task_api, 'load')
    @patch.object(QUOTAS, 'check_quotas')
    @patch.object(remote, 'create_nova_client')
    def test_create(self, mock_client, mock_check_quotas, mock_task_api,
                    mock_db_create, mock_ins_create):
        instances = self.instances
        flavors = Mock()
        mock_client.return_value.flavors = flavors
        self.cluster.create(Mock(),
                            self.cluster_name,
                            self.datastore,
                            self.datastore_version,
                            instances)
        self.assertTrue(mock_task_api.return_value.create_cluster.called)
        self.assertEqual(3, mock_ins_create.call_count)

    def test_delete_bad_task_status(self):
        self.cluster.db_info.task_status = ClusterTasks.BUILDING_INITIAL
        self.assertRaises(exception.UnprocessableEntity,
                          self.cluster.delete)

    @patch.object(task_api.API, 'delete_cluster')
    @patch.object(Cluster, 'update_db')
    @patch.object(inst_models.DBInstance, 'find_all')
    def test_delete_task_status_none(self,
                                     mock_find_all,
                                     mock_update_db,
                                     mock_delete_cluster):
        self.cluster.db_info.task_status = ClusterTasks.NONE
        self.cluster.delete()
        mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING)

    @patch.object(task_api.API, 'delete_cluster')
    @patch.object(Cluster, 'update_db')
    @patch.object(inst_models.DBInstance, 'find_all')
    def test_delete_task_status_deleting(self,
                                         mock_find_all,
                                         mock_update_db,
                                         mock_delete_cluster):
        self.cluster.db_info.task_status = ClusterTasks.DELETING
        self.cluster.delete()
        mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING)

@@ -2277,3 +2277,36 @@ class VerticaAppTest(testtools.TestCase):
        mock_status.end_install_or_restart = MagicMock(
            return_value=None)
        self.assertRaises(RuntimeError, app.stop_db)

    def test_export_conf_to_members(self):
        self.app._export_conf_to_members(members=['member1', 'member2'])
        self.assertEqual(vertica_system.shell_execute.call_count, 2)

    def test_authorize_public_keys(self):
        user = 'test_user'
        keys = ['test_key@machine1', 'test_key@machine2']
        with patch.object(os.path, 'expanduser',
                          return_value=('/home/' + user)):
            self.app.authorize_public_keys(user=user, public_keys=keys)
        self.assertEqual(vertica_system.shell_execute.call_count, 2)
        vertica_system.shell_execute.assert_any_call(
            'cat ' + '/home/' + user + '/.ssh/authorized_keys')

    def test_get_public_keys(self):
        user = 'test_user'
        with patch.object(os.path, 'expanduser',
                          return_value=('/home/' + user)):
            self.app.get_public_keys(user=user)
        self.assertEqual(vertica_system.shell_execute.call_count, 2)
        vertica_system.shell_execute.assert_any_call(
            (vertica_system.SSH_KEY_GEN % ('/home/' + user)), user)
        vertica_system.shell_execute.assert_any_call(
            'cat ' + '/home/' + user + '/.ssh/id_rsa.pub')

    def test_install_cluster(self):
        with patch.object(self.app, 'read_config',
                          return_value=self.test_config):
            self.app.install_cluster(members=['member1', 'member2'])
        # Verifying number of shell calls,
        # as the command has already been tested in preceding tests
        self.assertEqual(vertica_system.shell_execute.call_count, 5)

@@ -13,10 +13,14 @@
import testtools
from mock import MagicMock
from mock import patch

from trove.common import instance as rd_instance
from trove.common.context import TroveContext
from trove.guestagent import volume
from trove.guestagent.datastore.experimental.vertica.manager import Manager
from trove.guestagent.datastore.experimental.vertica.service import VerticaApp
from trove.guestagent.datastore.experimental.vertica.service import (
    VerticaAppStatus)
from trove.guestagent.volume import VolumeDevice
@@ -144,3 +148,48 @@ class GuestAgentManagerTest(testtools.TestCase):
        self.manager.stop_db(self.context)
        # verification/assertion
        VerticaApp.stop_db.assert_any_call(do_not_start_on_reboot=False)

    @patch.object(VerticaApp, 'install_vertica')
    @patch.object(VerticaApp, '_export_conf_to_members')
    @patch.object(VerticaApp, 'create_db')
    def test_install_cluster(self, mock_create_db, mock_export, mock_install):
        members = ['test1', 'test2']
        self.manager.install_cluster(self.context, members)
        mock_install.assert_called_with('test1,test2')
        mock_export.assert_called_with(members)
        mock_create_db.assert_called_with('test1,test2')

    @patch.object(VerticaAppStatus, 'set_status')
    @patch.object(VerticaApp, 'install_cluster',
                  side_effect=RuntimeError("Boom!"))
    def test_install_cluster_failure(self, mock_install, mock_set_status):
        members = ["test1", "test2"]
        self.assertRaises(RuntimeError, self.manager.install_cluster,
                          self.context, members)
        mock_set_status.assert_called_with(rd_instance.ServiceStatuses.FAILED)

    @patch.object(volume.VolumeDevice, 'mount_points', return_value=[])
    @patch.object(volume.VolumeDevice, 'unmount_device', return_value=None)
    @patch.object(volume.VolumeDevice, 'mount', return_value=None)
    @patch.object(volume.VolumeDevice, 'migrate_data', return_value=None)
    @patch.object(volume.VolumeDevice, 'format', return_value=None)
    @patch.object(VerticaApp, 'prepare_for_install_vertica')
    @patch.object(VerticaApp, 'install_if_needed')
    @patch.object(VerticaAppStatus, 'begin_install')
    def _prepare_method(self, instance_id, instance_type, *args):
        cluster_config = {"id": instance_id,
                          "instance_type": instance_type}

        # invocation
        self.manager.prepare(context=self.context, databases=None,
                             packages=['vertica'],
                             memory_mb='2048', users=None,
                             mount_point='/var/lib/vertica',
                             overrides=None,
                             cluster_config=cluster_config)

    @patch.object(VerticaAppStatus, 'set_status')
    def test_prepare_member(self, mock_set_status):
        self._prepare_method("test-instance-3", "member")
        mock_set_status.assert_called_with(
            rd_instance.ServiceStatuses.BUILD_PENDING)

@@ -0,0 +1,123 @@
#Copyright [2015] Hewlett-Packard Development Company, L.P.
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import datetime

import testtools
from mock import Mock
from mock import patch

from trove.cluster.models import ClusterTasks as ClusterTaskStatus
from trove.cluster.models import DBCluster
from trove.common.strategies.cluster.experimental.vertica.taskmanager import \
    VerticaClusterTasks as ClusterTasks
from trove.datastore import models as datastore_models
from trove.instance.models import BaseInstance
from trove.instance.models import DBInstance
from trove.instance.models import Instance
from trove.instance.models import InstanceServiceStatus
from trove.instance.models import InstanceTasks
from trove.taskmanager.models import ServiceStatuses


class VerticaClusterTasksTest(testtools.TestCase):

    def setUp(self):
        super(VerticaClusterTasksTest, self).setUp()
        self.cluster_id = "1232"
        self.cluster_name = "Cluster-1234"
        self.tenant_id = "6789"
        self.db_cluster = DBCluster(ClusterTaskStatus.NONE,
                                    id=self.cluster_id,
                                    created=str(datetime.date),
                                    updated=str(datetime.date),
                                    name=self.cluster_name,
                                    task_id=ClusterTaskStatus.NONE._code,
                                    tenant_id=self.tenant_id,
                                    datastore_version_id="1",
                                    deleted=False)
        self.dbinst1 = DBInstance(InstanceTasks.NONE, id="1", name="member1",
                                  compute_instance_id="compute-1",
                                  task_id=InstanceTasks.NONE._code,
                                  task_description=(
                                      InstanceTasks.NONE._db_text),
                                  volume_id="volume-1",
                                  datastore_version_id="1",
                                  cluster_id=self.cluster_id,
                                  type="member")
        self.dbinst2 = DBInstance(InstanceTasks.NONE, id="2", name="member2",
                                  compute_instance_id="compute-2",
                                  task_id=InstanceTasks.NONE._code,
                                  task_description=(
                                      InstanceTasks.NONE._db_text),
                                  volume_id="volume-2",
                                  datastore_version_id="1",
                                  cluster_id=self.cluster_id,
                                  type="member")
        self.dbinst3 = DBInstance(InstanceTasks.NONE, id="3", name="member3",
                                  compute_instance_id="compute-3",
                                  task_id=InstanceTasks.NONE._code,
                                  task_description=(
                                      InstanceTasks.NONE._db_text),
                                  volume_id="volume-3",
                                  datastore_version_id="1",
                                  cluster_id=self.cluster_id,
                                  type="member")
        mock_ds1 = Mock()
        mock_ds1.name = 'vertica'
        mock_dv1 = Mock()
        mock_dv1.name = '7.1'
        self.clustertasks = ClusterTasks(Mock(),
                                         self.db_cluster,
                                         datastore=mock_ds1,
                                         datastore_version=mock_dv1)

    @patch.object(ClusterTasks, 'update_statuses_on_failure')
    @patch.object(InstanceServiceStatus, 'find_by')
    def test_all_instances_ready_bad_status(self,
                                            mock_find, mock_update):
        (mock_find.return_value.
         get_status.return_value) = ServiceStatuses.FAILED
        ret_val = self.clustertasks._all_instances_ready(["1", "2", "3", "4"],
                                                         self.cluster_id)
        mock_update.assert_called_with(self.cluster_id)
        self.assertEqual(False, ret_val)

    @patch.object(InstanceServiceStatus, 'find_by')
    def test_all_instances_ready(self, mock_find):
        (mock_find.return_value.
         get_status.return_value) = ServiceStatuses.RUNNING
        ret_val = self.clustertasks._all_instances_ready(["1", "2", "3", "4"],
                                                         self.cluster_id)
        self.assertEqual(True, ret_val)

    @patch.object(ClusterTasks, 'reset_task')
    @patch.object(ClusterTasks, 'get_guest')
    @patch.object(ClusterTasks, 'get_ip')
    @patch.object(ClusterTasks, '_all_instances_ready')
    @patch.object(Instance, 'load')
    @patch.object(DBInstance, 'find_all')
    @patch.object(datastore_models.Datastore, 'load')
    @patch.object(datastore_models.DatastoreVersion, 'load_by_uuid')
    def test_create_cluster(self, mock_dv, mock_ds, mock_find_all, mock_load,
                            mock_ready, mock_ip, mock_guest, mock_reset_task):
        mock_find_all.return_value.all.return_value = [self.dbinst1]
        mock_load.return_value = BaseInstance(Mock(),
                                              self.dbinst1, Mock(),
                                              InstanceServiceStatus(
                                                  ServiceStatuses.NEW))
        mock_ip.return_value = "10.0.0.2"
        self.clustertasks.create_cluster(Mock(), self.cluster_id)
        mock_guest.return_value.install_cluster.assert_called_with(
            ['10.0.0.2'])
        mock_reset_task.assert_called()
        mock_guest.return_value.cluster_complete.assert_called()