diff --git a/sahara/config.py b/sahara/config.py index a8bd4141d0..4edd053cbb 100644 --- a/sahara/config.py +++ b/sahara/config.py @@ -24,6 +24,7 @@ from sahara import exceptions as ex from sahara.i18n import _ from sahara.plugins import opts as plugins_base from sahara.service.castellan import config as castellan +from sahara.service.edp.data_sources import opts as data_source from sahara.topology import topology_helper from sahara.utils.notification import sender from sahara.utils.openstack import cinder @@ -180,7 +181,8 @@ def list_opts(): heat_engine.heat_engine_opts, templates.heat_engine_opts, ssh_remote.ssh_config_options, - castellan.opts)), + castellan.opts, + data_source.opts)), (poll_utils.timeouts.name, itertools.chain(poll_utils.timeouts_opts)), (api.conductor_group.name, diff --git a/sahara/main.py b/sahara/main.py index 2ef97fa914..4631a03906 100644 --- a/sahara/main.py +++ b/sahara/main.py @@ -31,6 +31,7 @@ from sahara.i18n import _LI from sahara.plugins import base as plugins_base from sahara.service import api from sahara.service.castellan import config as castellan +from sahara.service.edp.data_sources import manager as ds_manager from sahara.service import ops as service_ops from sahara.service import periodic from sahara.utils.openstack import cinder @@ -85,6 +86,8 @@ def setup_common(possible_topdir, service_name): plugins_base.setup_plugins() + ds_manager.setup_data_sources() + LOG.info(_LI('Sahara {service} started').format(service=service_name)) diff --git a/sahara/service/edp/data_sources/__init__.py b/sahara/service/edp/data_sources/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/service/edp/data_sources/base.py b/sahara/service/edp/data_sources/base.py new file mode 100644 index 0000000000..daa8739456 --- /dev/null +++ b/sahara/service/edp/data_sources/base.py @@ -0,0 +1,109 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import random +import re +import string + +import six + +from sahara.plugins import base as plugins_base + + +@six.add_metaclass(abc.ABCMeta) +class DataSourceType(object): + @plugins_base.required_with_default + def construct_url(self, url, job_exec_id): + """Resolve placeholders in the data source url + + Supported placeholders: + * %RANDSTR(len)% - will be replaced with random string of lowercase + letters of length `len` + * %JOB_EXEC_ID% - will be replaced with the job execution ID + + :param url: String that represents an url with placeholders + :param job_exec_id: Id of the job execution + + :returns: String that is an url without placeholders + + """ + def _randstr(match): + random_len = int(match.group(1)) + return ''.join(random.choice(string.ascii_lowercase) + for _ in six.moves.range(random_len)) + + url = url.replace("%JOB_EXEC_ID%", job_exec_id) + url = re.sub(r"%RANDSTR\((\d+)\)%", _randstr, url) + + return url + + @plugins_base.required_with_default + def prepare_cluster(self, data_source, cluster, **kwargs): + """Makes a cluster ready to use this data source + + Different implementations for each data source, for HDFS + will be configure the cluster, for Swift verify credentials, + and so on + + :param data_source: The object handle to a data source + :param cluster: The object handle to a cluster + :returns: None + """ + pass + + @plugins_base.required_with_default + def get_runtime_url(self, url, cluster): + """Get the runtime url of the data source for a cluster + + It will construct a runtime url if needed, if it's not needed + it will use the native url as runtime url + + :param url: String that represents an already constructed url + :param cluster: The object handle to a cluster + :returns: String representing the runtime url + """ + return url + + @plugins_base.required_with_default + def get_urls(self, url, cluster, job_exec_id): + """Get the native url and runtime url of a determined data source + + :param url: String that represents a url (constructed or not) + :param cluster: The object handle to a cluster + :param job_exec_id: Id of the job execution + + :returns: A tuple of the form (native_url, runtime_url), where the urls + are Strings + """ + native_url = self.construct_url(url, job_exec_id) + runtime_url = self.get_runtime_url(native_url, cluster) + return (native_url, runtime_url) + + @plugins_base.required_with_default + def validate(self, data): + """Method that validates the data passed through the API + + This method will be executed during the data source creation and + update + + :raise: If data is invalid, InvalidDataException + """ + pass + + @plugins_base.optional + def _validate_url(self, url): + """Auxiliary method used by the validate method""" + pass diff --git a/sahara/service/edp/data_sources/hdfs/__init__.py b/sahara/service/edp/data_sources/hdfs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/service/edp/data_sources/hdfs/implementation.py b/sahara/service/edp/data_sources/hdfs/implementation.py new file mode 100644 index 0000000000..a4aca33b92 --- /dev/null +++ b/sahara/service/edp/data_sources/hdfs/implementation.py @@ -0,0 +1,41 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six.moves.urllib.parse as urlparse + +from sahara import exceptions as ex +from sahara.i18n import _ +from sahara.service.edp.data_sources.base import DataSourceType +from sahara.service.edp import hdfs_helper as h + + +class HDFSType(DataSourceType): + def validate(self, data): + self._validate_url(data['url']) + + def _validate_url(self, url): + if len(url) == 0: + raise ex.InvalidDataException(_("HDFS url must not be empty")) + url = urlparse.urlparse(url) + if url.scheme: + if url.scheme != "hdfs": + raise ex.InvalidDataException(_("URL scheme must be 'hdfs'")) + if not url.hostname: + raise ex.InvalidDataException( + _("HDFS url is incorrect, cannot determine a hostname")) + + def prepare_cluster(self, data_source, cluster, **kwargs): + runtime_url = kwargs.pop('runtime_url') + h.configure_cluster_for_hdfs(cluster, runtime_url) diff --git a/sahara/service/edp/data_sources/manager.py b/sahara/service/edp/data_sources/manager.py new file mode 100644 index 0000000000..006f806e1c --- /dev/null +++ b/sahara/service/edp/data_sources/manager.py @@ -0,0 +1,88 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from oslo_config import cfg +from oslo_log import log as logging +import six +import six.moves.urllib.parse as urlparse +from stevedore import enabled + +from sahara import conductor as cond +from sahara import exceptions as ex +from sahara.i18n import _ +from sahara.i18n import _LI + +conductor = cond.API + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class DataSourceManager(object): + def __init__(self): + self.data_sources = {} + self._load_data_sources() + + def _load_data_sources(self): + config_ds = CONF.data_source_types + extension_manager = enabled.EnabledExtensionManager( + check_func=lambda ext: ext.name in config_ds, + namespace='sahara.data_source.types', + invoke_on_load=True + ) + + for ext in extension_manager.extensions: + if ext.name in self.data_sources: + raise ex.ConfigurationError( + _("Data source with name '%s' already exists.") % + ext.name) + ext.obj.name = ext.name + self.data_sources[ext.name] = ext.obj + LOG.info(_LI("Data source name {ds_name} loaded {entry_point}") + .format(ds_name=ext.name, + entry_point=ext.entry_point_target)) + + if len(self.data_sources) < len(config_ds): + loaded_ds = set(six.iterkeys(self.data_sources)) + requested_ds = set(config_ds) + raise ex.ConfigurationError( + _("Data sources couldn't be loaded: %s") % + ", ".join(requested_ds - loaded_ds)) + + def get_data_sources(self): + config_ds = CONF.data_source_types + return [self.get_data_source(name).name for name in config_ds] + + def get_data_source(self, name): + res = self.data_sources.get(name) + if res is None: + raise ex.InvalidDataException(_("Invalid data source")) + return res + + def get_data_source_by_url(self, url): + url = urlparse.urlparse(url) + if not url.scheme: + raise ex.InvalidDataException(_("Data source url must have a" + " scheme")) + return self.get_data_source(url.scheme) + + +DATA_SOURCES = None + + +def setup_data_sources(): + global DATA_SOURCES + DATA_SOURCES = DataSourceManager() diff --git a/sahara/service/edp/data_sources/manila/__init__.py b/sahara/service/edp/data_sources/manila/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/service/edp/data_sources/manila/implementation.py b/sahara/service/edp/data_sources/manila/implementation.py new file mode 100644 index 0000000000..d222b99062 --- /dev/null +++ b/sahara/service/edp/data_sources/manila/implementation.py @@ -0,0 +1,58 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_utils import uuidutils +import six.moves.urllib.parse as urlparse + +from sahara import exceptions as ex +from sahara.i18n import _ +from sahara.service.edp.data_sources.base import DataSourceType +from sahara.service.edp import job_utils +from sahara.service import shares as shares_service + + +class ManilaType(DataSourceType): + def validate(self, data): + self._validate_url(data['url']) + + def _validate_url(self, url): + if len(url) == 0: + raise ex.InvalidDataException(_("Manila url must not be empty")) + url = urlparse.urlparse(url) + if url.scheme != "manila": + raise ex.InvalidDataException(_("Manila url scheme must be" + " 'manila'")) + if not uuidutils.is_uuid_like(url.netloc): + raise ex.InvalidDataException(_("Manila url netloc must be a" + " uuid")) + if not url.path: + raise ex.InvalidDataException(_("Manila url path must not be" + " empty")) + + def _prepare_cluster(self, url, cluster): + path = self._get_share_path(url, cluster.shares or []) + if path is None: + path = job_utils.mount_share_at_default_path(url, + cluster) + return path + + def get_runtime_url(self, url, cluster): + path = self._prepare_cluster(url, cluster) + # This gets us the mount point, but we need a file:// scheme to + # indicate a local filesystem path + return "file://{path}".format(path=path) + + def _get_share_path(self, url, shares): + return shares_service.get_share_path(url, shares) diff --git a/sahara/service/edp/data_sources/maprfs/__init__.py b/sahara/service/edp/data_sources/maprfs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/service/edp/data_sources/maprfs/implementation.py b/sahara/service/edp/data_sources/maprfs/implementation.py new file mode 100644 index 0000000000..0d4ffe2c54 --- /dev/null +++ b/sahara/service/edp/data_sources/maprfs/implementation.py @@ -0,0 +1,33 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six.moves.urllib.parse as urlparse + +from sahara import exceptions as ex +from sahara.i18n import _ +from sahara.service.edp.data_sources.base import DataSourceType + + +class MapRFSType(DataSourceType): + def validate(self, data): + self._validate_url(data['url']) + + def _validate_url(self, url): + if len(url) == 0: + raise ex.InvalidDataException(_("MapR FS url must not be empty")) + url = urlparse.urlparse(url) + if url.scheme: + if url.scheme != "maprfs": + raise ex.InvalidDataException(_("URL scheme must be 'maprfs'")) diff --git a/sahara/service/edp/data_sources/opts.py b/sahara/service/edp/data_sources/opts.py new file mode 100644 index 0000000000..7fd5cc3372 --- /dev/null +++ b/sahara/service/edp/data_sources/opts.py @@ -0,0 +1,28 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# File contains data sources opts to avoid cyclic imports issue + +from oslo_config import cfg + +opts = [ + cfg.ListOpt('data_source_types', + default=['swift', 'hdfs', 'maprfs', 'manila'], + help='List of data sources types to be loaded. Sahara ' + 'preserves the order of the list when returning it.'), +] + +CONF = cfg.CONF +CONF.register_opts(opts) diff --git a/sahara/service/edp/data_sources/swift/__init__.py b/sahara/service/edp/data_sources/swift/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/service/edp/data_sources/swift/implementation.py b/sahara/service/edp/data_sources/swift/implementation.py new file mode 100644 index 0000000000..5c9d83267c --- /dev/null +++ b/sahara/service/edp/data_sources/swift/implementation.py @@ -0,0 +1,80 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +import six.moves.urllib.parse as urlparse + +from sahara import exceptions as ex +from sahara.i18n import _ +from sahara.service.edp.data_sources.base import DataSourceType +from sahara.swift import swift_helper as sw +from sahara.swift import utils as su +from sahara.utils.types import FrozenDict + +CONF = cfg.CONF + + +class SwiftType(DataSourceType): + def validate(self, data): + self._validate_url(data['url']) + + if not CONF.use_domain_for_proxy_users and "credentials" not in data: + raise ex.InvalidCredentials(_("No credentials provided for Swift")) + if not CONF.use_domain_for_proxy_users and ( + "user" not in data["credentials"]): + raise ex.InvalidCredentials( + _("User is not provided in credentials for Swift")) + if not CONF.use_domain_for_proxy_users and ( + "password" not in data["credentials"]): + raise ex.InvalidCredentials( + _("Password is not provided in credentials for Swift")) + + def _validate_url(self, url): + if len(url) == 0: + raise ex.InvalidDataException(_("Swift url must not be empty")) + + url = urlparse.urlparse(url) + if url.scheme != "swift": + raise ex.InvalidDataException(_("URL scheme must be 'swift'")) + + # The swift url suffix does not have to be included in the netloc. + # However, if the swift suffix indicator is part of the netloc then + # we require the right suffix. + # Additionally, the path must be more than '/' + if (su.SWIFT_URL_SUFFIX_START in url.netloc and not + url.netloc.endswith(su.SWIFT_URL_SUFFIX)) or len(url.path) <= 1: + raise ex.InvalidDataException( + _("URL must be of the form swift://container%s/object") + % su.SWIFT_URL_SUFFIX) + + def prepare_cluster(self, data_source, cluster, **kwargs): + if hasattr(data_source, "credentials"): + job_configs = kwargs.pop('job_configs') + + if isinstance(job_configs, FrozenDict) or \ + job_configs.get('configs', None) is None: + return + + if not job_configs.get('proxy_configs'): + username = data_source.credentials['user'] + password = data_source.credentials['password'] + + # Don't overwrite if there is already a value here + if job_configs['configs'].get(sw.HADOOP_SWIFT_USERNAME, None) \ + is None and (username is not None): + job_configs['configs'][sw.HADOOP_SWIFT_USERNAME] = username + if job_configs['configs'].get(sw.HADOOP_SWIFT_PASSWORD, None) \ + is None and (password is not None): + job_configs['configs'][sw.HADOOP_SWIFT_PASSWORD] = password diff --git a/sahara/service/edp/shares.py b/sahara/service/edp/shares.py new file mode 100644 index 0000000000..69593608ea --- /dev/null +++ b/sahara/service/edp/shares.py @@ -0,0 +1,303 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import collections +import itertools + +from oslo_log import log +import six + +from sahara import context +from sahara.i18n import _LW +from sahara.utils.openstack import manila + +LOG = log.getLogger(__name__) + + +def mount_shares(cluster): + """Mounts all shares specified for the cluster and any of its node groups. + + - In the event that a specific share is configured for both the cluster and + a specific node group, configuration at the node group level will be + ignored. + - In the event that utilities required to mount the share are not + already installed on the node, this method may fail if the node cannot + access the internet. + - This method will not remove already-mounted shares. + - This method will not remove or remount (or currently, reconfigure) shares + already mounted to the desired local mount point. + + :param cluster: The cluster model. + """ + + node_groups = (ng for ng in cluster.node_groups if ng.shares) + ng_mounts = [_mount(ng, share_config) for ng in node_groups + for share_config in ng.shares] + c_mounts = [_mount(ng, share_config) for ng in cluster.node_groups + for share_config in cluster.shares or []] + + if not (ng_mounts or c_mounts): + return + + ng_mounts_by_share_id = _group_mounts_by_share_id(ng_mounts) + c_mounts_by_share_id = _group_mounts_by_share_id(c_mounts) + + all_share_ids = (set(ng_mounts_by_share_id.keys()) | + set(c_mounts_by_share_id.keys())) + mounts_by_share_id = { + share_id: c_mounts_by_share_id.get(share_id) or + ng_mounts_by_share_id[share_id] for share_id in all_share_ids} + + all_mounts = itertools.chain(*mounts_by_share_id.values()) + mounts_by_ng_id = _group_mounts_by_ng_id(all_mounts) + + client = manila.client() + handlers_by_share_id = {id: _ShareHandler.create_from_id(id, client) + for id in all_share_ids} + + for mounts in mounts_by_ng_id.values(): + node_group_shares = _NodeGroupShares(mounts[0].node_group) + for mount in mounts: + share_id = mount.share_config['id'] + node_group_shares.add_share(mount.share_config, + handlers_by_share_id[share_id]) + node_group_shares.mount_shares_to_node_group() + + +def unmount_shares(cluster, unmount_share_list): + """Unmounts all shares in unmount_share_list on the given cluster + + :param cluster: The cluster model. + :param unmount_share_list: list of shares to unmount + """ + client = manila.client() + unmount_share_ids = (set(s['id'] for s in unmount_share_list)) + handlers_by_share_id = {id: _ShareHandler.create_from_id(id, client) + for id in unmount_share_ids} + for share in unmount_share_list: + for ng in cluster.node_groups: + for instance in ng.instances: + handlers_by_share_id[share['id']].unmount_from_instance( + instance.remote(), share) + + +_mount = collections.namedtuple('Mount', ['node_group', 'share_config']) + + +def _group_mounts(mounts, grouper): + result = collections.defaultdict(list) + for mount in mounts: + result[grouper(mount)].append(mount) + return result + + +def _group_mounts_by_share_id(mounts): + return _group_mounts(mounts, lambda mount: mount.share_config['id']) + + +def _group_mounts_by_ng_id(mounts): + return _group_mounts(mounts, lambda mount: mount.node_group['id']) + + +class _NodeGroupShares(object): + """Organizes share mounting for a single node group.""" + + _share = collections.namedtuple('Share', ['share_config', 'handler']) + + def __init__(self, node_group): + self.node_group = node_group + self.shares = [] + + def add_share(self, share_config, handler): + """Adds a share to mount; add all shares before mounting.""" + self.shares.append(self._share(share_config, handler)) + + def mount_shares_to_node_group(self): + """Mounts all configured shares to the node group.""" + for instance in self.node_group.instances: + with context.set_current_instance_id(instance.instance_id): + self._mount_shares_to_instance(instance) + + def _mount_shares_to_instance(self, instance): + # Note: Additional iteration here is critical: based on + # experimentation, failure to execute allow_access before spawning + # the remote results in permission failure. + for share in self.shares: + share.handler.allow_access_to_instance(instance, + share.share_config) + with instance.remote() as remote: + share_types = set(type(share.handler) for share in self.shares) + for share_type in share_types: + share_type.setup_instance(remote) + for share in self.shares: + share.handler.mount_to_instance(remote, share.share_config) + + +@six.add_metaclass(abc.ABCMeta) +class _ShareHandler(object): + """Handles mounting of a single share to any number of instances.""" + + @classmethod + def setup_instance(cls, remote): + """Prepares an instance to mount this type of share.""" + pass + + @classmethod + def create_from_id(cls, share_id, client): + """Factory method for creation from a share_id of unknown type.""" + share = manila.get_share(client, share_id, + raise_on_error=True) + mounter_class = _share_types[share.share_proto] + return mounter_class(share, client) + + def __init__(self, share, client): + self.share = share + self.client = client + + def allow_access_to_instance(self, instance, share_config): + """Mounts a specific share to a specific instance.""" + access_level = self._get_access_level(share_config) + accesses = list(filter(lambda x: (x.access_type == 'ip' and + x.access_to == instance.internal_ip), + self.share.access_list())) + if accesses: + access = accesses[0] + if access.access_level not in ('ro', 'rw'): + LOG.warning( + _LW("Unknown permission level {access_level} on share " + "id {share_id} for ip {ip}. Leaving pre-existing " + "permissions.").format( + access_level=access.access_level, + share_id=self.share.id, + ip=instance.internal_ip)) + elif access.access_level == 'ro' and access_level == 'rw': + self.share.deny(access.id) + self.share.allow('ip', instance.internal_ip, access_level) + else: + self.share.allow('ip', instance.internal_ip, access_level) + + @abc.abstractmethod + def mount_to_instance(self, remote, share_info): + """Mounts the share to the instance as configured.""" + pass + + @abc.abstractmethod + def unmount_from_instance(self, remote, share_info): + """Unmounts the share from the instance.""" + pass + + def _get_access_level(self, share_config): + return share_config.get('access_level', 'rw') + + def _default_mount(self): + return '/mnt/{0}'.format(self.share.id) + + def _get_path(self, share_info): + return share_info.get('path', self._default_mount()) + + +class _NFSMounter(_ShareHandler): + """Handles mounting of a single NFS share to any number of instances.""" + + _DEBIAN_INSTALL = "dpkg -s nfs-common || apt-get -y install nfs-common" + _REDHAT_INSTALL = "rpm -q nfs-utils || yum install -y nfs-utils" + + _NFS_CHECKS = { + "centos": _REDHAT_INSTALL, + "fedora": _REDHAT_INSTALL, + "redhatenterpriseserver": _REDHAT_INSTALL, + "ubuntu": _DEBIAN_INSTALL + } + + _MKDIR_COMMAND = 'mkdir -p %s' + _MOUNT_COMMAND = ("mount | grep '%(remote)s' | grep '%(local)s' | " + "grep nfs || mount -t nfs %(access_arg)s %(remote)s " + "%(local)s") + _UNMOUNT_COMMAND = ("umount -f %s ") + _RMDIR_COMMAND = 'rmdir %s' + + @classmethod + def setup_instance(cls, remote): + """Prepares an instance to mount this type of share.""" + distro = remote.get_os_distrib() + if distro in cls._NFS_CHECKS: + command = cls._NFS_CHECKS[distro] + remote.execute_command(command, run_as_root=True) + else: + LOG.warning( + _LW("Cannot verify installation of NFS mount tools for " + "unknown distro {distro}.").format(distro=distro)) + + def mount_to_instance(self, remote, share_info): + """Mounts the share to the instance as configured.""" + local_path = self._get_path(share_info) + access_level = self._get_access_level(share_info) + access_arg = '-w' if access_level == 'rw' else '-r' + + remote.execute_command(self._MKDIR_COMMAND % local_path, + run_as_root=True) + mount_command = self._MOUNT_COMMAND % { + "remote": self.share.export_location, + "local": local_path, + "access_arg": access_arg} + remote.execute_command(mount_command, run_as_root=True) + + def unmount_from_instance(self, remote, share_info): + """Unmounts the share from the instance.""" + local_path = self._get_path(share_info) + + unmount_command = self._UNMOUNT_COMMAND % local_path + rmdir_command = self._RMDIR_COMMAND % local_path + remote.execute_command(unmount_command, run_as_root=True) + remote.execute_command(rmdir_command, run_as_root=True) + + +_share_types = {"NFS": _NFSMounter} +SUPPORTED_SHARE_TYPES = _share_types.keys() + + +def make_share_path(mount_point, path): + return "{0}{1}".format(mount_point, path) + + +def default_mount(share_id): + client = manila.client() + return _ShareHandler.create_from_id(share_id, client)._default_mount() + + +def get_share_path(url, shares): + # url example: 'manila://ManilaShare-uuid/path_to_file' + url = six.moves.urllib.parse.urlparse(url) + # using list() as a python2/3 workaround + share_list = list(filter(lambda s: s['id'] == url.netloc, shares)) + if not share_list: + # Share id is not in the share list, let the caller + # determine a default path if possible + path = None + else: + # We will always select the first one. Let the + # caller determine whether duplicates are okay + mount_point = share_list[0].get('path', None) + + # Do this in two steps instead of passing the default + # expression to get(), because it's a big side effect + if mount_point is None: + # The situation here is that the user specified a + # share without a path, so the default mnt was used + # during cluster provisioning. + mount_point = default_mount(share_list[0]['id']) + path = make_share_path(mount_point, url.path) + return path diff --git a/sahara/tests/unit/service/edp/data_sources/__init__.py b/sahara/tests/unit/service/edp/data_sources/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/tests/unit/service/edp/data_sources/base_test.py b/sahara/tests/unit/service/edp/data_sources/base_test.py new file mode 100644 index 0000000000..e75f3baa8e --- /dev/null +++ b/sahara/tests/unit/service/edp/data_sources/base_test.py @@ -0,0 +1,70 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from oslo_utils import uuidutils + +from sahara.service.edp.data_sources.base import DataSourceType + +import testtools + + +class DataSourceBaseTestCase(testtools.TestCase): + def setUp(self): + super(DataSourceBaseTestCase, self).setUp() + self.ds_base = DataSourceType() + + def test_construct_url_no_placeholders(self): + base_url = "swift://container/input" + job_exec_id = uuidutils.generate_uuid() + + url = self.ds_base.construct_url(base_url, job_exec_id) + + self.assertEqual(base_url, url) + + def test_construct_url_job_exec_id_placeholder(self): + base_url = "swift://container/input.%JOB_EXEC_ID%.out" + job_exec_id = uuidutils.generate_uuid() + + url = self.ds_base.construct_url(base_url, job_exec_id) + + self.assertEqual( + "swift://container/input." + job_exec_id + ".out", url) + + def test_construct_url_randstr_placeholder(self): + base_url = "swift://container/input.%RANDSTR(4)%.%RANDSTR(7)%.out" + job_exec_id = uuidutils.generate_uuid() + + url = self.ds_base.construct_url(base_url, job_exec_id) + + self.assertRegex( + url, "swift://container/input\.[a-z]{4}\.[a-z]{7}\.out") + + def test_construct_url_randstr_and_job_exec_id_placeholder(self): + base_url = "swift://container/input.%JOB_EXEC_ID%.%RANDSTR(7)%.out" + job_exec_id = uuidutils.generate_uuid() + + url = self.ds_base.construct_url(base_url, job_exec_id) + + self.assertRegex( + url, "swift://container/input." + job_exec_id + "\.[a-z]{7}\.out") + + def test_get_urls(self): + url = 'test://url' + cluster = mock.Mock() + job_exec_id = 'test_id' + + self.assertEqual((url, url), self.ds_base.get_urls(url, + cluster, job_exec_id)) diff --git a/sahara/tests/unit/service/edp/data_sources/data_source_manager_support_test.py b/sahara/tests/unit/service/edp/data_sources/data_source_manager_support_test.py new file mode 100644 index 0000000000..b6bf8c3a38 --- /dev/null +++ b/sahara/tests/unit/service/edp/data_sources/data_source_manager_support_test.py @@ -0,0 +1,76 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import testtools + +import sahara.exceptions as ex +from sahara.service.edp.data_sources import manager as ds_manager +from sahara.tests.unit import base + + +class DataSourceManagerSupportTest(base.SaharaTestCase): + + def setUp(self): + super(DataSourceManagerSupportTest, self).setUp() + ds_manager.setup_data_sources() + + def test_data_sources_loaded(self): + ds_types = [ds.name for ds in + ds_manager.DATA_SOURCES.get_data_sources()] + + self.assertIn('hdfs', ds_types) + self.assertIn('manila', ds_types) + self.assertIn('maprfs', ds_types) + self.assertIn('swift', ds_types) + + def test_get_data_source_by_url(self): + + with testtools.ExpectedException(ex.InvalidDataException): + ds_manager.DATA_SOURCES.get_data_source_by_url('') + + with testtools.ExpectedException(ex.InvalidDataException): + ds_manager.DATA_SOURCES.get_data_source_by_url('hdfs') + + self.assertEqual('hdfs', ds_manager.DATA_SOURCES + .get_data_source_by_url('hdfs://').name) + + self.assertEqual('manila', ds_manager.DATA_SOURCES + .get_data_source_by_url('manila://').name) + + self.assertEqual('maprfs', ds_manager.DATA_SOURCES + .get_data_source_by_url('maprfs://').name) + + self.assertEqual('swift', ds_manager.DATA_SOURCES + .get_data_source_by_url('swift://').name) + + def test_get_data_source(self): + + with testtools.ExpectedException(ex.InvalidDataException): + ds_manager.DATA_SOURCES.get_data_source('') + + with testtools.ExpectedException(ex.InvalidDataException): + ds_manager.DATA_SOURCES.get_data_source('hdf') + + self.assertEqual('hdfs', ds_manager.DATA_SOURCES + .get_data_source('hdfs').name) + + self.assertEqual('manila', ds_manager.DATA_SOURCES + .get_data_source('manila').name) + + self.assertEqual('maprfs', ds_manager.DATA_SOURCES + .get_data_source('maprfs').name) + + self.assertEqual('swift', ds_manager.DATA_SOURCES + .get_data_source('swift').name) diff --git a/sahara/tests/unit/service/edp/data_sources/hdfs/__init__.py b/sahara/tests/unit/service/edp/data_sources/hdfs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/tests/unit/service/edp/data_sources/hdfs/test_hdfs_type.py b/sahara/tests/unit/service/edp/data_sources/hdfs/test_hdfs_type.py new file mode 100644 index 0000000000..0507b6a885 --- /dev/null +++ b/sahara/tests/unit/service/edp/data_sources/hdfs/test_hdfs_type.py @@ -0,0 +1,77 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import testtools + +import sahara.exceptions as ex +from sahara.service.edp.data_sources.hdfs.implementation import HDFSType +from sahara.tests.unit import base + + +class TestHDFSType(base.SaharaTestCase): + def setUp(self): + super(TestHDFSType, self).setUp() + self.hdfs_type = HDFSType() + + def test_hdfs_type_validation_wrong_schema(self): + data = { + "name": "test_data_data_source", + "url": "hdf://test_cluster/", + "type": "hdfs", + "description": "incorrect url schema" + } + with testtools.ExpectedException(ex.InvalidDataException): + self.hdfs_type.validate(data) + + def test_hdfs_type_validation_correct_url(self): + data = { + "name": "test_data_data_source", + "url": "hdfs://test_cluster/", + "type": "hdfs", + "description": "correct url schema" + } + self.hdfs_type.validate(data) + + def test_hdfs_type_validation_local_rel_url(self): + data = { + "name": "test_data_data_source", + "url": "mydata/input", + "type": "hdfs", + "description": "correct url schema for relative path on local hdfs" + } + self.hdfs_type.validate(data) + + def test_hdfs_type_validation_local_abs_url(self): + data = { + "name": "test_data_data_source", + "url": "/tmp/output", + "type": "hdfs", + "description": "correct url schema for absolute path on local hdfs" + } + self.hdfs_type.validate(data) + + @mock.patch('sahara.service.edp.data_sources.hdfs.implementation.h') + def test_prepare_cluster(self, mock_h): + cluster = mock.Mock() + data_source = mock.Mock() + runtime_url = "runtime_url" + mock_h.configure_cluster_for_hdfs = mock.Mock() + + self.hdfs_type.prepare_cluster(data_source, cluster, + runtime_url=runtime_url) + + mock_h.configure_cluster_for_hdfs.assert_called_once_with(cluster, + runtime_url) diff --git a/sahara/tests/unit/service/edp/data_sources/manila/__init__.py b/sahara/tests/unit/service/edp/data_sources/manila/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/tests/unit/service/edp/data_sources/manila/test_manila_type.py b/sahara/tests/unit/service/edp/data_sources/manila/test_manila_type.py new file mode 100644 index 0000000000..bc9f26bb13 --- /dev/null +++ b/sahara/tests/unit/service/edp/data_sources/manila/test_manila_type.py @@ -0,0 +1,142 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from oslo_utils import uuidutils +import testtools + +import sahara.exceptions as ex +from sahara.service.edp.data_sources.manila.implementation import ManilaType +from sahara.tests.unit import base + + +class _FakeShare(object): + def __init__(self, id, share_proto='NFS'): + self.id = id + self.share_proto = share_proto + + +class TestManilaType(base.SaharaTestCase): + def setUp(self): + super(TestManilaType, self).setUp() + self.manila_type = ManilaType() + + @mock.patch('sahara.utils.openstack.manila.client') + @mock.patch('sahara.conductor.API.cluster_update') + @mock.patch('sahara.service.shares.mount_shares') + def test_prepare_cluster(self, mount_shares, cluster_update, + f_manilaclient): + + cluster_shares = [ + {'id': 'the_share_id', + 'path': '/mnt/mymountpoint'} + ] + + cluster = mock.Mock() + cluster.shares = cluster_shares + + # This should return a default path, and should cause + # a mount at the default location + share = _FakeShare("missing_id") + f_manilaclient.return_value = mock.Mock(shares=mock.Mock( + get=mock.Mock(return_value=share))) + + url = 'manila://missing_id/the_path' + self.manila_type._prepare_cluster(url, cluster) + + self.assertEqual(1, mount_shares.call_count) + self.assertEqual(1, cluster_update.call_count) + + @mock.patch('sahara.service.shares.get_share_path') + @mock.patch('sahara.utils.openstack.manila.client') + @mock.patch('sahara.conductor.API.cluster_update') + @mock.patch('sahara.service.shares.mount_shares') + def test_get_runtime_url(self, mount_shares, cluster_update, + f_manilaclient, get_share_path): + + # first it finds the path, then it doesn't so it has to mount it + # and only then it finds it + get_share_path.side_effect = ['/mnt/mymountpoint/the_path', None, + '/mnt/missing_id/the_path'] + + cluster = mock.Mock() + cluster.shares = [] + url = 'manila://the_share_id/the_path' + + res = self.manila_type.get_runtime_url(url, cluster) + self.assertEqual('file:///mnt/mymountpoint/the_path', res) + self.assertEqual(0, mount_shares.call_count) + self.assertEqual(0, cluster_update.call_count) + + # This should return a default path, and should cause + # a mount at the default location + share = _FakeShare("missing_id") + f_manilaclient.return_value = mock.Mock(shares=mock.Mock( + get=mock.Mock(return_value=share))) + + url = 'manila://missing_id/the_path' + res = self.manila_type.get_runtime_url(url, cluster) + self.assertEqual('file:///mnt/missing_id/the_path', res) + self.assertEqual(1, mount_shares.call_count) + self.assertEqual(1, cluster_update.call_count) + + def test_manila_type_validation_wrong_schema(self): + data = { + "name": "test_data_data_source", + "url": "man://%s" % uuidutils.generate_uuid(), + "type": "manila", + "description": ("incorrect url schema for") + } + with testtools.ExpectedException(ex.InvalidDataException): + self.manila_type.validate(data) + + def test_manila_type_validation_empty_url(self): + data = { + "name": "test_data_data_source", + "url": "", + "type": "manila", + "description": ("empty url") + } + with testtools.ExpectedException(ex.InvalidDataException): + self.manila_type.validate(data) + + def test_manila_type_validation_no_uuid(self): + data = { + "name": "test_data_data_source", + "url": "manila://bob", + "type": "manila", + "description": ("netloc is not a uuid") + } + with testtools.ExpectedException(ex.InvalidDataException): + self.manila_type.validate(data) + + def test_manila_type_validation_no_path(self): + data = { + "name": "test_data_data_source", + "url": "manila://%s" % uuidutils.generate_uuid(), + "type": "manila", + "description": ("netloc is not a uuid") + } + with testtools.ExpectedException(ex.InvalidDataException): + self.manila_type.validate(data) + + def test_manila_type_validation_correct(self): + data = { + "name": "test_data_data_source", + "url": "manila://%s/foo" % uuidutils.generate_uuid(), + "type": "manila", + "description": ("correct url") + } + self.manila_type.validate(data) diff --git a/sahara/tests/unit/service/edp/data_sources/maprfs/__init__.py b/sahara/tests/unit/service/edp/data_sources/maprfs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/tests/unit/service/edp/data_sources/maprfs/test_maprfs_type_validation.py b/sahara/tests/unit/service/edp/data_sources/maprfs/test_maprfs_type_validation.py new file mode 100644 index 0000000000..32b72d776a --- /dev/null +++ b/sahara/tests/unit/service/edp/data_sources/maprfs/test_maprfs_type_validation.py @@ -0,0 +1,65 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import testtools + +import sahara.exceptions as ex +from sahara.service.edp.data_sources.maprfs.implementation import MapRFSType +from sahara.tests.unit import base + + +class TestMapRFSTypeValidation(base.SaharaTestCase): + def setUp(self): + super(TestMapRFSTypeValidation, self).setUp() + self.maprfs_type = MapRFSType() + + def test_maprfs_type_validation_wrong_schema(self): + data = { + "name": "test_data_data_source", + "url": "maprf://test_cluster/", + "type": "maprfs", + "description": "incorrect url schema" + } + with testtools.ExpectedException(ex.InvalidDataException): + self.maprfs_type.validate(data) + + def test_maprfs_type_validation_correct_url(self): + data = { + "name": "test_data_data_source", + "url": "maprfs:///test_cluster/", + "type": "maprfs", + "description": "correct url schema" + } + self.maprfs_type.validate(data) + + def test_maprfs_type_validation_local_rel_url(self): + data = { + "name": "test_data_data_source", + "url": "mydata/input", + "type": "maprfs", + "description": ("correct url schema for" + " relative path on local maprfs") + } + self.maprfs_type.validate(data) + + def test_maprfs_type_validation_local_abs_url(self): + data = { + "name": "test_data_data_source", + "url": "/tmp/output", + "type": "maprfs", + "description": ("correct url schema for" + " absolute path on local maprfs") + } + self.maprfs_type.validate(data) diff --git a/sahara/tests/unit/service/edp/data_sources/swift/__init__.py b/sahara/tests/unit/service/edp/data_sources/swift/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/tests/unit/service/edp/data_sources/swift/test_swift_type.py b/sahara/tests/unit/service/edp/data_sources/swift/test_swift_type.py new file mode 100644 index 0000000000..c09b1a4747 --- /dev/null +++ b/sahara/tests/unit/service/edp/data_sources/swift/test_swift_type.py @@ -0,0 +1,217 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import mock +from oslo_utils import uuidutils +import testtools + +import sahara.exceptions as ex +from sahara.service.edp.data_sources.swift.implementation import SwiftType +from sahara.service.edp import job_utils +from sahara.swift import utils as su +from sahara.tests.unit import base +from sahara.tests.unit.service.edp import edp_test_utils as u +from sahara.utils.types import FrozenDict + +SAMPLE_SWIFT_URL = "swift://1234/object" +SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX + + +class TestSwiftTypeValidation(base.SaharaTestCase): + def setUp(self): + super(TestSwiftTypeValidation, self).setUp() + self.s_type = SwiftType() + + @mock.patch('sahara.context.ctx') + def test_prepare_cluster(self, ctx): + + ctx.return_value = 'dummy' + + ds_url = "swift://container/input" + ds = u.create_data_source(ds_url, + name="data_source", + id=uuidutils.generate_uuid()) + + job_configs = { + 'configs': { + job_utils.DATA_SOURCE_SUBST_NAME: True, + job_utils.DATA_SOURCE_SUBST_UUID: True + } + } + + old_configs = copy.deepcopy(job_configs) + + self.s_type.prepare_cluster(ds, u.create_cluster(), + job_configs=job_configs) + # Swift configs should be filled in since they were blank + self.assertEqual(ds.credentials['user'], + job_configs['configs'] + ['fs.swift.service.sahara.username']) + self.assertEqual(ds.credentials['password'], + job_configs['configs'] + ['fs.swift.service.sahara.password']) + + self.assertNotEqual(old_configs, job_configs) + + job_configs['configs'] = {'fs.swift.service.sahara.username': 'sam', + 'fs.swift.service.sahara.password': 'gamgee', + job_utils.DATA_SOURCE_SUBST_NAME: False, + job_utils.DATA_SOURCE_SUBST_UUID: True} + old_configs = copy.deepcopy(job_configs) + + self.s_type.prepare_cluster(ds, u.create_cluster(), + job_configs=job_configs) + # Swift configs should not be overwritten + self.assertEqual(old_configs['configs'], job_configs['configs']) + + job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: True, + job_utils.DATA_SOURCE_SUBST_UUID: False} + job_configs['proxy_configs'] = {'proxy_username': 'john', + 'proxy_password': 'smith', + 'proxy_trust_id': 'trustme'} + old_configs = copy.deepcopy(job_configs) + self.s_type.prepare_cluster(ds, u.create_cluster(), + job_configs=job_configs) + + # Swift configs should be empty and proxy configs should be preserved + self.assertEqual(old_configs['configs'], job_configs['configs']) + self.assertEqual(old_configs['proxy_configs'], + job_configs['proxy_configs']) + + # If there's no configs do nothing + job_configs['configs'] = None + old_configs = copy.deepcopy(job_configs) + + self.s_type.prepare_cluster(ds, u.create_cluster(), + job_configs=job_configs) + self.assertEqual(old_configs, job_configs) + + # If it's a FrozenDict do nothing + job_configs = { + 'configs': { + job_utils.DATA_SOURCE_SUBST_NAME: True, + job_utils.DATA_SOURCE_SUBST_UUID: True + } + } + + old_configs = copy.deepcopy(job_configs) + job_configs = FrozenDict(job_configs) + + self.s_type.prepare_cluster(ds, u.create_cluster(), + job_configs=job_configs) + self.assertEqual(old_configs, job_configs) + + def test_swift_type_validation(self): + data = { + "name": "test_data_data_source", + "url": SAMPLE_SWIFT_URL, + "type": "swift", + "credentials": { + "user": "user", + "password": "password" + }, + "description": "long description" + } + self.s_type.validate(data) + + def test_swift_type_validation_missing_credentials(self): + data = { + "name": "test_data_data_source", + "url": SAMPLE_SWIFT_URL, + "type": "swift", + "description": "long description" + } + with testtools.ExpectedException(ex.InvalidCredentials): + self.s_type.validate(data) + # proxy enabled should allow creation without credentials + self.override_config('use_domain_for_proxy_users', True) + self.s_type.validate(data) + + def test_swift_type_validation_credentials_missing_user(self): + data = { + "name": "test_data_data_source", + "url": SAMPLE_SWIFT_URL, + "type": "swift", + "credentials": { + "password": "password" + }, + "description": "long description" + } + with testtools.ExpectedException(ex.InvalidCredentials): + self.s_type.validate(data) + # proxy enabled should allow creation without credentials + self.override_config('use_domain_for_proxy_users', True) + self.s_type.validate(data) + + def test_swift_type_validation_credentials_missing_password(self): + data = { + "name": "test_data_data_source", + "url": SAMPLE_SWIFT_URL, + "type": "swift", + "credentials": { + "user": "user", + }, + "description": "long description" + } + with testtools.ExpectedException(ex.InvalidCredentials): + self.s_type.validate(data) + # proxy enabled should allow creation without credentials + self.override_config('use_domain_for_proxy_users', True) + self.s_type.validate(data) + + def test_swift_type_validation_wrong_schema(self): + data = { + "name": "test_data_data_source", + "url": "swif://1234/object", + "type": "swift", + "description": "incorrect url schema" + } + with testtools.ExpectedException(ex.InvalidDataException): + self.s_type.validate(data) + + def test_swift_type_validation_explicit_suffix(self): + data = { + "name": "test_data_data_source", + "url": SAMPLE_SWIFT_URL_WITH_SUFFIX, + "type": "swift", + "description": "incorrect url schema", + "credentials": { + "user": "user", + "password": "password" + } + } + self.s_type.validate(data) + + def test_swift_type_validation_wrong_suffix(self): + data = { + "name": "test_data_data_source", + "url": "swift://1234.suffix/object", + "type": "swift", + "description": "incorrect url schema" + } + with testtools.ExpectedException(ex.InvalidDataException): + self.s_type.validate(data) + + def test_swift_type_validation_missing_object(self): + data = { + "name": "test_data_data_source", + "url": "swift://1234/", + "type": "swift", + "description": "incorrect url schema" + } + with testtools.ExpectedException(ex.InvalidDataException): + self.s_type.validate(data) diff --git a/setup.cfg b/setup.cfg index ecc5cb135f..40710945b9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -51,6 +51,12 @@ sahara.cluster.plugins = spark = sahara.plugins.spark.plugin:SparkProvider storm = sahara.plugins.storm.plugin:StormProvider +sahara.data_source.types = + hdfs = sahara.service.edp.data_sources.hdfs.implementation:HDFSType + manila = sahara.service.edp.data_sources.manila.implementation:ManilaType + maprfs = sahara.service.edp.data_sources.maprfs.implementation:MapRFSType + swift = sahara.service.edp.data_sources.swift.implementation:SwiftType + sahara.infrastructure.engine = heat = sahara.service.heat.heat_engine:HeatEngine