Support Data Source pluggability

Changes to support the data source abstraction.

Change-Id: I62274c08619c8e720231eb6aa0563ac6e12497f1
Implements: blueprint data-source-plugin
This commit is contained in:
Marianne Linhares Monteiro 2017-01-31 15:50:54 -03:00
parent 793c269925
commit 1d8b52ca00
27 changed files with 1399 additions and 1 deletion

View File

@ -24,6 +24,7 @@ from sahara import exceptions as ex
from sahara.i18n import _
from sahara.plugins import opts as plugins_base
from sahara.service.castellan import config as castellan
from sahara.service.edp.data_sources import opts as data_source
from sahara.topology import topology_helper
from sahara.utils.notification import sender
from sahara.utils.openstack import cinder
@ -180,7 +181,8 @@ def list_opts():
heat_engine.heat_engine_opts,
templates.heat_engine_opts,
ssh_remote.ssh_config_options,
castellan.opts)),
castellan.opts,
data_source.opts)),
(poll_utils.timeouts.name,
itertools.chain(poll_utils.timeouts_opts)),
(api.conductor_group.name,

View File

@ -31,6 +31,7 @@ from sahara.i18n import _LI
from sahara.plugins import base as plugins_base
from sahara.service import api
from sahara.service.castellan import config as castellan
from sahara.service.edp.data_sources import manager as ds_manager
from sahara.service import ops as service_ops
from sahara.service import periodic
from sahara.utils.openstack import cinder
@ -85,6 +86,8 @@ def setup_common(possible_topdir, service_name):
plugins_base.setup_plugins()
ds_manager.setup_data_sources()
LOG.info(_LI('Sahara {service} started').format(service=service_name))

View File

@ -0,0 +1,109 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import random
import re
import string
import six
from sahara.plugins import base as plugins_base
@six.add_metaclass(abc.ABCMeta)
class DataSourceType(object):
    """Base class for pluggable data source types.

    Concrete types (swift, hdfs, manila, ...) override these hooks to
    teach Sahara how to validate, resolve and prepare urls for a
    particular storage backend.
    """

    @plugins_base.required_with_default
    def construct_url(self, url, job_exec_id):
        """Resolve placeholders in the data source url.

        Supported placeholders:

        * %RANDSTR(len)% - replaced with a random string of lowercase
          letters of length `len`
        * %JOB_EXEC_ID% - replaced with the job execution ID

        :param url: String that represents an url with placeholders
        :param job_exec_id: Id of the job execution
        :returns: String that is an url without placeholders
        """
        def _expand_randstr(match):
            length = int(match.group(1))
            letters = [random.choice(string.ascii_lowercase)
                       for _ in six.moves.range(length)]
            return ''.join(letters)

        resolved = url.replace("%JOB_EXEC_ID%", job_exec_id)
        return re.sub(r"%RANDSTR\((\d+)\)%", _expand_randstr, resolved)

    @plugins_base.required_with_default
    def prepare_cluster(self, data_source, cluster, **kwargs):
        """Make a cluster ready to use this data source.

        Implementations differ per backend: HDFS configures the cluster,
        Swift verifies credentials, and so on. The base implementation
        does nothing.

        :param data_source: The object handle to a data source
        :param cluster: The object handle to a cluster
        :returns: None
        """
        pass

    @plugins_base.required_with_default
    def get_runtime_url(self, url, cluster):
        """Return the runtime url of the data source for a cluster.

        By default the native url doubles as the runtime url; backends
        that need an indirection (e.g. manila mounts) override this.

        :param url: String that represents an already constructed url
        :param cluster: The object handle to a cluster
        :returns: String representing the runtime url
        """
        return url

    @plugins_base.required_with_default
    def get_urls(self, url, cluster, job_exec_id):
        """Return both the native and runtime urls of a data source.

        :param url: String that represents a url (constructed or not)
        :param cluster: The object handle to a cluster
        :param job_exec_id: Id of the job execution
        :returns: A (native_url, runtime_url) tuple of Strings
        """
        native = self.construct_url(url, job_exec_id)
        return (native, self.get_runtime_url(native, cluster))

    @plugins_base.required_with_default
    def validate(self, data):
        """Validate the data passed through the API.

        Executed during data source creation and update. The base
        implementation accepts everything.

        :raise: If data is invalid, InvalidDataException
        """
        pass

    @plugins_base.optional
    def _validate_url(self, url):
        """Auxiliary method used by the validate method."""
        pass

View File

@ -0,0 +1,41 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six.moves.urllib.parse as urlparse
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.data_sources.base import DataSourceType
from sahara.service.edp import hdfs_helper as h
class HDFSType(DataSourceType):
    """Data source type for HDFS urls."""

    def validate(self, data):
        """Check that the 'url' field of data is a usable HDFS url."""
        self._validate_url(data['url'])

    def _validate_url(self, url):
        if len(url) == 0:
            raise ex.InvalidDataException(_("HDFS url must not be empty"))
        parsed = urlparse.urlparse(url)
        # A schemeless url (relative or absolute path on the local HDFS)
        # is acceptable; an explicit scheme must be hdfs with a hostname.
        if parsed.scheme:
            if parsed.scheme != "hdfs":
                raise ex.InvalidDataException(_("URL scheme must be 'hdfs'"))
            if not parsed.hostname:
                raise ex.InvalidDataException(
                    _("HDFS url is incorrect, cannot determine a hostname"))

    def prepare_cluster(self, data_source, cluster, **kwargs):
        """Configure the cluster for HDFS access to the runtime url."""
        h.configure_cluster_for_hdfs(cluster, kwargs.pop('runtime_url'))

View File

@ -0,0 +1,88 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
import six
import six.moves.urllib.parse as urlparse
from stevedore import enabled
from sahara import conductor as cond
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.i18n import _LI
conductor = cond.API
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class DataSourceManager(object):
    """Loads and indexes the configured data source types.

    Types are discovered through stevedore on the
    'sahara.data_source.types' entry point namespace, filtered down to
    the names listed in the 'data_source_types' config option.
    """

    def __init__(self):
        self.data_sources = {}
        self._load_data_sources()

    def _load_data_sources(self):
        """Load every configured data source type via stevedore.

        :raises ConfigurationError: if two extensions share a name, or
            if any configured type could not be loaded.
        """
        config_ds = CONF.data_source_types
        extension_manager = enabled.EnabledExtensionManager(
            check_func=lambda ext: ext.name in config_ds,
            namespace='sahara.data_source.types',
            invoke_on_load=True
        )

        for ext in extension_manager.extensions:
            if ext.name in self.data_sources:
                raise ex.ConfigurationError(
                    _("Data source with name '%s' already exists.") %
                    ext.name)
            ext.obj.name = ext.name
            self.data_sources[ext.name] = ext.obj
            LOG.info(_LI("Data source name {ds_name} loaded {entry_point}")
                     .format(ds_name=ext.name,
                             entry_point=ext.entry_point_target))

        if len(self.data_sources) < len(config_ds):
            loaded_ds = set(six.iterkeys(self.data_sources))
            requested_ds = set(config_ds)
            raise ex.ConfigurationError(
                _("Data sources couldn't be loaded: %s") %
                ", ".join(requested_ds - loaded_ds))

    def get_data_sources(self):
        """Return the loaded data source objects in config-option order.

        Fix: this previously returned only the type names; callers
        iterate the result and read attributes such as ``name``, so the
        objects themselves must be returned.
        """
        config_ds = CONF.data_source_types
        return [self.get_data_source(name) for name in config_ds]

    def get_data_source(self, name):
        """Return the data source type registered under ``name``.

        :raises InvalidDataException: if no such type is loaded.
        """
        res = self.data_sources.get(name)
        if res is None:
            raise ex.InvalidDataException(_("Invalid data source"))
        return res

    def get_data_source_by_url(self, url):
        """Resolve a data source type from a url's scheme.

        :raises InvalidDataException: if the url has no scheme or the
            scheme does not match a loaded type.
        """
        url = urlparse.urlparse(url)
        if not url.scheme:
            raise ex.InvalidDataException(_("Data source url must have a"
                                            " scheme"))
        return self.get_data_source(url.scheme)
# Global singleton holding the loaded DataSourceManager; populated by
# setup_data_sources() at service startup.
DATA_SOURCES = None


def setup_data_sources():
    """Instantiate the global data source manager, loading all types."""
    global DATA_SOURCES
    DATA_SOURCES = DataSourceManager()

View File

@ -0,0 +1,58 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_utils import uuidutils
import six.moves.urllib.parse as urlparse
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.data_sources.base import DataSourceType
from sahara.service.edp import job_utils
from sahara.service import shares as shares_service
class ManilaType(DataSourceType):
    """Data source type for manila share urls."""

    def validate(self, data):
        """Check that the 'url' field of data is a valid manila url."""
        self._validate_url(data['url'])

    def _validate_url(self, url):
        if len(url) == 0:
            raise ex.InvalidDataException(_("Manila url must not be empty"))
        parsed = urlparse.urlparse(url)
        if parsed.scheme != "manila":
            raise ex.InvalidDataException(
                _("Manila url scheme must be 'manila'"))
        # The netloc component must carry the share's uuid.
        if not uuidutils.is_uuid_like(parsed.netloc):
            raise ex.InvalidDataException(
                _("Manila url netloc must be a uuid"))
        if not parsed.path:
            raise ex.InvalidDataException(
                _("Manila url path must not be empty"))

    def _prepare_cluster(self, url, cluster):
        """Return the share's mount path, mounting it first if needed."""
        existing = self._get_share_path(url, cluster.shares or [])
        if existing is not None:
            return existing
        return job_utils.mount_share_at_default_path(url, cluster)

    def get_runtime_url(self, url, cluster):
        """Translate a manila url to the local path it is mounted at."""
        mount_point = self._prepare_cluster(url, cluster)
        # This gets us the mount point, but we need a file:// scheme to
        # indicate a local filesystem path
        return "file://{path}".format(path=mount_point)

    def _get_share_path(self, url, shares):
        return shares_service.get_share_path(url, shares)

View File

@ -0,0 +1,33 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six.moves.urllib.parse as urlparse
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.data_sources.base import DataSourceType
class MapRFSType(DataSourceType):
    """Data source type for MapR FS urls."""

    def validate(self, data):
        """Check that the 'url' field of data is a valid MapR FS url."""
        self._validate_url(data['url'])

    def _validate_url(self, url):
        if len(url) == 0:
            raise ex.InvalidDataException(_("MapR FS url must not be empty"))
        scheme = urlparse.urlparse(url).scheme
        # A schemeless (local path) url is acceptable; an explicit
        # scheme must be maprfs.
        if scheme and scheme != "maprfs":
            raise ex.InvalidDataException(_("URL scheme must be 'maprfs'"))

View File

@ -0,0 +1,28 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# File contains data sources opts to avoid cyclic imports issue
from oslo_config import cfg
# Options controlling which data source type plugins are loaded; the
# list also fixes the order in which types are returned.
opts = [
    cfg.ListOpt('data_source_types',
                default=['swift', 'hdfs', 'maprfs', 'manila'],
                help='List of data sources types to be loaded. Sahara '
                     'preserves the order of the list when returning it.'),
]

CONF = cfg.CONF
CONF.register_opts(opts)

View File

@ -0,0 +1,80 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
import six.moves.urllib.parse as urlparse
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.data_sources.base import DataSourceType
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
from sahara.utils.types import FrozenDict
CONF = cfg.CONF
class SwiftType(DataSourceType):
    """Data source type for swift urls."""

    def validate(self, data):
        """Check the url and, when proxy domains are off, the credentials.

        :raises InvalidDataException: for a malformed url
        :raises InvalidCredentials: when credentials are required but
            missing or incomplete
        """
        self._validate_url(data['url'])
        if CONF.use_domain_for_proxy_users:
            # Proxy-user domains supply credentials, none required here.
            return
        if "credentials" not in data:
            raise ex.InvalidCredentials(_("No credentials provided for Swift"))
        credentials = data["credentials"]
        if "user" not in credentials:
            raise ex.InvalidCredentials(
                _("User is not provided in credentials for Swift"))
        if "password" not in credentials:
            raise ex.InvalidCredentials(
                _("Password is not provided in credentials for Swift"))

    def _validate_url(self, url):
        if len(url) == 0:
            raise ex.InvalidDataException(_("Swift url must not be empty"))
        parsed = urlparse.urlparse(url)
        if parsed.scheme != "swift":
            raise ex.InvalidDataException(_("URL scheme must be 'swift'"))

        # The swift url suffix does not have to be included in the netloc.
        # However, if the swift suffix indicator is part of the netloc then
        # we require the right suffix.
        # Additionally, the path must be more than '/'
        bad_suffix = (su.SWIFT_URL_SUFFIX_START in parsed.netloc and
                      not parsed.netloc.endswith(su.SWIFT_URL_SUFFIX))
        if bad_suffix or len(parsed.path) <= 1:
            raise ex.InvalidDataException(
                _("URL must be of the form swift://container%s/object")
                % su.SWIFT_URL_SUFFIX)

    def prepare_cluster(self, data_source, cluster, **kwargs):
        """Copy the data source's credentials into the job configs.

        Only applies when the data source carries credentials, the job
        configs are mutable, and no proxy configs are in use.
        """
        if not hasattr(data_source, "credentials"):
            return

        job_configs = kwargs.pop('job_configs')
        if isinstance(job_configs, FrozenDict):
            return
        if job_configs.get('configs', None) is None:
            return
        if job_configs.get('proxy_configs'):
            return

        configs = job_configs['configs']
        username = data_source.credentials['user']
        password = data_source.credentials['password']
        # Don't overwrite if there is already a value here
        if configs.get(sw.HADOOP_SWIFT_USERNAME, None) is None and (
                username is not None):
            configs[sw.HADOOP_SWIFT_USERNAME] = username
        if configs.get(sw.HADOOP_SWIFT_PASSWORD, None) is None and (
                password is not None):
            configs[sw.HADOOP_SWIFT_PASSWORD] = password

View File

@ -0,0 +1,303 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import collections
import itertools
from oslo_log import log
import six
from sahara import context
from sahara.i18n import _LW
from sahara.utils.openstack import manila
LOG = log.getLogger(__name__)
def mount_shares(cluster):
    """Mounts all shares specified for the cluster and any of its node groups.

    - In the event that a specific share is configured for both the cluster and
      a specific node group, configuration at the node group level will be
      ignored.
    - In the event that utilities required to mount the share are not
      already installed on the node, this method may fail if the node cannot
      access the internet.
    - This method will not remove already-mounted shares.
    - This method will not remove or remount (or currently, reconfigure) shares
      already mounted to the desired local mount point.

    :param cluster: The cluster model.
    """
    # Collect (node_group, share_config) pairs declared at the node-group
    # level and at the cluster level.
    node_groups = (ng for ng in cluster.node_groups if ng.shares)
    ng_mounts = [_mount(ng, share_config) for ng in node_groups
                 for share_config in ng.shares]
    c_mounts = [_mount(ng, share_config) for ng in cluster.node_groups
                for share_config in cluster.shares or []]

    if not (ng_mounts or c_mounts):
        return

    ng_mounts_by_share_id = _group_mounts_by_share_id(ng_mounts)
    c_mounts_by_share_id = _group_mounts_by_share_id(c_mounts)

    all_share_ids = (set(ng_mounts_by_share_id.keys()) |
                     set(c_mounts_by_share_id.keys()))
    # Cluster-level config wins when a share id appears at both levels
    # (node-group config is ignored, per the docstring above).
    mounts_by_share_id = {
        share_id: c_mounts_by_share_id.get(share_id) or
        ng_mounts_by_share_id[share_id] for share_id in all_share_ids}

    all_mounts = itertools.chain(*mounts_by_share_id.values())
    mounts_by_ng_id = _group_mounts_by_ng_id(all_mounts)

    client = manila.client()
    # One handler per share, shared across all node groups that mount it.
    handlers_by_share_id = {id: _ShareHandler.create_from_id(id, client)
                            for id in all_share_ids}

    for mounts in mounts_by_ng_id.values():
        # All mounts in this bucket belong to the same node group.
        node_group_shares = _NodeGroupShares(mounts[0].node_group)
        for mount in mounts:
            share_id = mount.share_config['id']
            node_group_shares.add_share(mount.share_config,
                                        handlers_by_share_id[share_id])
        node_group_shares.mount_shares_to_node_group()
def unmount_shares(cluster, unmount_share_list):
    """Unmounts all shares in unmount_share_list on the given cluster.

    :param cluster: The cluster model.
    :param unmount_share_list: list of shares to unmount
    """
    client = manila.client()
    share_ids = set(share['id'] for share in unmount_share_list)
    # One handler per distinct share id.
    handlers = {share_id: _ShareHandler.create_from_id(share_id, client)
                for share_id in share_ids}

    for share in unmount_share_list:
        handler = handlers[share['id']]
        for node_group in cluster.node_groups:
            for instance in node_group.instances:
                handler.unmount_from_instance(instance.remote(), share)
# Pairing of a node group with one share_config dict to mount on it.
_mount = collections.namedtuple('Mount', ['node_group', 'share_config'])
def _group_mounts(mounts, grouper):
result = collections.defaultdict(list)
for mount in mounts:
result[grouper(mount)].append(mount)
return result
def _group_mounts_by_share_id(mounts):
    # Buckets mounts by the manila share id in their share_config.
    return _group_mounts(mounts, lambda mount: mount.share_config['id'])
def _group_mounts_by_ng_id(mounts):
    # Buckets mounts by the id of the node group they belong to.
    return _group_mounts(mounts, lambda mount: mount.node_group['id'])
class _NodeGroupShares(object):
    """Organizes share mounting for a single node group."""

    # Pairing of one share_config dict with the handler that mounts it.
    _share = collections.namedtuple('Share', ['share_config', 'handler'])

    def __init__(self, node_group):
        self.node_group = node_group
        self.shares = []

    def add_share(self, share_config, handler):
        """Adds a share to mount; add all shares before mounting."""
        self.shares.append(self._share(share_config, handler))

    def mount_shares_to_node_group(self):
        """Mounts all configured shares to the node group."""
        for instance in self.node_group.instances:
            with context.set_current_instance_id(instance.instance_id):
                self._mount_shares_to_instance(instance)

    def _mount_shares_to_instance(self, instance):
        """Grants access and then mounts every share on one instance."""
        # Note: Additional iteration here is critical: based on
        # experimentation, failure to execute allow_access before spawning
        # the remote results in permission failure.
        for share in self.shares:
            share.handler.allow_access_to_instance(instance,
                                                   share.share_config)
        with instance.remote() as remote:
            # Run each handler type's per-instance setup once, then
            # mount every share through the same remote.
            share_types = set(type(share.handler) for share in self.shares)
            for share_type in share_types:
                share_type.setup_instance(remote)
            for share in self.shares:
                share.handler.mount_to_instance(remote, share.share_config)
@six.add_metaclass(abc.ABCMeta)
class _ShareHandler(object):
    """Handles mounting of a single share to any number of instances."""

    @classmethod
    def setup_instance(cls, remote):
        """Prepares an instance to mount this type of share."""
        pass

    @classmethod
    def create_from_id(cls, share_id, client):
        """Factory method for creation from a share_id of unknown type."""
        share = manila.get_share(client, share_id,
                                 raise_on_error=True)
        # Choose the concrete handler from the share's protocol
        # (e.g. 'NFS' -> _NFSMounter).
        mounter_class = _share_types[share.share_proto]
        return mounter_class(share, client)

    def __init__(self, share, client):
        self.share = share
        self.client = client

    def allow_access_to_instance(self, instance, share_config):
        """Grants the instance's internal ip access to this share.

        The access level is taken from share_config ('rw' by default).
        Pre-existing access rules are upgraded from 'ro' to 'rw' when
        needed; rules with unknown levels are left untouched.
        """
        access_level = self._get_access_level(share_config)
        # Find any existing ip-based rule for this instance's internal ip.
        accesses = list(filter(lambda x: (x.access_type == 'ip' and
                                          x.access_to == instance.internal_ip),
                               self.share.access_list()))
        if accesses:
            access = accesses[0]
            if access.access_level not in ('ro', 'rw'):
                # Unrecognized level: warn and leave the rule as-is.
                LOG.warning(
                    _LW("Unknown permission level {access_level} on share "
                        "id {share_id} for ip {ip}. Leaving pre-existing "
                        "permissions.").format(
                        access_level=access.access_level,
                        share_id=self.share.id,
                        ip=instance.internal_ip))
            elif access.access_level == 'ro' and access_level == 'rw':
                # Upgrade: drop the read-only rule, add a read-write one.
                self.share.deny(access.id)
                self.share.allow('ip', instance.internal_ip, access_level)
        else:
            self.share.allow('ip', instance.internal_ip, access_level)

    @abc.abstractmethod
    def mount_to_instance(self, remote, share_info):
        """Mounts the share to the instance as configured."""
        pass

    @abc.abstractmethod
    def unmount_from_instance(self, remote, share_info):
        """Unmounts the share from the instance."""
        pass

    def _get_access_level(self, share_config):
        # 'rw' unless the share config explicitly narrows it.
        return share_config.get('access_level', 'rw')

    def _default_mount(self):
        # Default local mount point derived from the share id.
        return '/mnt/{0}'.format(self.share.id)

    def _get_path(self, share_info):
        # Configured path, falling back to the default mount point.
        return share_info.get('path', self._default_mount())
class _NFSMounter(_ShareHandler):
    """Handles mounting of a single NFS share to any number of instances."""

    _DEBIAN_INSTALL = "dpkg -s nfs-common || apt-get -y install nfs-common"
    _REDHAT_INSTALL = "rpm -q nfs-utils || yum install -y nfs-utils"

    # Per-distro idempotent install commands for the NFS client tools.
    _NFS_CHECKS = {
        "centos": _REDHAT_INSTALL,
        "fedora": _REDHAT_INSTALL,
        "redhatenterpriseserver": _REDHAT_INSTALL,
        "ubuntu": _DEBIAN_INSTALL
    }

    _MKDIR_COMMAND = 'mkdir -p %s'
    _MOUNT_COMMAND = ("mount | grep '%(remote)s' | grep '%(local)s' | "
                      "grep nfs || mount -t nfs %(access_arg)s %(remote)s "
                      "%(local)s")
    _UNMOUNT_COMMAND = ("umount -f %s ")
    _RMDIR_COMMAND = 'rmdir %s'

    @classmethod
    def setup_instance(cls, remote):
        """Prepares an instance to mount this type of share."""
        distro = remote.get_os_distrib()
        install_command = cls._NFS_CHECKS.get(distro)
        if install_command is None:
            LOG.warning(
                _LW("Cannot verify installation of NFS mount tools for "
                    "unknown distro {distro}.").format(distro=distro))
        else:
            remote.execute_command(install_command, run_as_root=True)

    def mount_to_instance(self, remote, share_info):
        """Mounts the share to the instance as configured."""
        local_path = self._get_path(share_info)
        access_arg = ('-w' if self._get_access_level(share_info) == 'rw'
                      else '-r')

        remote.execute_command(self._MKDIR_COMMAND % local_path,
                               run_as_root=True)
        remote.execute_command(
            self._MOUNT_COMMAND % {"remote": self.share.export_location,
                                   "local": local_path,
                                   "access_arg": access_arg},
            run_as_root=True)

    def unmount_from_instance(self, remote, share_info):
        """Unmounts the share and removes its local mount directory."""
        local_path = self._get_path(share_info)
        remote.execute_command(self._UNMOUNT_COMMAND % local_path,
                               run_as_root=True)
        remote.execute_command(self._RMDIR_COMMAND % local_path,
                               run_as_root=True)
# Maps a share's protocol name (share.share_proto) to its handler class.
_share_types = {"NFS": _NFSMounter}
SUPPORTED_SHARE_TYPES = _share_types.keys()
def make_share_path(mount_point, path):
    """Join a local mount point and a share-relative path."""
    return "%s%s" % (mount_point, path)
def default_mount(share_id):
    """Return the default local mount point for the given share id."""
    client = manila.client()
    return _ShareHandler.create_from_id(share_id, client)._default_mount()
def get_share_path(url, shares):
    """Resolve a manila url to a local filesystem path, if mounted.

    url example: 'manila://ManilaShare-uuid/path_to_file'

    :param url: A manila url whose netloc is a share id.
    :param shares: List of share config dicts to search.
    :returns: The local path, or None when the share id is not in the
        list (the caller may determine a default path if possible).
    """
    parsed = six.moves.urllib.parse.urlparse(url)
    # using list() as a python2/3 workaround
    matching = list(filter(lambda s: s['id'] == parsed.netloc, shares))
    if not matching:
        # Share id is not in the share list; let the caller decide.
        return None

    # Always select the first match; the caller determines whether
    # duplicates are okay.
    mount_point = matching[0].get('path', None)
    # Two steps instead of passing a default to get(), because computing
    # the default is a big side effect (a manila client call).
    if mount_point is None:
        # The user specified a share without a path, so the default
        # mount point was used during cluster provisioning.
        mount_point = default_mount(matching[0]['id'])
    return make_share_path(mount_point, parsed.path)

View File

@ -0,0 +1,70 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_utils import uuidutils
from sahara.service.edp.data_sources.base import DataSourceType
import testtools
class DataSourceBaseTestCase(testtools.TestCase):
    """Tests for the url construction helpers on DataSourceType."""

    def setUp(self):
        super(DataSourceBaseTestCase, self).setUp()
        self.ds_base = DataSourceType()

    def test_construct_url_no_placeholders(self):
        base_url = "swift://container/input"
        job_exec_id = uuidutils.generate_uuid()
        url = self.ds_base.construct_url(base_url, job_exec_id)
        self.assertEqual(base_url, url)

    def test_construct_url_job_exec_id_placeholder(self):
        base_url = "swift://container/input.%JOB_EXEC_ID%.out"
        job_exec_id = uuidutils.generate_uuid()
        url = self.ds_base.construct_url(base_url, job_exec_id)
        self.assertEqual(
            "swift://container/input." + job_exec_id + ".out", url)

    def test_construct_url_randstr_placeholder(self):
        base_url = "swift://container/input.%RANDSTR(4)%.%RANDSTR(7)%.out"
        job_exec_id = uuidutils.generate_uuid()
        url = self.ds_base.construct_url(base_url, job_exec_id)
        # Fix: regex patterns use raw strings -- '\.' in a plain string
        # literal is an invalid escape sequence (DeprecationWarning on
        # Python >= 3.6).
        self.assertRegex(
            url, r"swift://container/input\.[a-z]{4}\.[a-z]{7}\.out")

    def test_construct_url_randstr_and_job_exec_id_placeholder(self):
        base_url = "swift://container/input.%JOB_EXEC_ID%.%RANDSTR(7)%.out"
        job_exec_id = uuidutils.generate_uuid()
        url = self.ds_base.construct_url(base_url, job_exec_id)
        self.assertRegex(
            url,
            "swift://container/input." + job_exec_id + r"\.[a-z]{7}\.out")

    def test_get_urls(self):
        url = 'test://url'
        cluster = mock.Mock()
        job_exec_id = 'test_id'
        # With the default hooks the native and runtime urls are equal.
        self.assertEqual((url, url), self.ds_base.get_urls(url,
                         cluster, job_exec_id))

View File

@ -0,0 +1,76 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
import sahara.exceptions as ex
from sahara.service.edp.data_sources import manager as ds_manager
from sahara.tests.unit import base
class DataSourceManagerSupportTest(base.SaharaTestCase):
    """Checks that the default data source types load and resolve."""

    def setUp(self):
        super(DataSourceManagerSupportTest, self).setUp()
        # Populates the ds_manager.DATA_SOURCES global used below.
        ds_manager.setup_data_sources()

    def test_data_sources_loaded(self):
        # get_data_sources() is expected to yield objects exposing .name.
        ds_types = [ds.name for ds in
                    ds_manager.DATA_SOURCES.get_data_sources()]
        self.assertIn('hdfs', ds_types)
        self.assertIn('manila', ds_types)
        self.assertIn('maprfs', ds_types)
        self.assertIn('swift', ds_types)

    def test_get_data_source_by_url(self):
        # An empty or schemeless url must be rejected.
        with testtools.ExpectedException(ex.InvalidDataException):
            ds_manager.DATA_SOURCES.get_data_source_by_url('')
        with testtools.ExpectedException(ex.InvalidDataException):
            ds_manager.DATA_SOURCES.get_data_source_by_url('hdfs')
        self.assertEqual('hdfs', ds_manager.DATA_SOURCES
                         .get_data_source_by_url('hdfs://').name)
        self.assertEqual('manila', ds_manager.DATA_SOURCES
                         .get_data_source_by_url('manila://').name)
        self.assertEqual('maprfs', ds_manager.DATA_SOURCES
                         .get_data_source_by_url('maprfs://').name)
        self.assertEqual('swift', ds_manager.DATA_SOURCES
                         .get_data_source_by_url('swift://').name)

    def test_get_data_source(self):
        # Unknown names (including near-misses) must be rejected.
        with testtools.ExpectedException(ex.InvalidDataException):
            ds_manager.DATA_SOURCES.get_data_source('')
        with testtools.ExpectedException(ex.InvalidDataException):
            ds_manager.DATA_SOURCES.get_data_source('hdf')
        self.assertEqual('hdfs', ds_manager.DATA_SOURCES
                         .get_data_source('hdfs').name)
        self.assertEqual('manila', ds_manager.DATA_SOURCES
                         .get_data_source('manila').name)
        self.assertEqual('maprfs', ds_manager.DATA_SOURCES
                         .get_data_source('maprfs').name)
        self.assertEqual('swift', ds_manager.DATA_SOURCES
                         .get_data_source('swift').name)

View File

@ -0,0 +1,77 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
import sahara.exceptions as ex
from sahara.service.edp.data_sources.hdfs.implementation import HDFSType
from sahara.tests.unit import base
class TestHDFSType(base.SaharaTestCase):
    """Validation and cluster-preparation tests for HDFSType."""

    def setUp(self):
        super(TestHDFSType, self).setUp()
        self.hdfs_type = HDFSType()

    def test_hdfs_type_validation_wrong_schema(self):
        # 'hdf' is not the 'hdfs' scheme, so validation must fail.
        data = {
            "name": "test_data_data_source",
            "url": "hdf://test_cluster/",
            "type": "hdfs",
            "description": "incorrect url schema"
        }
        with testtools.ExpectedException(ex.InvalidDataException):
            self.hdfs_type.validate(data)

    def test_hdfs_type_validation_correct_url(self):
        data = {
            "name": "test_data_data_source",
            "url": "hdfs://test_cluster/",
            "type": "hdfs",
            "description": "correct url schema"
        }
        self.hdfs_type.validate(data)

    def test_hdfs_type_validation_local_rel_url(self):
        # Schemeless urls are valid: relative path on the local hdfs.
        data = {
            "name": "test_data_data_source",
            "url": "mydata/input",
            "type": "hdfs",
            "description": "correct url schema for relative path on local hdfs"
        }
        self.hdfs_type.validate(data)

    def test_hdfs_type_validation_local_abs_url(self):
        # Schemeless urls are valid: absolute path on the local hdfs.
        data = {
            "name": "test_data_data_source",
            "url": "/tmp/output",
            "type": "hdfs",
            "description": "correct url schema for absolute path on local hdfs"
        }
        self.hdfs_type.validate(data)

    @mock.patch('sahara.service.edp.data_sources.hdfs.implementation.h')
    def test_prepare_cluster(self, mock_h):
        # prepare_cluster must delegate to the hdfs helper with the
        # runtime_url passed via kwargs.
        cluster = mock.Mock()
        data_source = mock.Mock()
        runtime_url = "runtime_url"
        mock_h.configure_cluster_for_hdfs = mock.Mock()

        self.hdfs_type.prepare_cluster(data_source, cluster,
                                       runtime_url=runtime_url)

        mock_h.configure_cluster_for_hdfs.assert_called_once_with(cluster,
                                                                  runtime_url)

View File

@ -0,0 +1,142 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_utils import uuidutils
import testtools
import sahara.exceptions as ex
from sahara.service.edp.data_sources.manila.implementation import ManilaType
from sahara.tests.unit import base
class _FakeShare(object):
def __init__(self, id, share_proto='NFS'):
self.id = id
self.share_proto = share_proto
class TestManilaType(base.SaharaTestCase):
    """Unit tests for the manila data source type.

    Covers cluster preparation (mounting missing shares), runtime URL
    resolution, and URL validation.
    """

    def setUp(self):
        super(TestManilaType, self).setUp()
        self.manila_type = ManilaType()

    @mock.patch('sahara.utils.openstack.manila.client')
    @mock.patch('sahara.conductor.API.cluster_update')
    @mock.patch('sahara.service.shares.mount_shares')
    def test_prepare_cluster(self, mount_shares, cluster_update,
                             f_manilaclient):
        """A share not yet mounted on the cluster triggers a mount."""
        cluster_shares = [
            {'id': 'the_share_id',
             'path': '/mnt/mymountpoint'}
        ]

        cluster = mock.Mock()
        cluster.shares = cluster_shares

        # This should return a default path, and should cause
        # a mount at the default location
        share = _FakeShare("missing_id")
        f_manilaclient.return_value = mock.Mock(shares=mock.Mock(
            get=mock.Mock(return_value=share)))

        url = 'manila://missing_id/the_path'
        self.manila_type._prepare_cluster(url, cluster)

        self.assertEqual(1, mount_shares.call_count)
        self.assertEqual(1, cluster_update.call_count)

    @mock.patch('sahara.service.shares.get_share_path')
    @mock.patch('sahara.utils.openstack.manila.client')
    @mock.patch('sahara.conductor.API.cluster_update')
    @mock.patch('sahara.service.shares.mount_shares')
    def test_get_runtime_url(self, mount_shares, cluster_update,
                             f_manilaclient, get_share_path):
        """An already-mounted share resolves directly; otherwise mount."""
        # first it finds the path, then it doesn't so it has to mount it
        # and only then it finds it
        get_share_path.side_effect = ['/mnt/mymountpoint/the_path', None,
                                      '/mnt/missing_id/the_path']

        cluster = mock.Mock()
        cluster.shares = []

        url = 'manila://the_share_id/the_path'
        res = self.manila_type.get_runtime_url(url, cluster)
        self.assertEqual('file:///mnt/mymountpoint/the_path', res)
        self.assertEqual(0, mount_shares.call_count)
        self.assertEqual(0, cluster_update.call_count)

        # This should return a default path, and should cause
        # a mount at the default location
        share = _FakeShare("missing_id")
        f_manilaclient.return_value = mock.Mock(shares=mock.Mock(
            get=mock.Mock(return_value=share)))

        url = 'manila://missing_id/the_path'
        res = self.manila_type.get_runtime_url(url, cluster)
        self.assertEqual('file:///mnt/missing_id/the_path', res)
        self.assertEqual(1, mount_shares.call_count)
        self.assertEqual(1, cluster_update.call_count)

    def test_manila_type_validation_wrong_schema(self):
        """A non-manila scheme must be rejected."""
        data = {
            "name": "test_data_data_source",
            "url": "man://%s" % uuidutils.generate_uuid(),
            "type": "manila",
            # NOTE: description fixed from the dangling "incorrect url
            # schema for" copy-paste.
            "description": "incorrect url schema"
        }
        with testtools.ExpectedException(ex.InvalidDataException):
            self.manila_type.validate(data)

    def test_manila_type_validation_empty_url(self):
        """An empty URL must be rejected."""
        data = {
            "name": "test_data_data_source",
            "url": "",
            "type": "manila",
            "description": "empty url"
        }
        with testtools.ExpectedException(ex.InvalidDataException):
            self.manila_type.validate(data)

    def test_manila_type_validation_no_uuid(self):
        """A netloc that is not a UUID must be rejected."""
        data = {
            "name": "test_data_data_source",
            "url": "manila://bob",
            "type": "manila",
            "description": "netloc is not a uuid"
        }
        with testtools.ExpectedException(ex.InvalidDataException):
            self.manila_type.validate(data)

    def test_manila_type_validation_no_path(self):
        """A URL with a share id but no path must be rejected."""
        data = {
            "name": "test_data_data_source",
            "url": "manila://%s" % uuidutils.generate_uuid(),
            "type": "manila",
            # Fixed: this case is about the missing path, not the netloc
            # (the old "netloc is not a uuid" text was a copy-paste error).
            "description": "no path after the share id"
        }
        with testtools.ExpectedException(ex.InvalidDataException):
            self.manila_type.validate(data)

    def test_manila_type_validation_correct(self):
        """A manila://<uuid>/<path> URL passes validation."""
        data = {
            "name": "test_data_data_source",
            "url": "manila://%s/foo" % uuidutils.generate_uuid(),
            "type": "manila",
            "description": "correct url"
        }
        self.manila_type.validate(data)

View File

@ -0,0 +1,65 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
import sahara.exceptions as ex
from sahara.service.edp.data_sources.maprfs.implementation import MapRFSType
from sahara.tests.unit import base
class TestMapRFSTypeValidation(base.SaharaTestCase):
    """Unit tests for MapR-FS data source URL validation."""

    def setUp(self):
        super(TestMapRFSTypeValidation, self).setUp()
        self.maprfs_type = MapRFSType()

    def _data_source(self, url, description):
        # Common fixture: only url and description vary between cases.
        return {
            "name": "test_data_data_source",
            "url": url,
            "type": "maprfs",
            "description": description
        }

    def test_maprfs_type_validation_wrong_schema(self):
        """A malformed scheme ('maprf://') must be rejected."""
        bad_source = self._data_source("maprf://test_cluster/",
                                       "incorrect url schema")
        with testtools.ExpectedException(ex.InvalidDataException):
            self.maprfs_type.validate(bad_source)

    def test_maprfs_type_validation_correct_url(self):
        """A fully-qualified maprfs:/// URL passes validation."""
        good_source = self._data_source("maprfs:///test_cluster/",
                                        "correct url schema")
        self.maprfs_type.validate(good_source)

    def test_maprfs_type_validation_local_rel_url(self):
        """A schemeless relative path (local MapR-FS) passes validation."""
        rel_source = self._data_source(
            "mydata/input",
            "correct url schema for relative path on local maprfs")
        self.maprfs_type.validate(rel_source)

    def test_maprfs_type_validation_local_abs_url(self):
        """A schemeless absolute path (local MapR-FS) passes validation."""
        abs_source = self._data_source(
            "/tmp/output",
            "correct url schema for absolute path on local maprfs")
        self.maprfs_type.validate(abs_source)

View File

@ -0,0 +1,217 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import mock
from oslo_utils import uuidutils
import testtools
import sahara.exceptions as ex
from sahara.service.edp.data_sources.swift.implementation import SwiftType
from sahara.service.edp import job_utils
from sahara.swift import utils as su
from sahara.tests.unit import base
from sahara.tests.unit.service.edp import edp_test_utils as u
from sahara.utils.types import FrozenDict
# Canonical valid swift URL used as input across the validation tests.
SAMPLE_SWIFT_URL = "swift://1234/object"
# Same URL with the swift URL suffix explicitly appended to the container
# name (value comes from su.SWIFT_URL_SUFFIX).
SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
class TestSwiftTypeValidation(base.SaharaTestCase):
    """Unit tests for the swift data source type.

    Covers credential injection during cluster preparation and URL /
    credentials validation.
    """

    def setUp(self):
        super(TestSwiftTypeValidation, self).setUp()
        self.s_type = SwiftType()

    @mock.patch('sahara.context.ctx')
    def test_prepare_cluster(self, ctx):
        """prepare_cluster fills in swift credentials only when blank.

        Exercises, in order: blank configs get filled; existing configs
        are preserved; proxy configs are preserved; absent configs are a
        no-op; a FrozenDict is a no-op.
        """
        ctx.return_value = 'dummy'
        ds_url = "swift://container/input"
        ds = u.create_data_source(ds_url,
                                  name="data_source",
                                  id=uuidutils.generate_uuid())

        job_configs = {
            'configs': {
                job_utils.DATA_SOURCE_SUBST_NAME: True,
                job_utils.DATA_SOURCE_SUBST_UUID: True
            }
        }
        old_configs = copy.deepcopy(job_configs)
        self.s_type.prepare_cluster(ds, u.create_cluster(),
                                    job_configs=job_configs)
        # Swift configs should be filled in since they were blank
        self.assertEqual(ds.credentials['user'],
                         job_configs['configs']
                         ['fs.swift.service.sahara.username'])
        self.assertEqual(ds.credentials['password'],
                         job_configs['configs']
                         ['fs.swift.service.sahara.password'])
        self.assertNotEqual(old_configs, job_configs)

        job_configs['configs'] = {'fs.swift.service.sahara.username': 'sam',
                                  'fs.swift.service.sahara.password': 'gamgee',
                                  job_utils.DATA_SOURCE_SUBST_NAME: False,
                                  job_utils.DATA_SOURCE_SUBST_UUID: True}
        old_configs = copy.deepcopy(job_configs)
        self.s_type.prepare_cluster(ds, u.create_cluster(),
                                    job_configs=job_configs)
        # Swift configs should not be overwritten
        self.assertEqual(old_configs['configs'], job_configs['configs'])

        job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: True,
                                  job_utils.DATA_SOURCE_SUBST_UUID: False}
        job_configs['proxy_configs'] = {'proxy_username': 'john',
                                        'proxy_password': 'smith',
                                        'proxy_trust_id': 'trustme'}
        old_configs = copy.deepcopy(job_configs)
        self.s_type.prepare_cluster(ds, u.create_cluster(),
                                    job_configs=job_configs)
        # Swift configs should be empty and proxy configs should be preserved
        self.assertEqual(old_configs['configs'], job_configs['configs'])
        self.assertEqual(old_configs['proxy_configs'],
                         job_configs['proxy_configs'])

        # If there's no configs do nothing
        job_configs['configs'] = None
        old_configs = copy.deepcopy(job_configs)
        self.s_type.prepare_cluster(ds, u.create_cluster(),
                                    job_configs=job_configs)
        self.assertEqual(old_configs, job_configs)

        # If it's a FrozenDict do nothing
        job_configs = {
            'configs': {
                job_utils.DATA_SOURCE_SUBST_NAME: True,
                job_utils.DATA_SOURCE_SUBST_UUID: True
            }
        }
        old_configs = copy.deepcopy(job_configs)
        job_configs = FrozenDict(job_configs)
        self.s_type.prepare_cluster(ds, u.create_cluster(),
                                    job_configs=job_configs)
        self.assertEqual(old_configs, job_configs)

    def test_swift_type_validation(self):
        """A swift URL with full credentials passes validation."""
        data = {
            "name": "test_data_data_source",
            "url": SAMPLE_SWIFT_URL,
            "type": "swift",
            "credentials": {
                "user": "user",
                "password": "password"
            },
            "description": "long description"
        }
        self.s_type.validate(data)

    def test_swift_type_validation_missing_credentials(self):
        """Missing credentials fail, unless domain proxy users are on."""
        data = {
            "name": "test_data_data_source",
            "url": SAMPLE_SWIFT_URL,
            "type": "swift",
            "description": "long description"
        }
        with testtools.ExpectedException(ex.InvalidCredentials):
            self.s_type.validate(data)
        # proxy enabled should allow creation without credentials
        self.override_config('use_domain_for_proxy_users', True)
        self.s_type.validate(data)

    def test_swift_type_validation_credentials_missing_user(self):
        """Credentials without a user fail, unless proxy users are on."""
        data = {
            "name": "test_data_data_source",
            "url": SAMPLE_SWIFT_URL,
            "type": "swift",
            "credentials": {
                "password": "password"
            },
            "description": "long description"
        }
        with testtools.ExpectedException(ex.InvalidCredentials):
            self.s_type.validate(data)
        # proxy enabled should allow creation without credentials
        self.override_config('use_domain_for_proxy_users', True)
        self.s_type.validate(data)

    def test_swift_type_validation_credentials_missing_password(self):
        """Credentials without a password fail, unless proxy users are on."""
        data = {
            "name": "test_data_data_source",
            "url": SAMPLE_SWIFT_URL,
            "type": "swift",
            "credentials": {
                "user": "user",
            },
            "description": "long description"
        }
        with testtools.ExpectedException(ex.InvalidCredentials):
            self.s_type.validate(data)
        # proxy enabled should allow creation without credentials
        self.override_config('use_domain_for_proxy_users', True)
        self.s_type.validate(data)

    def test_swift_type_validation_wrong_schema(self):
        """A malformed scheme ('swif://') must be rejected."""
        data = {
            "name": "test_data_data_source",
            "url": "swif://1234/object",
            "type": "swift",
            "description": "incorrect url schema"
        }
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type.validate(data)

    def test_swift_type_validation_explicit_suffix(self):
        """A URL carrying the explicit swift suffix passes validation."""
        data = {
            "name": "test_data_data_source",
            "url": SAMPLE_SWIFT_URL_WITH_SUFFIX,
            "type": "swift",
            # Fixed: this is a *valid* URL; the old "incorrect url schema"
            # description was a copy-paste error.
            "description": "correct url with explicit suffix",
            "credentials": {
                "user": "user",
                "password": "password"
            }
        }
        self.s_type.validate(data)

    def test_swift_type_validation_wrong_suffix(self):
        """A container with an unrecognized suffix must be rejected."""
        data = {
            "name": "test_data_data_source",
            "url": "swift://1234.suffix/object",
            "type": "swift",
            # Fixed description: the failure here is the suffix, not the
            # schema.
            "description": "incorrect url suffix"
        }
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type.validate(data)

    def test_swift_type_validation_missing_object(self):
        """A URL with no object path must be rejected."""
        data = {
            "name": "test_data_data_source",
            "url": "swift://1234/",
            "type": "swift",
            # Fixed description: the failure here is the missing object,
            # not the schema.
            "description": "missing object in url"
        }
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type.validate(data)

View File

@ -52,6 +52,12 @@ sahara.cluster.plugins =
spark = sahara.plugins.spark.plugin:SparkProvider
storm = sahara.plugins.storm.plugin:StormProvider
sahara.data_source.types =
hdfs = sahara.service.edp.data_sources.hdfs.implementation:HDFSType
manila = sahara.service.edp.data_sources.manila.implementation:ManilaType
maprfs = sahara.service.edp.data_sources.maprfs.implementation:MapRFSType
swift = sahara.service.edp.data_sources.swift.implementation:SwiftType
sahara.infrastructure.engine =
heat = sahara.service.heat.heat_engine:HeatEngine