diff --git a/setup.cfg b/setup.cfg index ddb72148..3837fcdd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,8 +36,8 @@ paste.filter_factory = storlet_handler = storlets.swift_middleware.storlet_handler:filter_factory storlets.gateways = - stub = storlets.gateway.gateways.stub:StorletGatewayStub - docker = storlets.gateway.gateways.docker:StorletGatewayDocker + stub = storlets.gateway.gateways.stub:StubStorletGateway + docker = storlets.gateway.gateways.docker:DockerStorletGateway console_scripts = sbus = storlets.sbus.cli:main diff --git a/storlets/gateway/gateways/container/__init__.py b/storlets/gateway/gateways/container/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/storlets/gateway/gateways/container/gateway.py b/storlets/gateway/gateways/container/gateway.py new file mode 100644 index 00000000..8fb99a45 --- /dev/null +++ b/storlets/gateway/gateways/container/gateway.py @@ -0,0 +1,372 @@ +# Copyright IBM Corp. 2015, 2015 All Rights Reserved +# Copyright (c) 2010-2016 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil + +from storlets.gateway.common.stob import StorletRequest +from storlets.gateway.gateways.base import StorletGatewayBase +from storlets.gateway.gateways.container.runtime import RunTimePaths, \ + StorletInvocationProtocol + + +class ContainerStorletRequest(StorletRequest): + """ + The ContainerStorletRequest class represents a request to be processed by + the storlet the request is derived from the Swift request and + essentially consists of: + 1. A data stream to be processed + 2. Metadata identifying the stream + """ + + # TODO(takashi): Some of following parameters should be defined common + # parameters for StorletRequest + required_options = ['storlet_main', 'storlet_language', 'file_manager'] + + def __init__(self, storlet_id, params, user_metadata, data_iter=None, + data_fd=None, options=None): + """ + :param storlet_id: storlet id + :param params: execution parameters + :param user_metadata: user metadata + :param data_iter: an iterator to read data + :param data_fd: a file descriptor to read data + :param options: a dictionaly which stores gateway specific options. + :raises ValueError: when some of the required options (storlet_main + and file_manager) are missing + """ + super(ContainerStorletRequest, self).__init__( + storlet_id, params, user_metadata, data_iter, data_fd, + options=options) + + self.generate_log = self.options.get('generate_log', False) + + self.storlet_main = self.options['storlet_main'] + self.storlet_language = self.options['storlet_language'] + self.storlet_language_version = \ + self.options.get('storlet_language_version') + + if self.options.get('storlet_dependency'): + self.dependencies = [ + x.strip() for x + in self.options['storlet_dependency'].split(',') + if x.strip()] + else: + self.dependencies = [] + + self.file_manager = self.options['file_manager'] + + self.start = self.options.get('range_start') + self.end = self.options.get('range_end') + + @property + def has_range(self): + """ + Whether the input range is given + """ + return self.start is not None and self.end is not None + + +class StorletGatewayContainer(StorletGatewayBase): + + request_class = ContainerStorletRequest + sandbox = None + + def __init__(self, conf, logger, scope): + """ + :param conf: a dict for gateway conf + :param logger: a logger instance + :param scope: scope name to identify the container + """ + super(StorletGatewayContainer, self).__init__(conf, logger, scope) + self.storlet_timeout = float(self.conf.get('storlet_timeout', 40)) + self.paths = RunTimePaths(scope, conf) + + @classmethod + def validate_storlet_registration(cls, params, name): + """ + Validate required parameters for storlet file + + :param params: parameters related to the storlet file + :param name: name of the storlet file + :raises ValueError: if some of the required parameters are missing, + or some of the parameters are invalid + """ + mandatory = ['Language', 'Interface-Version', 'Object-Metadata', + 'Main'] + cls._check_mandatory_params(params, mandatory) + + if params['Language'].lower() == 'java': + if '-' not in name or '.' not in name: + raise ValueError('Storlet name is incorrect') + elif params['Language'].lower() == 'python': + try: + version = float(params.get('Language-Version', 3)) + except ValueError: + raise ValueError('Language-Version is invalid') + + if int(version) != 3: + # TODO(kota_): more strict version check should be nice. + raise ValueError('Not supported version specified') + + if name.endswith('.py'): + cls_name = params['Main'] + if not cls_name.startswith(name[:-3] + '.'): + raise ValueError('Main class should be included in ' + 'storlet file') + + if len(cls_name.split('.')) != 2: + raise ValueError('Submodule is currently not supported') + # TODO(takashi): Add support for sdist tar.gz + else: + raise ValueError('Storlet name is incorrect') + else: + raise ValueError('Unsupported Language') + + dep = params.get('Dependency') + if dep: + deps = dep.split(',') + if name in deps: + raise ValueError('Using the same name for storlet and ' + 'dependency is not allowed') + if len(deps) != len(set(deps)): + raise ValueError('Duplicated name in dependencies') + + @classmethod + def validate_dependency_registration(cls, params, name): + """ + Validate required parameters for dependency file + + :param params: parameters related to the dependency file + :param name: name of the dependency file + :raises ValueError: if some of the required parameters are missing, + or some of the parameters are invalid + """ + mandatory = ['Dependency-Version'] + cls._check_mandatory_params(params, mandatory) + + perm = params.get('Dependency-Permissions') + if perm is not None: + try: + perm_int = int(perm, 8) + except ValueError: + raise ValueError('Dependency permission is incorrect') + if (perm_int & int('600', 8)) != int('600', 8): + raise ValueError('The owner should have rw permission') + + @classmethod + def _check_mandatory_params(cls, params, mandatory): + """ + Ensure that we have all mandatory parameters in the given parameters + + :param params: file parameters + :param mandatory: required parameters + :raises ValueError: if some of the required parameters are missing + """ + for md in mandatory: + if md not in params: + raise ValueError('Mandatory parameter is missing' + ': {0}'.format(md)) + + def invocation_flow(self, sreq, extra_sources=None): + """ + Invoke the backend protocol with gateway + + :param sreq: StorletRequest instance + :param extra_sources (WIP): A list of StorletRequest instance to gather + as extra resoureces to feed to storlet + container as data source + :return: StorletResponse instance + """ + run_time_sbox = self.sandbox(self.scope, self.conf, self.logger) + container_updated = self.update_container_from_cache(sreq) + run_time_sbox.activate_storlet_daemon(sreq, container_updated) + self._add_system_params(sreq) + + slog_path = self.paths.get_host_slog_path(sreq.storlet_main) + storlet_pipe_path = \ + self.paths.get_host_storlet_pipe(sreq.storlet_main) + + sprotocol = StorletInvocationProtocol(sreq, + storlet_pipe_path, + slog_path, + self.storlet_timeout, + self.logger, + extra_sources=extra_sources) + + sresp = sprotocol.communicate() + + self._upload_storlet_logs(slog_path, sreq) + + return sresp + + def _add_system_params(self, sreq): + """ + Adds Storlet engine specific parameters to the invocation + + currently, this consists only of the execution path of the + Storlet within the container. + + :params params: Request parameters + """ + sreq.params['storlet_execution_path'] = self. \ + paths.get_sbox_storlet_dir(sreq.storlet_main) + + def _upload_storlet_logs(self, slog_path, sreq): + """ + Upload storlet execution log as a swift object + + :param slog_path: target path + :params sreq: ContainerStorletRequest instance + """ + if sreq.generate_log: + with open(slog_path, 'rb') as logfile: + storlet_name = sreq.storlet_id.split('-')[0] + log_obj_name = '%s.log' % storlet_name + sreq.file_manager.put_log(log_obj_name, logfile) + + def bring_from_cache(self, obj_name, sreq, is_storlet): + """ + Auxiliary function that: + + (1) Brings from Swift obj_name, either this is in a + storlet or a storlet dependency. + (2) Copies from local cache into the conrainer + If this is a Storlet then also validates that the cache is updated + with most recent copy of the Storlet compared to the copy residing in + Swift. + + :params obj_name: name of the object + :params sreq: ContainerStorletRequest instance + :params is_storlet: True if the object is a storlet object + False if the object is a dependency object + :returns: Whether the container was updated with obj_name + """ + # Determine the cache we are to work with + # e.g. dependency or storlet + if is_storlet: + cache_dir = self.paths.host_storlet_cache_dir + get_func = sreq.file_manager.get_storlet + else: + cache_dir = self.paths.host_dependency_cache_dir + get_func = sreq.file_manager.get_dependency + + if not os.path.exists(cache_dir): + os.makedirs(cache_dir, 0o700) + + # cache_target_path is the actual object we need to deal with + # e.g. a concrete storlet or dependency we need to bring/update + cache_target_path = os.path.join(cache_dir, obj_name) + + # Determine if we need to update the cache for cache_target_path + # We default for no + update_cache = False + + # If it does not exist in cache, we obviously need to bring + if not os.path.isfile(cache_target_path): + update_cache = True + elif is_storlet: + # The cache_target_path exists, we test if it is up-to-date + # with the metadata we got. + # We mention that this is currently applicable for storlets + # only, and not for dependencies. + # This will change when we will head dependencies as well + fstat = os.stat(cache_target_path) + storlet_or_size = int( + sreq.options['storlet_content_length'].rstrip("L")) + storlet_or_time = float(sreq.options['storlet_x_timestamp']) + b_storlet_size_changed = fstat.st_size != storlet_or_size + b_storlet_file_updated = float(fstat.st_mtime) < storlet_or_time + if b_storlet_size_changed or b_storlet_file_updated: + update_cache = True + + if update_cache: + # If the cache needs to be updated, then get on with it + # bring the object from storage + data_iter, perm = get_func(obj_name) + + if perm: + perm = int(perm, 8) & 0o700 + else: + perm = 0o600 + + # TODO(takashi): Do not directly write to target path + with open(cache_target_path, 'wb') as fn: + os.chmod(cache_target_path, perm) + for data in data_iter: + fn.write(data) + + # The node's local cache is now updated. + # We now verify if we need to update the + # container itself. + # The container needs to be updated if: + # 1. The container does not hold a copy of the object + # 2. The container holds an older version of the object + update_container = False + container_storlet_path = \ + self.paths.get_host_storlet_dir(sreq.storlet_main) + container_target_path = os.path.join(container_storlet_path, obj_name) + + if not os.path.exists(container_storlet_path): + os.makedirs(container_storlet_path, 0o700) + update_container = True + elif not os.path.isfile(container_target_path): + update_container = True + else: + fstat_cached = os.stat(cache_target_path) + fstat_container = os.stat(container_target_path) + if fstat_cached.st_size != fstat_container.st_size: + update_container = True + if fstat_cached.st_mtime < fstat_container.st_mtime: + update_container = True + + if update_container: + # need to copy from cache to container + # copy2 also copies the permissions + shutil.copy2(cache_target_path, container_target_path) + + return update_container + + def update_container_from_cache(self, sreq): + """ + Iterates over the storlet name and its dependencies appearing + + in the invocation data and make sure they are brought to the + local cache, and from there to the container. + Uses the bring_from_cache auxiliary function. + + :params sreq: ContainerStorletRequest instance + :returns: True if the container was updated + """ + # where at the host side, reside the storlet containers + storlet_path = self.paths.host_storlet_base_dir + if not os.path.exists(storlet_path): + os.makedirs(storlet_path, 0o755) + + # Iterate over storlet and dependencies, and make sure + # they are updated within the container. + # return True if any of them wea actually + # updated within the container + container_updated = False + + updated = self.bring_from_cache(sreq.storlet_id, sreq, True) + container_updated = container_updated or updated + + for dep in sreq.dependencies: + updated = self.bring_from_cache(dep, sreq, False) + container_updated = container_updated or updated + + return container_updated diff --git a/storlets/gateway/gateways/container/runtime.py b/storlets/gateway/gateways/container/runtime.py new file mode 100644 index 00000000..0b0cdb4f --- /dev/null +++ b/storlets/gateway/gateways/container/runtime.py @@ -0,0 +1,744 @@ +# Copyright (c) 2015, 2016 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import errno +import os +import select +import stat +import sys +import time + +import eventlet +import json +from contextlib import contextmanager + +from storlets.sbus.client import SBusClient +from storlets.sbus.client.exceptions import SBusClientException +from storlets.sbus.datagram import SBusFileDescriptor +from storlets.sbus import file_description as sbus_fd +from storlets.gateway.common.exceptions import StorletRuntimeException, \ + StorletTimeout +from storlets.gateway.common.logger import StorletLogger +from storlets.gateway.common.stob import StorletResponse + +MAX_METADATA_SIZE = 4096 + + +eventlet.monkey_patch() + + +class RunTimePaths(object): + """ + The Storlet Engine need to be access stuff located in many paths: + + 1. The various communication channels represented as pipes in the + filesystem + 2. Directories where to place Storlets + 3. Directories where to place logs + + Communication channels + ---------------------- + The RunTimeSandbox communicates with the Sandbox via two types of pipes + 1. factory pipe - defined per scope, used for communication with the + sandbox + for e.g. start/stop a storlet daemon + 2. Storlet pipe - defined per scope and Storlet, used for communication + with a storlet daemon, e.g. to call the invoke API + + Each pipe type has two paths: + 1. A path that is inside the sandbox + 2. A path that is outside of the sandbox or at the host side. As such + this path is prefixed by 'host_' + + Thus, we have the following 4 paths of interest: + 1. sandbox_factory_pipe_path + 2. host_factory_pipe_path + 3. sandbox_storlet_pipe_path + 4. host_storlet_pipe_path + + Our implementation uses the following path structure for the various pipes: + In the host, all pipes belonging to a given scope are prefixed by + /, where comes from the configuration + Thus: + host_factory_pipe_path is of the form //factory_pipe + host_storlet_pipe_path is of the form // + + In The sandbox side + sandbox_factory_pipe_path is of the form /mnt/channels/factory_pipe + sandbox_storlet_pipe_path is of the form /mnt/channels/ + + Storlets Locations + ------------------ + The Storlet binaries are accessible from the sandbox using a mounted + directory. + This directory is called the storlet directories. + On the host side it is of the form // + On the sandbox side it is of the form /home/swift/ + comes from the configuration + is the prefix of the jar. + + Logs + ---- + Logs are located in paths of the form: + //.log + """ + + def __init__(self, scope, conf): + """ + Construct RunTimePaths instance + + :param scope: scope name to be used as container name + :param conf: gateway conf + """ + self.scope = scope + self.factory_pipe_name = 'factory_pipe' + self.sandbox_pipe_dir = '/mnt/channels' + + self.sandbox_storlet_base_dir = '/home/swift' + self.host_root_dir = conf.get('host_root', '/var/lib/storlets') + self.host_pipe_root_dir = \ + conf.get('pipes_dir', + os.path.join(self.host_root_dir, 'pipes', 'scopes')) + self.host_storlet_root_dir = \ + conf.get('storlets_dir', + os.path.join(self.host_root_dir, 'storlets', 'scopes')) + self.host_log_root_dir = \ + conf.get('log_dir', + os.path.join(self.host_root_dir, 'logs', 'scopes')) + self.host_cache_root_dir = \ + conf.get('cache_dir', + os.path.join(self.host_root_dir, 'cache', 'scopes')) + self.host_restart_script_dir = \ + conf.get('script_dir', + os.path.join(self.host_root_dir, 'scripts')) + + self.host_storlet_native_lib_dir = '/usr/local/lib/storlets' + self.sandbox_storlet_native_lib_dir = '/usr/local/lib/storlets' + self.host_storlet_native_bin_dir = '/usr/local/libexec/storlets' + self.sandbox_storlet_native_bin_dir = '/usr/local/libexec/storlets' + + @property + def host_pipe_dir(self): + return os.path.join(self.host_pipe_root_dir, self.scope) + + def create_host_pipe_dir(self): + path = self.host_pipe_dir + if not os.path.exists(path): + os.makedirs(path) + # 0777 should be 0700 when we get user namespaces in container + os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + return path + + @property + def host_factory_pipe(self): + return os.path.join(self.host_pipe_dir, self.factory_pipe_name) + + @property + def sandbox_factory_pipe(self): + return os.path.join(self.sandbox_pipe_dir, self.factory_pipe_name) + + def get_host_storlet_pipe(self, storlet_id): + return os.path.join(self.host_pipe_dir, storlet_id) + + def get_sbox_storlet_pipe(self, storlet_id): + return os.path.join(self.sandbox_pipe_dir, storlet_id) + + def get_sbox_storlet_dir(self, storlet_id): + return os.path.join(self.sandbox_storlet_base_dir, storlet_id) + + @property + def host_storlet_base_dir(self): + return os.path.join(self.host_storlet_root_dir, self.scope) + + def get_host_storlet_dir(self, storlet_id): + return os.path.join(self.host_storlet_base_dir, storlet_id) + + def get_host_slog_path(self, storlet_id): + return os.path.join( + self.host_log_root_dir, self.scope, storlet_id, + 'storlet_invoke.log') + + @property + def host_storlet_cache_dir(self): + return os.path.join(self.host_cache_root_dir, self.scope, 'storlet') + + @property + def host_dependency_cache_dir(self): + return os.path.join(self.host_cache_root_dir, self.scope, 'dependency') + + +class RunTimeSandbox(object, metaclass=abc.ABCMeta): + """ + The RunTimeSandbox represents a reusable per scope sandbox. + + The sandbox is reusable in the sense that it can run several storlet + daemons. + + The following methods are supported: + ping - pings the sandbox for liveness + wait - wait for the sandbox to be ready for processing commands + restart - restart the sandbox + start_storlet_daemon - start a daemon for a given storlet + stop_storlet_daemon - stop a daemon of a given storlet + get_storlet_daemon_status - test if a given storlet daemon is running + """ + + def __init__(self, scope, conf, logger): + """ + :param scope: scope name to be used as container name + :param conf: gateway conf + :param logger: logger instance + """ + self.paths = RunTimePaths(scope, conf) + self.scope = scope + + self.sandbox_ping_interval = \ + float(conf.get('sandbox_ping_interval', 0.5)) + self.sandbox_stop_timeout = \ + float(conf.get('stop_linux_container_timeout', 1)) + self.sandbox_wait_timeout = \ + float(conf.get('restart_linux_container_timeout', 10)) + + self.container_image_namespace = \ + conf.get('docker_repo', conf.get('container_image_namespace')) + self.container_image_name_prefix = 'tenant' + + # TODO(add line in conf) + self.storlet_daemon_thread_pool_size = \ + int(conf.get('storlet_daemon_thread_pool_size', 5)) + self.storlet_daemon_factory_debug_level = \ + conf.get('storlet_daemon_factory_debug_level', 'DEBUG') + self.storlet_daemon_debug_level = \ + conf.get('storlet_daemon_debug_level', 'DEBUG') + + # TODO(change logger's route if possible) + self.logger = logger + + self.default_container_image_name = conf.get( + 'default_docker_image_name', + conf.get('default_container_image_name', 'storlet_engine_image') + ) + + self.max_containers_per_node = \ + int(conf.get('max_containers_per_node', 0)) + + self.container_cpu_period = int(conf.get('container_cpu_period', 0)) + self.container_cpu_quota = int(conf.get('container_cpu_quota', 0)) + self.container_mem_limit = conf.get('container_mem_limit', 0) + # NOTE(tkajinam): memory limit can be a string with unit like 1024m + try: + self.container_mem_limit = int(self.container_mem_limit) + except TypeError: + pass + self.container_cpuset_cpus = conf.get('container_cpuset_cpus') + self.container_cpuset_mems = conf.get('container_cpuset_mems') + self.container_pids_limit = int(conf.get('container_pids_limit', 0)) + + def ping(self): + """ + Ping to daemon factory process inside container + + :returns: True when the daemon factory is responsive + False when the daemon factory is not responsive or it fails + to send command to the process + """ + pipe_path = self.paths.host_factory_pipe + client = SBusClient(pipe_path) + try: + resp = client.ping() + if not resp.status: + self.logger.error('Failed to ping to daemon factory: %s' % + resp.message) + return resp.status + except SBusClientException: + return False + + def wait(self): + """ + Wait while scope's sandbox is starting + + :raises StorletTimeout: the sandbox has not started in + sandbox_wait_timeout + """ + with StorletTimeout(self.sandbox_wait_timeout): + while not self.ping(): + time.sleep(self.sandbox_ping_interval) + + @abc.abstractmethod + def _restart(self, container_image_name): + """ + Restarts the scope's sandbox using the specified container image + + :param container_image_name: name of the container image to start + :raises StorletRuntimeException: when failed to restart the container + """ + pass + + def restart(self): + """ + Restarts the scope's sandbox + + """ + self.paths.create_host_pipe_dir() + + container_image_name = self.scope + try: + self._restart(container_image_name) + self.wait() + except StorletTimeout: + raise + except StorletRuntimeException: + # We were unable to start a container from the tenant image. + # Let us try to start a container from default image. + self.logger.exception("Failed to start a container from " + "tenant image %s" % container_image_name) + + self.logger.info("Trying to start a container from default " + "image: %s" % self.default_container_image_name) + self._restart(self.default_container_image_name) + self.wait() + + def start_storlet_daemon( + self, spath, storlet_id, language, language_version=None): + """ + Start SDaemon process in the scope's sandbox + """ + pipe_path = self.paths.host_factory_pipe + client = SBusClient(pipe_path) + try: + resp = client.start_daemon( + language.lower(), spath, storlet_id, + self.paths.get_sbox_storlet_pipe(storlet_id), + self.storlet_daemon_debug_level, + self.storlet_daemon_thread_pool_size, + language_version) + + if not resp.status: + self.logger.error('Failed to start storlet daemon: %s' % + resp.message) + raise StorletRuntimeException('Daemon start failed') + except SBusClientException: + raise StorletRuntimeException('Daemon start failed') + + def stop_storlet_daemon(self, storlet_id): + """ + Stop SDaemon process in the scope's sandbox + """ + pipe_path = self.paths.host_factory_pipe + client = SBusClient(pipe_path) + try: + resp = client.stop_daemon(storlet_id) + if not resp.status: + self.logger.error('Failed to stop storlet daemon: %s' % + resp.message) + raise StorletRuntimeException('Daemon stop failed') + except SBusClientException: + raise StorletRuntimeException('Daemon stop failed') + + def get_storlet_daemon_status(self, storlet_id): + """ + Get the status of SDaemon process in the scope's sandbox + """ + pipe_path = self.paths.host_factory_pipe + client = SBusClient(pipe_path) + try: + resp = client.daemon_status(storlet_id) + if resp.status: + return 1 + else: + self.logger.error('Failed to get status about storlet ' + 'daemon: %s' % resp.message) + return 0 + except SBusClientException: + return -1 + + def _get_storlet_classpath(self, storlet_main, storlet_id, dependencies): + """ + Get classpath required to run storlet application + + :param storlet_main: Main class name of the storlet + :param storlet_id: Name of the storlet file + :param dependencies: A list of dependency file + :returns: classpath string + """ + class_path = os.path.join( + self.paths.get_sbox_storlet_dir(storlet_main), storlet_id) + + dep_path_list = \ + [os.path.join(self.paths.get_sbox_storlet_dir(storlet_main), dep) + for dep in dependencies] + + return class_path + ':' + ':'.join(dep_path_list) + + def activate_storlet_daemon(self, sreq, cache_updated=True): + storlet_daemon_status = \ + self.get_storlet_daemon_status(sreq.storlet_main) + if (storlet_daemon_status == -1): + # We failed to send a command to the factory. + # Best we can do is execute the container. + self.logger.debug('Failed to check the storlet daemon status. ' + 'Restart its container') + self.restart() + storlet_daemon_status = 0 + + if (cache_updated is True and storlet_daemon_status == 1): + # The cache was updated while the daemon is running we need to + # stop it. + self.logger.debug('The cache was updated, and the storlet daemon ' + 'is running. Stopping daemon') + + try: + self.stop_storlet_daemon(sreq.storlet_main) + except StorletRuntimeException: + self.logger.warning('Failed to stop the storlet daemon. ' + 'Restart its container') + self.restart() + else: + self.logger.debug('Deamon stopped') + storlet_daemon_status = 0 + + if (storlet_daemon_status == 0): + self.logger.debug('Going to start the storlet daemon!') + + # TODO(takashi): This is not needed for python application + classpath = self._get_storlet_classpath( + sreq.storlet_main, sreq.storlet_id, sreq.dependencies) + + self.start_storlet_daemon( + classpath, sreq.storlet_main, sreq.storlet_language, + sreq.storlet_language_version) + self.logger.debug('Daemon started') + + +class StorletInvocationProtocol(object): + """ + StorletInvocationProtocol class + + This class serves communictaion with a container to run an + application + + :param srequest: StorletRequest instance + :param storlet_pipe_path: path string to pipe + :param storlet_logger_path: path string to log file + :param timeout: integer of timeout for waiting the resp from container + :param logger: logger instance + :param extra_sources (WIP): a list of StorletRequest instances + which keep data_iter for adding extra source + as data stream + """ + def __init__(self, srequest, storlet_pipe_path, storlet_logger_path, + timeout, logger, extra_sources=None): + self.srequest = srequest + self.storlet_pipe_path = storlet_pipe_path + self.storlet_logger = StorletLogger(storlet_logger_path) + self.logger = logger + self.timeout = timeout + + # local side file descriptors + self.data_read_fd = None + self.data_write_fd = None + self.metadata_read_fd = None + self.metadata_write_fd = None + self.task_id = None + self._input_data_read_fd = None + self._input_data_write_fd = None + + self.extra_data_sources = [] + extra_sources = extra_sources or [] + for source in extra_sources: + if source.has_fd: + # TODO(kota_): it may be data_fd in the future. + raise Exception( + 'extra_source no requires data_fd just data_iter') + self.extra_data_sources.append( + {'read_fd': None, 'write_fd': None, + 'user_metadata': source.user_metadata, + 'data_iter': source.data_iter}) + + @property + def input_data_read_fd(self): + """ + File descriptor to read the input body content + """ + if self.srequest.has_fd: + return self.srequest.data_fd + else: + return self._input_data_read_fd + + @property + def remote_fds(self): + """ + A list of sbus file descriptors passed to remote side + """ + storlets_metadata = {} + if self.srequest.has_range: + storlets_metadata.update( + {'start': str(self.srequest.start), + 'end': str(self.srequest.end)}) + + fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_INPUT_OBJECT, + self.input_data_read_fd, + storage_metadata=self.srequest.user_metadata, + storlets_metadata=storlets_metadata), + SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT, + self.data_write_fd), + SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA, + self.metadata_write_fd), + SBusFileDescriptor(sbus_fd.SBUS_FD_LOGGER, + self.storlet_logger.getfd())] + + for source in self.extra_data_sources: + fd = SBusFileDescriptor( + sbus_fd.SBUS_FD_INPUT_OBJECT, + source['read_fd'], + storage_metadata=source['user_metadata']) + fds.append(fd) + + return fds + + @contextmanager + def _activate_invocation_descriptors(self): + """ + Contextmanager about file descriptors used in storlet invocation + + NOTE: This context manager now only closes remote side fds, + so you should close local side fds + """ + self._prepare_invocation_descriptors() + try: + yield + finally: + self._close_remote_side_descriptors() + + def _prepare_invocation_descriptors(self): + """ + Create all pipse used for Storlet execution + """ + if not self.srequest.has_fd: + self._input_data_read_fd, self._input_data_write_fd = os.pipe() + self.data_read_fd, self.data_write_fd = os.pipe() + self.metadata_read_fd, self.metadata_write_fd = os.pipe() + + for source in self.extra_data_sources: + source['read_fd'], source['write_fd'] = os.pipe() + + def _safe_close(self, fds): + """ + Make sure that all of the file descriptors get closed + + :param fds: a list of file descriptors + """ + for fd in fds: + try: + os.close(fd) + except OSError as err: + if err.errno != errno.EBADF: + raise + # TODO(kota_): fd might be closed already, so if already + # closed, OSError will be raised. we need more refactor to + # keep clean the file descriptors. + pass + + def _close_remote_side_descriptors(self): + """ + Close all of the container side descriptors + """ + fds = [self.data_write_fd, self.metadata_write_fd] + if not self.srequest.has_fd: + fds.append(self.input_data_read_fd) + fds.extend([source['read_fd'] for source in self.extra_data_sources]) + for fd in fds: + os.close(fd) + + def _close_local_side_descriptors(self): + """ + Close all of the host side descriptors + """ + fds = [self.data_read_fd, self.metadata_read_fd] + fds.extend([source['write_fd'] for source in self.extra_data_sources]) + self._safe_close(fds) + + def _cancel(self): + """ + Cancel on-going storlet execution + """ + client = SBusClient(self.storlet_pipe_path) + try: + resp = client.cancel(self.task_id) + if not resp.status: + raise StorletRuntimeException('Failed to cancel task') + except SBusClientException: + raise StorletRuntimeException('Failed to cancel task') + + def _invoke(self): + """ + Send an execution command to the remote daemon factory + """ + with self.storlet_logger.activate(),\ + self._activate_invocation_descriptors(): + self._send_execute_command() + + def _send_execute_command(self): + """ + Send execute command to the remote daemon factory to invoke storlet + execution + """ + client = SBusClient(self.storlet_pipe_path) + try: + resp = client.execute(self.srequest.params, self.remote_fds) + if not resp.status: + raise StorletRuntimeException("Failed to send execute command") + + if not resp.task_id: + raise StorletRuntimeException("Missing task id") + else: + self.task_id = resp.task_id + except SBusClientException: + raise StorletRuntimeException("Failed to send execute command") + + def _wait_for_read_with_timeout(self, fd): + """ + Wait while the read file descriptor gets ready + + :param fd: File descriptor to read + :raises StorletTimeout: Exception raised when it times out to cancel + the existing task + :raises StorletRuntimeException: Exception raised when it fails to + cancel the existing task + """ + try: + with StorletTimeout(self.timeout): + r, w, e = select.select([fd], [], []) + except StorletTimeout: + exc_type, exc_value, exc_traceback = sys.exc_info() + + # When there is a task already running, we should cancel it. + if self.task_id: + try: + self._cancel() + except StorletRuntimeException: + self.logger.warning( + 'Task %s timed out, but failed to get canceled' + % self.task_id) + pass + + if exc_value is None: + exc_value = exc_traceback + if exc_value.__traceback__ is not exc_traceback: + raise exc_value.with_traceback(exc_traceback) + raise exc_value + + if fd not in r: + raise StorletRuntimeException('Read fd is not ready') + + def _read_metadata(self): + """ + Read metadata in the storlet execution result from fd + + :returns: a dict of metadata + """ + self._wait_for_read_with_timeout(self.metadata_read_fd) + flat_json = os.read(self.metadata_read_fd, MAX_METADATA_SIZE) + os.close(self.metadata_read_fd) + try: + return json.loads(flat_json) + except ValueError: + self.logger.exception('Failed to load metadata from json') + raise StorletRuntimeException('Got invalid format about metadata') + + def _wait_for_write_with_timeout(self, fd): + """ + Wait while the write file descriptor gets ready + + :param fd: File descriptor to write + :raises StorletTimeout: Exception raised when it times out to cancel + the existing task + :raises StorletRuntimeException: Exception raised when it fails to + cancel the existing task + """ + with StorletTimeout(self.timeout): + r, w, e = select.select([], [fd], []) + if fd not in w: + raise StorletRuntimeException('Write fd is not ready') + + def _close_input_data_descriptors(self): + fds = [self._input_data_read_fd, self._input_data_write_fd] + self._safe_close(fds) + + def communicate(self): + try: + self._invoke() + + if not self.srequest.has_fd: + self._wait_for_write_with_timeout(self._input_data_write_fd) + + # We do the writing in a different thread. + # Otherwise, we can run into the following deadlock + # 1. middleware writes to Storlet + # 2. Storlet reads and starts to write metadata and then data + # 3. middleware continues writing + # 4. Storlet continues writing and gets stuck as middleware + # is busy writing, but still not consuming the reader end + # of the Storlet writer. + eventlet.spawn_n(self._write_input_data, + self._input_data_write_fd, + self.srequest.data_iter) + + for source in self.extra_data_sources: + # NOTE(kota_): not sure right now if using eventlet.spawn_n is + # right way. GreenPool is better? I don't get + # whole for the dead lock described in above. + self._wait_for_write_with_timeout(source['write_fd']) + eventlet.spawn_n(self._write_input_data, + source['write_fd'], + source['data_iter']) + + out_md = self._read_metadata() + self._wait_for_read_with_timeout(self.data_read_fd) + + return StorletResponse(out_md, data_fd=self.data_read_fd, + cancel=self._cancel) + except Exception: + self._close_local_side_descriptors() + if not self.srequest.has_fd: + self._close_input_data_descriptors() + raise + + @contextmanager + def _open_writer(self, fd): + with os.fdopen(fd, 'wb') as writer: + yield writer + + def _write_input_data(self, fd, data_iter): + try: + # double try/except block saving from unexpected errors + try: + with self._open_writer(fd) as writer: + for chunk in data_iter: + with StorletTimeout(self.timeout): + writer.write(chunk) + except (OSError, TypeError, ValueError): + self.logger.exception('fdopen failed') + except IOError: + # this will happen at sort of broken pipe while writer.write + self.logger.exception('IOError with writing fd %s' % fd) + except StorletTimeout: + self.logger.exception( + 'Timeout (%s)s with writing fd %s' % (self.timeout, fd)) + except Exception: + # _write_input_data is designed to run eventlet thread + # so that we should catch and suppress it here + self.logger.exception('Unexpected error at writing input data') diff --git a/storlets/gateway/gateways/docker/__init__.py b/storlets/gateway/gateways/docker/__init__.py index d62a93ad..86d1ed89 100644 --- a/storlets/gateway/gateways/docker/__init__.py +++ b/storlets/gateway/gateways/docker/__init__.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from storlets.gateway.gateways.docker.gateway import StorletGatewayDocker +from storlets.gateway.gateways.docker.gateway import DockerStorletGateway __all__ = [ - 'StorletGatewayDocker', + 'DockerStorletGateway', ] diff --git a/storlets/gateway/gateways/docker/gateway.py b/storlets/gateway/gateways/docker/gateway.py index 602bde31..3d281921 100644 --- a/storlets/gateway/gateways/docker/gateway.py +++ b/storlets/gateway/gateways/docker/gateway.py @@ -14,370 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import shutil - -from storlets.gateway.common.stob import StorletRequest -from storlets.gateway.gateways.base import StorletGatewayBase -from storlets.gateway.gateways.docker.runtime import RunTimePaths, \ - RunTimeSandbox, StorletInvocationProtocol +from storlets.gateway.gateways.container.gateway import StorletGatewayContainer +from storlets.gateway.gateways.docker.runtime import DockerRunTimeSandbox -"""--------------------------------------------------------------------------- -The Storlet Gateway API -The API is made of: -(1) The class DockerStorletRequest. This encapsulates what goes in and comes - out of the gateway -(2) The StorletGateway is the Docker flavor of the StorletGateway API: - validate_storlet_registration - validate_dependency_registration - invocation_flow ----------------------------------------------------------------------------""" - - -class DockerStorletRequest(StorletRequest): - """ - The DockerStorletRequest class represents a request to be processed by the - storlet the request is derived from the Swift request and - essentially consists of: - 1. A data stream to be processed - 2. Metadata identifying the stream - """ - - # TODO(takashi): Some of following parameters should be defined common - # parameters for StorletRequest - required_options = ['storlet_main', 'storlet_language', 'file_manager'] - - def __init__(self, storlet_id, params, user_metadata, data_iter=None, - data_fd=None, options=None): - """ - :param storlet_id: storlet id - :param params: execution parameters - :param user_metadata: user metadata - :param data_iter: an iterator to read data - :param data_fd: a file descriptor to read data - :param options: a dictionaly which stores gateway specific options. - :raises ValueError: when some of the required options (storlet_main - and file_manager) are missing - """ - super(DockerStorletRequest, self).__init__( - storlet_id, params, user_metadata, data_iter, data_fd, - options=options) - - self.generate_log = self.options.get('generate_log', False) - - self.storlet_main = self.options['storlet_main'] - self.storlet_language = self.options['storlet_language'] - self.storlet_language_version = \ - self.options.get('storlet_language_version') - - if self.options.get('storlet_dependency'): - self.dependencies = [ - x.strip() for x - in self.options['storlet_dependency'].split(',') - if x.strip()] - else: - self.dependencies = [] - - self.file_manager = self.options['file_manager'] - - self.start = self.options.get('range_start') - self.end = self.options.get('range_end') - - @property - def has_range(self): - """ - Whether the input range is given - """ - return self.start is not None and self.end is not None - - -class StorletGatewayDocker(StorletGatewayBase): - - request_class = DockerStorletRequest - - def __init__(self, conf, logger, scope): - """ - :param conf: a dict for gateway conf - :param logger: a logger instance - :param scope: scope name to identify the container - """ - super(StorletGatewayDocker, self).__init__(conf, logger, scope) - self.storlet_timeout = float(self.conf.get('storlet_timeout', 40)) - self.paths = RunTimePaths(scope, conf) - - @classmethod - def validate_storlet_registration(cls, params, name): - """ - Validate required parameters for storlet file - - :param params: parameters related to the storlet file - :param name: name of the storlet file - :raises ValueError: if some of the required parameters are missing, - or some of the parameters are invalid - """ - mandatory = ['Language', 'Interface-Version', 'Object-Metadata', - 'Main'] - cls._check_mandatory_params(params, mandatory) - - if params['Language'].lower() == 'java': - if '-' not in name or '.' not in name: - raise ValueError('Storlet name is incorrect') - elif params['Language'].lower() == 'python': - try: - version = float(params.get('Language-Version', 3)) - except ValueError: - raise ValueError('Language-Version is invalid') - - if int(version) != 3: - # TODO(kota_): more strict version check should be nice. - raise ValueError('Not supported version specified') - - if name.endswith('.py'): - cls_name = params['Main'] - if not cls_name.startswith(name[:-3] + '.'): - raise ValueError('Main class should be included in ' - 'storlet file') - - if len(cls_name.split('.')) != 2: - raise ValueError('Submodule is currently not supported') - # TODO(takashi): Add support for sdist tar.gz - else: - raise ValueError('Storlet name is incorrect') - else: - raise ValueError('Unsupported Language') - - dep = params.get('Dependency') - if dep: - deps = dep.split(',') - if name in deps: - raise ValueError('Using the same name for storlet and ' - 'dependency is not allowed') - if len(deps) != len(set(deps)): - raise ValueError('Duplicated name in dependencies') - - @classmethod - def validate_dependency_registration(cls, params, name): - """ - Validate required parameters for dependency file - - :param params: parameters related to the dependency file - :param name: name of the dependency file - :raises ValueError: if some of the required parameters are missing, - or some of the parameters are invalid - """ - mandatory = ['Dependency-Version'] - cls._check_mandatory_params(params, mandatory) - - perm = params.get('Dependency-Permissions') - if perm is not None: - try: - perm_int = int(perm, 8) - except ValueError: - raise ValueError('Dependency permission is incorrect') - if (perm_int & int('600', 8)) != int('600', 8): - raise ValueError('The owner should have rw permission') - - @classmethod - def _check_mandatory_params(cls, params, mandatory): - """ - Ensure that we have all mandatory parameters in the given parameters - - :param params: file parameters - :param mandatory: required parameters - :raises ValueError: if some of the required parameters are missing - """ - for md in mandatory: - if md not in params: - raise ValueError('Mandatory parameter is missing' - ': {0}'.format(md)) - - def invocation_flow(self, sreq, extra_sources=None): - """ - Invoke the backend protocol with gateway - - :param sreq: StorletRequest instance - :param extra_sources (WIP): A list of StorletRequest instance to gather - as extra resoureces to feed to storlet - container as data source - :return: StorletResponse instance - """ - run_time_sbox = RunTimeSandbox(self.scope, self.conf, self.logger) - docker_updated = self.update_docker_container_from_cache(sreq) - run_time_sbox.activate_storlet_daemon(sreq, docker_updated) - self._add_system_params(sreq) - - slog_path = self.paths.get_host_slog_path(sreq.storlet_main) - storlet_pipe_path = \ - self.paths.get_host_storlet_pipe(sreq.storlet_main) - - sprotocol = StorletInvocationProtocol(sreq, - storlet_pipe_path, - slog_path, - self.storlet_timeout, - self.logger, - extra_sources=extra_sources) - - sresp = sprotocol.communicate() - - self._upload_storlet_logs(slog_path, sreq) - - return sresp - - def _add_system_params(self, sreq): - """ - Adds Storlet engine specific parameters to the invocation - - currently, this consists only of the execution path of the - Storlet within the Docker container. - - :params params: Request parameters - """ - sreq.params['storlet_execution_path'] = self. \ - paths.get_sbox_storlet_dir(sreq.storlet_main) - - def _upload_storlet_logs(self, slog_path, sreq): - """ - Upload storlet execution log as a swift object - - :param slog_path: target path - :params sreq: DockerStorletRequest instance - """ - if sreq.generate_log: - with open(slog_path, 'rb') as logfile: - storlet_name = sreq.storlet_id.split('-')[0] - log_obj_name = '%s.log' % storlet_name - sreq.file_manager.put_log(log_obj_name, logfile) - - def bring_from_cache(self, obj_name, sreq, is_storlet): - """ - Auxiliary function that: - - (1) Brings from Swift obj_name, either this is in a - storlet or a storlet dependency. - (2) Copies from local cache into the Docker conrainer - If this is a Storlet then also validates that the cache is updated - with most recent copy of the Storlet compared to the copy residing in - Swift. - - :params obj_name: name of the object - :params sreq: DockerStorletRequest instance - :params is_storlet: True if the object is a storlet object - False if the object is a dependency object - :returns: Whether the Docker container was updated with obj_name - """ - # Determine the cache we are to work with - # e.g. dependency or storlet - if is_storlet: - cache_dir = self.paths.host_storlet_cache_dir - get_func = sreq.file_manager.get_storlet - else: - cache_dir = self.paths.host_dependency_cache_dir - get_func = sreq.file_manager.get_dependency - - if not os.path.exists(cache_dir): - os.makedirs(cache_dir, 0o700) - - # cache_target_path is the actual object we need to deal with - # e.g. a concrete storlet or dependency we need to bring/update - cache_target_path = os.path.join(cache_dir, obj_name) - - # Determine if we need to update the cache for cache_target_path - # We default for no - update_cache = False - - # If it does not exist in cache, we obviously need to bring - if not os.path.isfile(cache_target_path): - update_cache = True - elif is_storlet: - # The cache_target_path exists, we test if it is up-to-date - # with the metadata we got. - # We mention that this is currently applicable for storlets - # only, and not for dependencies. - # This will change when we will head dependencies as well - fstat = os.stat(cache_target_path) - storlet_or_size = int( - sreq.options['storlet_content_length'].rstrip("L")) - storlet_or_time = float(sreq.options['storlet_x_timestamp']) - b_storlet_size_changed = fstat.st_size != storlet_or_size - b_storlet_file_updated = float(fstat.st_mtime) < storlet_or_time - if b_storlet_size_changed or b_storlet_file_updated: - update_cache = True - - if update_cache: - # If the cache needs to be updated, then get on with it - # bring the object from storage - data_iter, perm = get_func(obj_name) - - if perm: - perm = int(perm, 8) & 0o700 - else: - perm = 0o600 - - # TODO(takashi): Do not directly write to target path - with open(cache_target_path, 'wb') as fn: - os.chmod(cache_target_path, perm) - for data in data_iter: - fn.write(data) - - # The node's local cache is now updated. - # We now verify if we need to update the - # Docker container itself. - # The Docker container needs to be updated if: - # 1. The Docker container does not hold a copy of the object - # 2. The Docker container holds an older version of the object - update_docker = False - docker_storlet_path = \ - self.paths.get_host_storlet_dir(sreq.storlet_main) - docker_target_path = os.path.join(docker_storlet_path, obj_name) - - if not os.path.exists(docker_storlet_path): - os.makedirs(docker_storlet_path, 0o700) - update_docker = True - elif not os.path.isfile(docker_target_path): - update_docker = True - else: - fstat_cached = os.stat(cache_target_path) - fstat_docker = os.stat(docker_target_path) - if fstat_cached.st_size != fstat_docker.st_size: - update_docker = True - if fstat_cached.st_mtime < fstat_docker.st_mtime: - update_docker = True - - if update_docker: - # need to copy from cache to docker - # copy2 also copies the permissions - shutil.copy2(cache_target_path, docker_target_path) - - return update_docker - - def update_docker_container_from_cache(self, sreq): - """ - Iterates over the storlet name and its dependencies appearing - - in the invocation data and make sure they are brought to the - local cache, and from there to the Docker container. - Uses the bring_from_cache auxiliary function. - - :params sreq: DockerStorletRequest instance - :returns: True if the Docker container was updated - """ - # where at the host side, reside the storlet containers - storlet_path = self.paths.host_storlet_base_dir - if not os.path.exists(storlet_path): - os.makedirs(storlet_path, 0o755) - - # Iterate over storlet and dependencies, and make sure - # they are updated within the Docker container. - # return True if any of them wea actually - # updated within the Docker container - docker_updated = False - - updated = self.bring_from_cache(sreq.storlet_id, sreq, True) - docker_updated = docker_updated or updated - - for dep in sreq.dependencies: - updated = self.bring_from_cache(dep, sreq, False) - docker_updated = docker_updated or updated - - return docker_updated +class DockerStorletGateway(StorletGatewayContainer): + sandbox = DockerRunTimeSandbox diff --git a/storlets/gateway/gateways/docker/runtime.py b/storlets/gateway/gateways/docker/runtime.py index 9d44119b..295c5685 100644 --- a/storlets/gateway/gateways/docker/runtime.py +++ b/storlets/gateway/gateways/docker/runtime.py @@ -13,283 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import errno import os -import select -import stat -import sys -import time + import docker import docker.errors from docker.types import Mount as DockerMount -import eventlet -import json -from contextlib import contextmanager - -from storlets.sbus.client import SBusClient -from storlets.sbus.client.exceptions import SBusClientException -from storlets.sbus.datagram import SBusFileDescriptor -from storlets.sbus import file_description as sbus_fd -from storlets.gateway.common.exceptions import StorletRuntimeException, \ - StorletTimeout -from storlets.gateway.common.logger import StorletLogger -from storlets.gateway.common.stob import StorletResponse - -MAX_METADATA_SIZE = 4096 +from storlets.gateway.common.exceptions import StorletRuntimeException +from storlets.gateway.gateways.container.runtime import RunTimeSandbox -eventlet.monkey_patch() - - -"""--------------------------------------------------------------------------- -Sandbox API -""" - - -class RunTimePaths(object): - """ - The Storlet Engine need to be access stuff located in many paths: - - 1. The various communication channels represented as pipes in the - filesystem - 2. Directories where to place Storlets - 3. Directories where to place logs - - Communication channels - ---------------------- - The RunTimeSandbox communicates with the Sandbox via two types of pipes - 1. factory pipe - defined per scope, used for communication with the - sandbox - for e.g. start/stop a storlet daemon - 2. Storlet pipe - defined per scope and Storlet, used for communication - with a storlet daemon, e.g. to call the invoke API - - Each pipe type has two paths: - 1. A path that is inside the sandbox - 2. A path that is outside of the sandbox or at the host side. As such - this path is prefixed by 'host_' - - Thus, we have the following 4 paths of interest: - 1. sandbox_factory_pipe_path - 2. host_factory_pipe_path - 3. sandbox_storlet_pipe_path - 4. host_storlet_pipe_path - - Our implementation uses the following path structure for the various pipes: - In the host, all pipes belonging to a given scope are prefixed by - /, where comes from the configuration - Thus: - host_factory_pipe_path is of the form //factory_pipe - host_storlet_pipe_path is of the form // - - In The sandbox side - sandbox_factory_pipe_path is of the form /mnt/channels/factory_pipe - sandbox_storlet_pipe_path is of the form /mnt/channels/ - - Storlets Locations - ------------------ - The Storlet binaries are accessible from the sandbox using a mounted - directory. - This directory is called the storlet directories. - On the host side it is of the form // - On the sandbox side it is of the form /home/swift/ - comes from the configuration - is the prefix of the jar. - - Logs - ---- - Logs are located in paths of the form: - //.log - """ - - def __init__(self, scope, conf): - """ - Construct RunTimePaths instance - - :param scope: scope name to be used as container name - :param conf: gateway conf - """ - self.scope = scope - self.factory_pipe_name = 'factory_pipe' - self.sandbox_pipe_dir = '/mnt/channels' - - self.sandbox_storlet_base_dir = '/home/swift' - self.host_root_dir = conf.get('host_root', '/var/lib/storlets') - self.host_pipe_root_dir = \ - conf.get('pipes_dir', - os.path.join(self.host_root_dir, 'pipes', 'scopes')) - self.host_storlet_root_dir = \ - conf.get('storlets_dir', - os.path.join(self.host_root_dir, 'storlets', 'scopes')) - self.host_log_root_dir = \ - conf.get('log_dir', - os.path.join(self.host_root_dir, 'logs', 'scopes')) - self.host_cache_root_dir = \ - conf.get('cache_dir', - os.path.join(self.host_root_dir, 'cache', 'scopes')) - self.host_restart_script_dir = \ - conf.get('script_dir', - os.path.join(self.host_root_dir, 'scripts')) - - self.host_storlet_native_lib_dir = '/usr/local/lib/storlets' - self.sandbox_storlet_native_lib_dir = '/usr/local/lib/storlets' - self.host_storlet_native_bin_dir = '/usr/local/libexec/storlets' - self.sandbox_storlet_native_bin_dir = '/usr/local/libexec/storlets' - - @property - def host_pipe_dir(self): - return os.path.join(self.host_pipe_root_dir, self.scope) - - def create_host_pipe_dir(self): - path = self.host_pipe_dir - if not os.path.exists(path): - os.makedirs(path) - # 0777 should be 0700 when we get user namespaces in Docker - os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - return path - - @property - def host_factory_pipe(self): - return os.path.join(self.host_pipe_dir, self.factory_pipe_name) - - @property - def sandbox_factory_pipe(self): - return os.path.join(self.sandbox_pipe_dir, self.factory_pipe_name) - - def get_host_storlet_pipe(self, storlet_id): - return os.path.join(self.host_pipe_dir, storlet_id) - - def get_sbox_storlet_pipe(self, storlet_id): - return os.path.join(self.sandbox_pipe_dir, storlet_id) - - def get_sbox_storlet_dir(self, storlet_id): - return os.path.join(self.sandbox_storlet_base_dir, storlet_id) - - @property - def host_storlet_base_dir(self): - return os.path.join(self.host_storlet_root_dir, self.scope) - - def get_host_storlet_dir(self, storlet_id): - return os.path.join(self.host_storlet_base_dir, storlet_id) - - def get_host_slog_path(self, storlet_id): - return os.path.join( - self.host_log_root_dir, self.scope, storlet_id, - 'storlet_invoke.log') - - @property - def host_storlet_cache_dir(self): - return os.path.join(self.host_cache_root_dir, self.scope, 'storlet') - - @property - def host_dependency_cache_dir(self): - return os.path.join(self.host_cache_root_dir, self.scope, 'dependency') - - -"""--------------------------------------------------------------------------- -Docker Stateful Container API -The RunTimeSandbox serve as an API between the Docker Gateway and -a reusable per scope sandbox ----------------------------------------------------------------------------""" - - -class RunTimeSandbox(object): - """ - The RunTimeSandbox represents a reusable per scope sandbox. - - The sandbox is reusable in the sense that it can run several storlet - daemons. - - The following methods are supported: - ping - pings the sandbox for liveness - wait - wait for the sandbox to be ready for processing commands - restart - restart the sandbox - start_storlet_daemon - start a daemon for a given storlet - stop_storlet_daemon - stop a daemon of a given storlet - get_storlet_daemon_status - test if a given storlet daemon is running - """ - - def __init__(self, scope, conf, logger): - """ - :param scope: scope name to be used as container name - :param conf: gateway conf - :param logger: logger instance - """ - self.paths = RunTimePaths(scope, conf) - self.scope = scope - - self.sandbox_ping_interval = \ - float(conf.get('sandbox_ping_interval', 0.5)) - self.sandbox_stop_timeout = \ - float(conf.get('stop_linux_container_timeout', 1)) - self.sandbox_wait_timeout = \ - float(conf.get('restart_linux_container_timeout', 10)) - - self.container_image_namespace = \ - conf.get('docker_repo', conf.get('container_image_namespace')) - self.container_image_name_prefix = 'tenant' - - # TODO(add line in conf) - self.storlet_daemon_thread_pool_size = \ - int(conf.get('storlet_daemon_thread_pool_size', 5)) - self.storlet_daemon_factory_debug_level = \ - conf.get('storlet_daemon_factory_debug_level', 'DEBUG') - self.storlet_daemon_debug_level = \ - conf.get('storlet_daemon_debug_level', 'DEBUG') - - # TODO(change logger's route if possible) - self.logger = logger - - self.default_container_image_name = conf.get( - 'default_docker_image_name', - conf.get('default_container_image_name', 'storlet_engine_image') - ) - - self.max_containers_per_node = \ - int(conf.get('max_containers_per_node', 0)) - - self.container_cpu_period = int(conf.get('container_cpu_period', 0)) - self.container_cpu_quota = int(conf.get('container_cpu_quota', 0)) - self.container_mem_limit = conf.get('container_mem_limit', 0) - # NOTE(tkajinam): memory limit can be a string with unit like 1024m - try: - self.container_mem_limit = int(self.container_mem_limit) - except TypeError: - pass - self.container_cpuset_cpus = conf.get('container_cpuset_cpus') - self.container_cpuset_mems = conf.get('container_cpuset_mems') - self.container_pids_limit = int(conf.get('container_pids_limit', 0)) - - def ping(self): - """ - Ping to daemon factory process inside container - - :returns: True when the daemon factory is responsive - False when the daemon factory is not responsive or it fails - to send command to the process - """ - pipe_path = self.paths.host_factory_pipe - client = SBusClient(pipe_path) - try: - resp = client.ping() - if not resp.status: - self.logger.error('Failed to ping to daemon factory: %s' % - resp.message) - return resp.status - except SBusClientException: - return False - - def wait(self): - """ - Wait while scope's sandbox is starting - - :raises StorletTimeout: the sandbox has not started in - sandbox_wait_timeout - """ - with StorletTimeout(self.sandbox_wait_timeout): - while not self.ping(): - time.sleep(self.sandbox_ping_interval) +class DockerRunTimeSandbox(RunTimeSandbox): def _restart(self, container_image_name): """ @@ -375,468 +109,3 @@ class RunTimeSandbox(object): except docker.errors.APIError: self.logger.exception("Failed to manage docker containers") raise StorletRuntimeException("Docker runtime error") - - def restart(self): - """ - Restarts the scope's sandbox - - """ - self.paths.create_host_pipe_dir() - - container_image_name = self.scope - try: - self._restart(container_image_name) - self.wait() - except StorletTimeout: - raise - except StorletRuntimeException: - # We were unable to start docker container from the tenant image. - # Let us try to start docker container from default image. - self.logger.exception("Failed to start docker container from " - "tenant image %s" % container_image_name) - - self.logger.info("Trying to start docker container from default " - "image: %s" % self.default_container_image_name) - self._restart(self.default_container_image_name) - self.wait() - - def start_storlet_daemon( - self, spath, storlet_id, language, language_version=None): - """ - Start SDaemon process in the scope's sandbox - """ - pipe_path = self.paths.host_factory_pipe - client = SBusClient(pipe_path) - try: - resp = client.start_daemon( - language.lower(), spath, storlet_id, - self.paths.get_sbox_storlet_pipe(storlet_id), - self.storlet_daemon_debug_level, - self.storlet_daemon_thread_pool_size, - language_version) - - if not resp.status: - self.logger.error('Failed to start storlet daemon: %s' % - resp.message) - raise StorletRuntimeException('Daemon start failed') - except SBusClientException: - raise StorletRuntimeException('Daemon start failed') - - def stop_storlet_daemon(self, storlet_id): - """ - Stop SDaemon process in the scope's sandbox - """ - pipe_path = self.paths.host_factory_pipe - client = SBusClient(pipe_path) - try: - resp = client.stop_daemon(storlet_id) - if not resp.status: - self.logger.error('Failed to stop storlet daemon: %s' % - resp.message) - raise StorletRuntimeException('Daemon stop failed') - except SBusClientException: - raise StorletRuntimeException('Daemon stop failed') - - def get_storlet_daemon_status(self, storlet_id): - """ - Get the status of SDaemon process in the scope's sandbox - """ - pipe_path = self.paths.host_factory_pipe - client = SBusClient(pipe_path) - try: - resp = client.daemon_status(storlet_id) - if resp.status: - return 1 - else: - self.logger.error('Failed to get status about storlet ' - 'daemon: %s' % resp.message) - return 0 - except SBusClientException: - return -1 - - def _get_storlet_classpath(self, storlet_main, storlet_id, dependencies): - """ - Get classpath required to run storlet application - - :param storlet_main: Main class name of the storlet - :param storlet_id: Name of the storlet file - :param dependencies: A list of dependency file - :returns: classpath string - """ - class_path = os.path.join( - self.paths.get_sbox_storlet_dir(storlet_main), storlet_id) - - dep_path_list = \ - [os.path.join(self.paths.get_sbox_storlet_dir(storlet_main), dep) - for dep in dependencies] - - return class_path + ':' + ':'.join(dep_path_list) - - def activate_storlet_daemon(self, sreq, cache_updated=True): - storlet_daemon_status = \ - self.get_storlet_daemon_status(sreq.storlet_main) - if (storlet_daemon_status == -1): - # We failed to send a command to the factory. - # Best we can do is execute the container. - self.logger.debug('Failed to check the storlet daemon status. ' - 'Restart Docker container') - self.restart() - storlet_daemon_status = 0 - - if (cache_updated is True and storlet_daemon_status == 1): - # The cache was updated while the daemon is running we need to - # stop it. - self.logger.debug('The cache was updated, and the storlet daemon ' - 'is running. Stopping daemon') - - try: - self.stop_storlet_daemon(sreq.storlet_main) - except StorletRuntimeException: - self.logger.warning('Failed to stop the storlet daemon. ' - 'Restart Docker container') - self.restart() - else: - self.logger.debug('Deamon stopped') - storlet_daemon_status = 0 - - if (storlet_daemon_status == 0): - self.logger.debug('Going to start the storlet daemon!') - - # TODO(takashi): This is not needed for python application - classpath = self._get_storlet_classpath( - sreq.storlet_main, sreq.storlet_id, sreq.dependencies) - - self.start_storlet_daemon( - classpath, sreq.storlet_main, sreq.storlet_language, - sreq.storlet_language_version) - self.logger.debug('Daemon started') - - -"""--------------------------------------------------------------------------- -Storlet Daemon API -StorletInvocationProtocol -server as an API between the Docker Gateway and the Storlet Daemon which -runs inside the Docker container. These classes implement the Storlet execution -protocol ----------------------------------------------------------------------------""" - - -class StorletInvocationProtocol(object): - """ - StorletInvocationProtocol class - - This class serves communictaion with a Docker container to run an - application - - :param srequest: StorletRequest instance - :param storlet_pipe_path: path string to pipe - :param storlet_logger_path: path string to log file - :param timeout: integer of timeout for waiting the resp from container - :param logger: logger instance - :param extra_sources (WIP): a list of StorletRequest instances - which keep data_iter for adding extra source - as data stream - """ - def __init__(self, srequest, storlet_pipe_path, storlet_logger_path, - timeout, logger, extra_sources=None): - self.srequest = srequest - self.storlet_pipe_path = storlet_pipe_path - self.storlet_logger = StorletLogger(storlet_logger_path) - self.logger = logger - self.timeout = timeout - - # local side file descriptors - self.data_read_fd = None - self.data_write_fd = None - self.metadata_read_fd = None - self.metadata_write_fd = None - self.task_id = None - self._input_data_read_fd = None - self._input_data_write_fd = None - - self.extra_data_sources = [] - extra_sources = extra_sources or [] - for source in extra_sources: - if source.has_fd: - # TODO(kota_): it may be data_fd in the future. - raise Exception( - 'extra_source no requires data_fd just data_iter') - self.extra_data_sources.append( - {'read_fd': None, 'write_fd': None, - 'user_metadata': source.user_metadata, - 'data_iter': source.data_iter}) - - @property - def input_data_read_fd(self): - """ - File descriptor to read the input body content - """ - if self.srequest.has_fd: - return self.srequest.data_fd - else: - return self._input_data_read_fd - - @property - def remote_fds(self): - """ - A list of sbus file descriptors passed to remote side - """ - storlets_metadata = {} - if self.srequest.has_range: - storlets_metadata.update( - {'start': str(self.srequest.start), - 'end': str(self.srequest.end)}) - - fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_INPUT_OBJECT, - self.input_data_read_fd, - storage_metadata=self.srequest.user_metadata, - storlets_metadata=storlets_metadata), - SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT, - self.data_write_fd), - SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA, - self.metadata_write_fd), - SBusFileDescriptor(sbus_fd.SBUS_FD_LOGGER, - self.storlet_logger.getfd())] - - for source in self.extra_data_sources: - fd = SBusFileDescriptor( - sbus_fd.SBUS_FD_INPUT_OBJECT, - source['read_fd'], - storage_metadata=source['user_metadata']) - fds.append(fd) - - return fds - - @contextmanager - def _activate_invocation_descriptors(self): - """ - Contextmanager about file descriptors used in storlet invocation - - NOTE: This context manager now only closes remote side fds, - so you should close local side fds - """ - self._prepare_invocation_descriptors() - try: - yield - finally: - self._close_remote_side_descriptors() - - def _prepare_invocation_descriptors(self): - """ - Create all pipse used for Storlet execution - """ - if not self.srequest.has_fd: - self._input_data_read_fd, self._input_data_write_fd = os.pipe() - self.data_read_fd, self.data_write_fd = os.pipe() - self.metadata_read_fd, self.metadata_write_fd = os.pipe() - - for source in self.extra_data_sources: - source['read_fd'], source['write_fd'] = os.pipe() - - def _safe_close(self, fds): - """ - Make sure that all of the file descriptors get closed - - :param fds: a list of file descriptors - """ - for fd in fds: - try: - os.close(fd) - except OSError as err: - if err.errno != errno.EBADF: - raise - # TODO(kota_): fd might be closed already, so if already - # closed, OSError will be raised. we need more refactor to - # keep clean the file descriptors. - pass - - def _close_remote_side_descriptors(self): - """ - Close all of the container side descriptors - """ - fds = [self.data_write_fd, self.metadata_write_fd] - if not self.srequest.has_fd: - fds.append(self.input_data_read_fd) - fds.extend([source['read_fd'] for source in self.extra_data_sources]) - for fd in fds: - os.close(fd) - - def _close_local_side_descriptors(self): - """ - Close all of the host side descriptors - """ - fds = [self.data_read_fd, self.metadata_read_fd] - fds.extend([source['write_fd'] for source in self.extra_data_sources]) - self._safe_close(fds) - - def _cancel(self): - """ - Cancel on-going storlet execution - """ - client = SBusClient(self.storlet_pipe_path) - try: - resp = client.cancel(self.task_id) - if not resp.status: - raise StorletRuntimeException('Failed to cancel task') - except SBusClientException: - raise StorletRuntimeException('Failed to cancel task') - - def _invoke(self): - """ - Send an execution command to the remote daemon factory - """ - with self.storlet_logger.activate(),\ - self._activate_invocation_descriptors(): - self._send_execute_command() - - def _send_execute_command(self): - """ - Send execute command to the remote daemon factory to invoke storlet - execution - """ - client = SBusClient(self.storlet_pipe_path) - try: - resp = client.execute(self.srequest.params, self.remote_fds) - if not resp.status: - raise StorletRuntimeException("Failed to send execute command") - - if not resp.task_id: - raise StorletRuntimeException("Missing task id") - else: - self.task_id = resp.task_id - except SBusClientException: - raise StorletRuntimeException("Failed to send execute command") - - def _wait_for_read_with_timeout(self, fd): - """ - Wait while the read file descriptor gets ready - - :param fd: File descriptor to read - :raises StorletTimeout: Exception raised when it times out to cancel - the existing task - :raises StorletRuntimeException: Exception raised when it fails to - cancel the existing task - """ - try: - with StorletTimeout(self.timeout): - r, w, e = select.select([fd], [], []) - except StorletTimeout: - exc_type, exc_value, exc_traceback = sys.exc_info() - - # When there is a task already running, we should cancel it. - if self.task_id: - try: - self._cancel() - except StorletRuntimeException: - self.logger.warning( - 'Task %s timed out, but failed to get canceled' - % self.task_id) - pass - - if exc_value is None: - exc_value = exc_traceback - if exc_value.__traceback__ is not exc_traceback: - raise exc_value.with_traceback(exc_traceback) - raise exc_value - - if fd not in r: - raise StorletRuntimeException('Read fd is not ready') - - def _read_metadata(self): - """ - Read metadata in the storlet execution result from fd - - :returns: a dict of metadata - """ - self._wait_for_read_with_timeout(self.metadata_read_fd) - flat_json = os.read(self.metadata_read_fd, MAX_METADATA_SIZE) - os.close(self.metadata_read_fd) - try: - return json.loads(flat_json) - except ValueError: - self.logger.exception('Failed to load metadata from json') - raise StorletRuntimeException('Got invalid format about metadata') - - def _wait_for_write_with_timeout(self, fd): - """ - Wait while the write file descriptor gets ready - - :param fd: File descriptor to write - :raises StorletTimeout: Exception raised when it times out to cancel - the existing task - :raises StorletRuntimeException: Exception raised when it fails to - cancel the existing task - """ - with StorletTimeout(self.timeout): - r, w, e = select.select([], [fd], []) - if fd not in w: - raise StorletRuntimeException('Write fd is not ready') - - def _close_input_data_descriptors(self): - fds = [self._input_data_read_fd, self._input_data_write_fd] - self._safe_close(fds) - - def communicate(self): - try: - self._invoke() - - if not self.srequest.has_fd: - self._wait_for_write_with_timeout(self._input_data_write_fd) - - # We do the writing in a different thread. - # Otherwise, we can run into the following deadlock - # 1. middleware writes to Storlet - # 2. Storlet reads and starts to write metadata and then data - # 3. middleware continues writing - # 4. Storlet continues writing and gets stuck as middleware - # is busy writing, but still not consuming the reader end - # of the Storlet writer. - eventlet.spawn_n(self._write_input_data, - self._input_data_write_fd, - self.srequest.data_iter) - - for source in self.extra_data_sources: - # NOTE(kota_): not sure right now if using eventlet.spawn_n is - # right way. GreenPool is better? I don't get - # whole for the dead lock described in above. - self._wait_for_write_with_timeout(source['write_fd']) - eventlet.spawn_n(self._write_input_data, - source['write_fd'], - source['data_iter']) - - out_md = self._read_metadata() - self._wait_for_read_with_timeout(self.data_read_fd) - - return StorletResponse(out_md, data_fd=self.data_read_fd, - cancel=self._cancel) - except Exception: - self._close_local_side_descriptors() - if not self.srequest.has_fd: - self._close_input_data_descriptors() - raise - - @contextmanager - def _open_writer(self, fd): - with os.fdopen(fd, 'wb') as writer: - yield writer - - def _write_input_data(self, fd, data_iter): - try: - # double try/except block saving from unexpected errors - try: - with self._open_writer(fd) as writer: - for chunk in data_iter: - with StorletTimeout(self.timeout): - writer.write(chunk) - except (OSError, TypeError, ValueError): - self.logger.exception('fdopen failed') - except IOError: - # this will happen at sort of broken pipe while writer.write - self.logger.exception('IOError with writing fd %s' % fd) - except StorletTimeout: - self.logger.exception( - 'Timeout (%s)s with writing fd %s' % (self.timeout, fd)) - except Exception: - # _write_input_data is designed to run eventlet thread - # so that we should catch and suppress it here - self.logger.exception('Unexpected error at writing input data') diff --git a/storlets/gateway/gateways/stub.py b/storlets/gateway/gateways/stub.py index 445c2231..c0d6b127 100644 --- a/storlets/gateway/gateways/stub.py +++ b/storlets/gateway/gateways/stub.py @@ -18,12 +18,12 @@ from storlets.gateway.common.stob import StorletRequest, StorletResponse from storlets.gateway.gateways.base import StorletGatewayBase -class StorletGatewayStub(StorletGatewayBase): +class StubStorletGateway(StorletGatewayBase): request_class = StorletRequest def __init__(self, conf, logger, scope): - super(StorletGatewayStub, self).__init__(conf, logger, scope) + super(StubStorletGateway, self).__init__(conf, logger, scope) self.logger = logger self.conf = conf self.scope = scope diff --git a/tests/functional/common/test_capabilities.py b/tests/functional/common/test_capabilities.py index 69799152..74fd4c76 100644 --- a/tests/functional/common/test_capabilities.py +++ b/tests/functional/common/test_capabilities.py @@ -39,7 +39,7 @@ class TestCapabilities(StorletBaseFunctionalTest): # TODO(eranr): take values from conf self.assertEqual('dependency', options['storlet_dependency']) self.assertEqual('storlet', options['storlet_container']) - self.assertEqual('StorletGatewayDocker', + self.assertEqual('DockerStorletGateway', options['storlet_gateway_class']) diff --git a/tests/unit/gateway/gateways/container/__init__.py b/tests/unit/gateway/gateways/container/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/gateway/gateways/container/test_gateway.py b/tests/unit/gateway/gateways/container/test_gateway.py new file mode 100644 index 00000000..69317d58 --- /dev/null +++ b/tests/unit/gateway/gateways/container/test_gateway.py @@ -0,0 +1,561 @@ +# Copyright (c) 2010-2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +from io import BytesIO, StringIO +import json +from shutil import rmtree +from tempfile import mkdtemp +import unittest +from unittest import mock + +from swift.common.swob import Request, Response +from swift.common.utils import FileLikeIter + +from storlets.sbus.client import SBusResponse +from storlets.gateway.gateways.container.gateway import ContainerStorletRequest +from tests.unit import FakeLogger +from tests.unit.gateway.gateways import FakeFileManager + + +class TestContainerStorletRequest(unittest.TestCase): + + def test_init(self): + # Java + storlet_id = 'Storlet-1.0.jar' + params = {'Param1': 'Value1', 'Param2': 'Value2'} + metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'} + + # with dependencies + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep')} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + iter(StringIO()), options=options) + self.assertEqual(metadata, dsreq.user_metadata) + self.assertEqual(params, dsreq.params) + self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id) + self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main) + self.assertEqual(['dep1', 'dep2'], dsreq.dependencies) + self.assertEqual('java', dsreq.storlet_language) + self.assertIsNone(dsreq.storlet_language_version) + + # without dependencies + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep')} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + iter(StringIO()), options=options) + self.assertEqual(metadata, dsreq.user_metadata) + self.assertEqual(params, dsreq.params) + self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id) + self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main) + self.assertEqual([], dsreq.dependencies) + self.assertEqual('java', dsreq.storlet_language) + self.assertIsNone(dsreq.storlet_language_version) + + # storlet_language is not given + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'file_manager': FakeFileManager('storlet', 'dep')} + with self.assertRaises(ValueError): + ContainerStorletRequest(storlet_id, params, metadata, + iter(StringIO()), options=options) + + # storlet_main is not given + options = {'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep')} + with self.assertRaises(ValueError): + ContainerStorletRequest(storlet_id, params, metadata, + iter(StringIO()), options=options) + + # file_manager is not given + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_language': 'java'} + with self.assertRaises(ValueError): + ContainerStorletRequest(storlet_id, params, metadata, + iter(StringIO()), options=options) + + # Python + storlet_id = 'storlet.py' + params = {'Param1': 'Value1', 'Param2': 'Value2'} + metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'} + + # without language version + options = {'storlet_main': 'storlet.Storlet', + 'storlet_language': 'python', + 'file_manager': FakeFileManager('storlet', 'dep')} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + iter(StringIO()), options=options) + self.assertEqual(metadata, dsreq.user_metadata) + self.assertEqual(params, dsreq.params) + self.assertEqual('storlet.py', dsreq.storlet_id) + self.assertEqual('storlet.Storlet', dsreq.storlet_main) + self.assertEqual([], dsreq.dependencies) + self.assertEqual('python', dsreq.storlet_language) + self.assertIsNone(dsreq.storlet_language_version) + + # with language version + options = {'storlet_main': 'storlet.Storlet', + 'storlet_language': 'python', + 'storlet_language_version': '3.6', + 'file_manager': FakeFileManager('storlet', 'dep')} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + iter(StringIO()), options=options) + self.assertEqual(metadata, dsreq.user_metadata) + self.assertEqual(params, dsreq.params) + self.assertEqual('storlet.py', dsreq.storlet_id) + self.assertEqual('storlet.Storlet', dsreq.storlet_main) + self.assertEqual([], dsreq.dependencies) + self.assertEqual('python', dsreq.storlet_language) + self.assertEqual('3.6', dsreq.storlet_language_version) + + def test_init_with_range(self): + storlet_id = 'Storlet-1.0.jar' + params = {} + metadata = {} + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep'), + 'range_start': 1, + 'range_end': 6} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + None, 0, options=options) + + self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id) + self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main) + self.assertEqual(['dep1', 'dep2'], dsreq.dependencies) + self.assertEqual('java', dsreq.storlet_language) + self.assertIsNone(dsreq.storlet_language_version) + self.assertEqual(1, dsreq.start) + self.assertEqual(6, dsreq.end) + + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep'), + 'range_start': 0, + 'range_end': 0} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + None, 0, options=options) + + self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id) + self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main) + self.assertEqual(['dep1', 'dep2'], dsreq.dependencies) + self.assertEqual('java', dsreq.storlet_language) + self.assertIsNone(dsreq.storlet_language_version) + self.assertEqual(0, dsreq.start) + self.assertEqual(0, dsreq.end) + + def test_has_range(self): + storlet_id = 'Storlet-1.0.jar' + params = {} + metadata = {} + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep')} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + None, 0, options=options) + self.assertFalse(dsreq.has_range) + + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep'), + 'range_start': 1, + 'range_end': 6} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + None, 0, options=options) + self.assertTrue(dsreq.has_range) + + options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep'), + 'range_start': 0, + 'range_end': 6} + dsreq = ContainerStorletRequest(storlet_id, params, metadata, + None, 0, options=options) + self.assertTrue(dsreq.has_range) + + +class ContainerGatewayTestMixin(object): + + def setUp(self): + # TODO(takashi): take these values from config file + self.tempdir = mkdtemp() + self.sconf = { + 'host_root': self.tempdir, + 'swift_dir': self.tempdir, + 'storlet_timeout': '9', + 'storlet_container': 'storlet', + 'storlet_dependency': 'dependency', + 'reseller_prefix': 'AUTH' + } + self.logger = FakeLogger() + + self.storlet_container = self.sconf['storlet_container'] + self.storlet_dependency = self.sconf['storlet_dependency'] + + self.version = 'v1' + self.account = 'AUTH_account' + self.container = 'container' + self.obj = 'object' + self.sobj = 'storlet-1.0.jar' + + self.gateway = self.gateway_class( + self.sconf, self.logger, self.account) + + def tearDown(self): + rmtree(self.tempdir) + + @property + def req_path(self): + return self._create_proxy_path( + self.version, self.account, self.container, + self.obj) + + @property + def storlet_path(self): + return self._create_proxy_path( + self.version, self.account, self.storlet_container, + self.sobj) + + def _create_proxy_path(self, version, account, container, obj): + return '/'.join(['', version, account, container, obj]) + + def test_check_mandatory_params(self): + params = {'keyA': 'valueA', + 'keyB': 'valueB', + 'keyC': 'valueC'} + + # all mandatory headers are included + self.gateway_class._check_mandatory_params( + params, ['keyA', 'keyB']) + + # some of mandatory headers are missing + with self.assertRaises(ValueError): + self.gateway_class._check_mandatory_params( + params, ['keyA', 'KeyD']) + + def test_validate_storlet_registration_java(self): + # correct name and headers w/ dependency + obj = 'storlet-1.0.jar' + params = {'Language': 'java', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'path.to.storlet.class'} + self.gateway_class.validate_storlet_registration(params, obj) + + # correct name and headers w/o dependency + obj = 'storlet-1.0.jar' + params = {'Language': 'java', + 'Interface-Version': '1.0', + 'Object-Metadata': 'no', + 'Main': 'path.to.storlet.class'} + self.gateway_class.validate_storlet_registration(params, obj) + + # some header keys are missing + params = {'Language': 'java', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + # wrong name + obj = 'storlet.jar' + params = {'Language': 'java', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'path.to.storlet.class'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + def test_validate_storlet_registration_python(self): + # correct name and headers w/ dependency + obj = 'storlet.py' + params = {'Language': 'python', + 'Language-Version': '3.6', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'storlet.Storlet'} + self.gateway_class.validate_storlet_registration(params, obj) + + # wrong version + obj = 'storlet.py' + params = {'Language': 'python', + 'Language-Version': '1.7', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'storlet.Storlet'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + # py2 is no more supported + obj = 'storlet.py' + params = {'Language': 'python', + 'Language-Version': '2.7', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'storlet.Storlet'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + # wrong name + obj = 'storlet.pyfoo' + params = {'Language': 'python', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'storlet.Storlet'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + # wrong main class + obj = 'storlet.py' + params = {'Language': 'python', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'another_storlet.Storlet'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + obj = 'storlet.py' + params = {'Language': 'python', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'storlet'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + obj = 'storlet.py' + params = {'Language': 'python', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'storlet.foo.Storlet'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + def test_validate_storlet_registration_not_suppoeted(self): + # unsupported language + obj = 'storlet.foo' + params = {'Language': 'bar', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file', + 'Object-Metadata': 'no', + 'Main': 'path.to.storlet.class'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + # same name for storlet and dependency + obj = 'storlet-1.0.jar' + params = {'Language': 'java', + 'Interface-Version': '1.0', + 'Dependency': 'storlet-1.0.jar', + 'Object-Metadata': 'no', + 'Main': 'path.to.storlet.class'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + # duplicated name in dependencies + obj = 'storlet-1.0.jar' + params = {'Language': 'java', + 'Interface-Version': '1.0', + 'Dependency': 'dep_file,dep_file', + 'Object-Metadata': 'no', + 'Main': 'path.to.storlet.class'} + with self.assertRaises(ValueError): + self.gateway_class.validate_storlet_registration(params, obj) + + def test_validate_dependency_registration(self): + # w/o dependency parameter + obj = 'dep_file' + params = {'Dependency-Version': '1.0'} + self.gateway_class.validate_dependency_registration(params, obj) + + # w/ correct dependency parameter + params = { + 'Dependency-Permissions': '755', + 'Dependency-Version': '1.0'} + self.gateway_class.validate_dependency_registration(params, obj) + + # w/ wrong dependency parameter + params = { + 'Dependency-Permissions': '400', + 'Dependency-Version': '1.0'} + with self.assertRaises(ValueError): + self.gateway_class.validate_dependency_registration(params, obj) + + # w/ invalid dependency parameter + params = { + 'Dependency-Permissions': 'foo', + 'Dependency-Version': '1.0'} + with self.assertRaises(ValueError): + self.gateway_class.validate_dependency_registration(params, obj) + params = { + 'Dependency-Permissions': '888', + 'Dependency-Version': '1.0'} + with self.assertRaises(ValueError): + self.gateway_class.validate_dependency_registration(params, obj) + + def _test_invocation_flow(self, extra_sources=None): + extra_sources = extra_sources or [] + options = {'generate_log': False, + 'scope': 'AUTH_account', + 'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep')} + + st_req = ContainerStorletRequest( + storlet_id=self.sobj, + params={}, + user_metadata={}, + data_iter=iter('body'), options=options) + + # TODO(kota_): need more efficient way for emuration of return value + # from SDaemon + value_generator = iter([ + # first, we get metadata json + json.dumps({'metadata': 'return'}), + # then we get object data + 'something', '', + ]) + + def mock_read(fd, size): + try: + value = next(value_generator) + except StopIteration: + raise Exception('called more then expected') + # NOTE(takashi): Make sure that we return bytes in PY3 + return value.encode('utf-8') + + def mock_close(fd): + pass + + called_fd_and_bodies = [] + invocation_protocol = \ + 'storlets.gateway.gateways.container.runtime.' \ + 'StorletInvocationProtocol._write_input_data' + + def mock_writer(self, fd, app_iter): + body = '' + for chunk in app_iter: + body += chunk + called_fd_and_bodies.append((fd, body)) + + # prepare nested mock patch + # SBus -> mock SBus.send() for container communication + # os.read -> mock reading the file descriptor from container + # select.select -> mock fd communication which can be readable + @mock.patch('storlets.gateway.gateways.container.runtime.SBusClient') + @mock.patch('storlets.gateway.gateways.container.runtime.os.read', + mock_read) + @mock.patch('storlets.gateway.gateways.container.runtime.os.close', + mock_close) + @mock.patch('storlets.gateway.gateways.container.runtime.select.' + 'select', + lambda r, w, x, timeout=None: (r, w, x)) + @mock.patch('storlets.gateway.common.stob.os.read', mock_read) + @mock.patch(invocation_protocol, mock_writer) + def test_invocation_flow(client): + client.ping.return_value = SBusResponse(True, 'OK') + client.stop_daemon.return_value = SBusResponse(True, 'OK') + client.start_daemon.return_value = SBusResponse(True, 'OK') + client.execute.return_value = SBusResponse(True, 'OK', 'someid') + + sresp = self.gateway.invocation_flow(st_req, extra_sources) + eventlet.sleep(0.1) + file_like = FileLikeIter(sresp.data_iter) + self.assertEqual(b'something', file_like.read()) + + # I hate the decorator to return an instance but to track current + # implementation, we have to make a mock class for this. Need to fix. + + class MockFileManager(object): + def get_storlet(self, req): + return BytesIO(b'mock'), None + + def get_dependency(self, req): + return BytesIO(b'mock'), None + + st_req.file_manager = MockFileManager() + + test_invocation_flow() + + # ensure st_req.app_iter is drawn + self.assertRaises(StopIteration, next, st_req.data_iter) + expected_mock_writer_calls = len(extra_sources) + 1 + self.assertEqual(expected_mock_writer_calls, + len(called_fd_and_bodies)) + self.assertEqual('body', called_fd_and_bodies[0][1]) + return called_fd_and_bodies + + def test_invocation_flow(self): + self._test_invocation_flow() + + def test_invocation_flow_with_extra_sources(self): + options = {'generate_log': False, + 'scope': 'AUTH_account', + 'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep')} + + data_sources = [] + + def generate_extra_st_request(): + # This works similarly with build_storlet_request + # TODO(kota_): think of more generarl way w/o + # build_storlet_request + sw_req = Request.blank( + self.req_path, environ={'REQUEST_METHOD': 'GET'}, + headers={'X-Run-Storlet': self.sobj}) + + sw_resp = Response( + app_iter=iter(['This is a response body']), status=200) + + st_req = ContainerStorletRequest( + storlet_id=sw_req.headers['X-Run-Storlet'], + params=sw_req.params, + user_metadata={}, + data_iter=sw_resp.app_iter, options=options) + data_sources.append(sw_resp.app_iter) + return st_req + + extra_request = generate_extra_st_request() + mock_calls = self._test_invocation_flow( + extra_sources=[extra_request]) + self.assertEqual('This is a response body', mock_calls[1][1]) + + # run all existing eventlet threads + for app_iter in data_sources: + # ensure all app_iters are drawn + self.assertRaises(StopIteration, next, app_iter) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/gateway/gateways/container/test_runtime.py b/tests/unit/gateway/gateways/container/test_runtime.py new file mode 100644 index 00000000..e00874c4 --- /dev/null +++ b/tests/unit/gateway/gateways/container/test_runtime.py @@ -0,0 +1,411 @@ +# Copyright (c) 2010-2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from contextlib import contextmanager +import errno +from io import StringIO +import os +from stat import ST_MODE +import tempfile +import unittest +from unittest import mock + +from storlets.sbus.client import SBusResponse +from storlets.sbus.client.exceptions import SBusClientIOError +from storlets.gateway.common.exceptions import StorletRuntimeException, \ + StorletTimeout +from storlets.gateway.gateways.container.gateway import ContainerStorletRequest +from storlets.gateway.gateways.container.runtime import \ + RunTimePaths, StorletInvocationProtocol +from tests.unit import FakeLogger, with_tempdir +from tests.unit.gateway.gateways import FakeFileManager + + +@contextmanager +def _mock_os_pipe(bufs): + class FakeFd(object): + def __init__(self, rbuf=''): + self.rbuf = rbuf.encode('utf-8') + self.closed = False + + def read(self, size): + size = min(len(self.rbuf), size) + ret = self.rbuf[:size] + self.rbuf = self.rbuf[size:] + return ret + + def close(self): + if self.closed: + raise OSError(errno.EBADF, os.strerror(errno.EBADF)) + self.closed = True + + def fake_os_read(fd, size): + return fd.read(size) + + def fake_os_close(fd): + fd.close() + + pipes = [(FakeFd(buf), FakeFd()) for buf in bufs] + pipe_generator = iter(pipes) + + def mock_os_pipe(): + try: + return next(pipe_generator) + except StopIteration: + raise AssertionError('pipe called more than expected') + + with mock.patch('storlets.gateway.gateways.container.runtime.os.pipe', + mock_os_pipe), \ + mock.patch('storlets.gateway.gateways.container.runtime.os.read', + fake_os_read) as fake_os_read,\ + mock.patch('storlets.gateway.gateways.container.runtime.os.close', + fake_os_close) as fake_os_close: + yield pipes + + +class TestRuntimePaths(unittest.TestCase): + + def setUp(self): + self.scope = '0123456789abc' + self._initialize() + + def _initialize(self): + # TODO(takashi): take these values from config file + base_dir = '/var/lib/storlets' + self.script_dir = os.path.join(base_dir, 'scripts') + self.pipes_dir = os.path.join(base_dir, 'pipes', 'scopes') + self.storlets_dir = os.path.join(base_dir, 'storlets', 'scopes') + self.log_dir = os.path.join(base_dir, 'logs', 'scopes') + self.cache_dir = os.path.join(base_dir, 'cache', 'scopes') + + self.conf = {} + self.storlet_id = 'org.openstack.storlet.mystorlet' + self.paths = RunTimePaths(self.scope, self.conf) + + def tearDown(self): + pass + + def test_host_pipe_dir(self): + self.assertEqual( + os.path.join(self.pipes_dir, self.scope), + self.paths.host_pipe_dir) + + def test_create_host_pipe_dir(self): + pipedir = self.paths.host_pipe_dir + + # When the directory exists + with mock.patch('os.path.exists', return_value=True), \ + mock.patch('os.makedirs') as m, \ + mock.patch('os.chmod') as c: + self.assertEqual(os.path.join(self.pipes_dir, self.scope), + self.paths.create_host_pipe_dir()) + self.assertEqual(0, m.call_count) + cargs, ckwargs = c.call_args + # Make sure about the target directory + self.assertEqual(cargs[0], pipedir) + + # When the directory does not exist + with mock.patch('os.path.exists', return_value=False), \ + mock.patch('os.makedirs') as m, \ + mock.patch('os.chmod') as c: + self.assertEqual(os.path.join(self.pipes_dir, self.scope), + self.paths.create_host_pipe_dir()) + self.assertEqual(1, m.call_count) + # Make sure about the target directory + margs, mkwargs = m.call_args + self.assertEqual(margs[0], pipedir) + cargs, ckwargs = c.call_args + self.assertEqual(cargs[0], pipedir) + + def test_host_factory_pipe(self): + self.assertEqual( + self.paths.host_factory_pipe, + os.path.join(self.pipes_dir, self.scope, 'factory_pipe')) + + def test_get_host_storlet_pipe(self): + self.assertEqual( + os.path.join(self.pipes_dir, self.scope, self.storlet_id), + self.paths.get_host_storlet_pipe(self.storlet_id)) + + def test_get_sbox_storlet_pipe(self): + self.assertEqual( + os.path.join('/mnt/channels', self.storlet_id), + self.paths.get_sbox_storlet_pipe(self.storlet_id)) + + def test_get_sbox_storlet_dir(self): + self.assertEqual( + os.path.join('/home/swift', self.storlet_id), + self.paths.get_sbox_storlet_dir(self.storlet_id)) + + def test_host_storlet_base_dir(self): + self.assertEqual( + self.paths.host_storlet_base_dir, + os.path.join(self.storlets_dir, self.scope)) + + def test_get_host_storlet_dir(self): + self.assertEqual( + os.path.join(self.storlets_dir, self.scope, self.storlet_id), + self.paths.get_host_storlet_dir(self.storlet_id)) + + def test_get_host_slog_path(self): + self.assertEqual( + os.path.join(self.log_dir, self.scope, self.storlet_id, + 'storlet_invoke.log'), + self.paths.get_host_slog_path(self.storlet_id)) + + def test_host_storlet_cache_dir(self): + self.assertEqual( + os.path.join(self.cache_dir, self.scope, 'storlet'), + self.paths.host_storlet_cache_dir) + + def test_host_dependency_cache_dir(self): + self.assertEqual( + os.path.join(self.cache_dir, self.scope, 'dependency'), + self.paths.host_dependency_cache_dir) + + def test_runtime_paths_default(self): + # CHECK: docs says we need 4 dirs for communicate + # ==================================================================== + # |1| host_factory_pipe_path | //factory_pipe | + # ==================================================================== + # |2| host_storlet_pipe_path | // | + # ==================================================================== + # |3| sandbox_factory_pipe_path | /mnt/channels/factory_pipe | + # ==================================================================== + # |4| sandbox_storlet_pipe_path | /mnt/channels/ | + # ==================================================================== + # + # With this test, the scope value is "account" and the storlet_id is + # "Storlet-1.0.jar" (app name?) + # ok, let's check for these values + + runtime_paths = RunTimePaths('account', {}) + storlet_id = 'Storlet-1.0.jar' + + # For pipe + self.assertEqual('/var/lib/storlets/pipes/scopes/account', + runtime_paths.host_pipe_dir) + + # 1. host_factory_pipe_path //factory_pipe + self.assertEqual( + '/var/lib/storlets/pipes/scopes/account/factory_pipe', + runtime_paths.host_factory_pipe) + # 2. host_storlet_pipe_path // + self.assertEqual( + '/var/lib/storlets/pipes/scopes/account/Storlet-1.0.jar', + runtime_paths.get_host_storlet_pipe(storlet_id)) + # 3. Yes, right now, we don't have the path for #3 in Python + # 4. sandbox_storlet_pipe_path | /mnt/channels/ + self.assertEqual('/mnt/channels/Storlet-1.0.jar', + runtime_paths.get_sbox_storlet_pipe(storlet_id)) + + # This looks like for jar load? + self.assertEqual('/var/lib/storlets/storlets/scopes/account', + runtime_paths.host_storlet_base_dir) + self.assertEqual( + '/var/lib/storlets/storlets/scopes/account/Storlet-1.0.jar', + runtime_paths.get_host_storlet_dir(storlet_id)) + # And this one is a mount point in sand box? + self.assertEqual('/home/swift/Storlet-1.0.jar', + runtime_paths.get_sbox_storlet_dir(storlet_id)) + + @with_tempdir + def test_create_host_pipe_dir_with_real_dir(self, temp_dir): + runtime_paths = RunTimePaths('account', {'host_root': temp_dir}) + runtime_paths.create_host_pipe_dir() + path = runtime_paths.host_pipe_dir + self.assertTrue(os.path.exists(path)) + self.assertTrue(os.path.isdir(path)) + permission = oct(os.stat(path)[ST_MODE])[-3:] + # TODO(kota_): make sure if this is really acceptable + self.assertEqual('777', permission) + + +class TestRuntimePathsTempauth(TestRuntimePaths): + def setUp(self): + self.scope = 'test' + self._initialize() + + +class TestStorletInvocationProtocol(unittest.TestCase): + def setUp(self): + self.pipe_path = tempfile.mktemp() + self.log_file = tempfile.mktemp() + self.logger = FakeLogger() + self.storlet_id = 'Storlet-1.0.jar' + self.options = {'storlet_main': 'org.openstack.storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'java', + 'file_manager': FakeFileManager('storlet', 'dep')} + storlet_request = ContainerStorletRequest( + self.storlet_id, {}, {}, iter(StringIO()), options=self.options) + self.protocol = StorletInvocationProtocol( + storlet_request, self.pipe_path, self.log_file, 1, self.logger) + + def tearDown(self): + for path in [self.pipe_path, self.log_file]: + try: + os.unlink(path) + except OSError: + pass + + def test_send_execute_command(self): + with mock.patch('storlets.gateway.gateways.container.runtime.' + 'SBusClient.execute') as execute: + execute.return_value = SBusResponse(True, 'OK', 'someid') + with self.protocol.storlet_logger.activate(): + self.protocol._send_execute_command() + self.assertEqual('someid', self.protocol.task_id) + + with mock.patch('storlets.gateway.gateways.container.runtime.' + 'SBusClient.execute') as execute: + execute.return_value = SBusResponse(True, 'OK') + with self.assertRaises(StorletRuntimeException): + with self.protocol.storlet_logger.activate(): + self.protocol._send_execute_command() + + with mock.patch('storlets.gateway.gateways.container.runtime.' + 'SBusClient.execute') as execute: + execute.return_value = SBusResponse(False, 'NG', 'someid') + with self.assertRaises(StorletRuntimeException): + with self.protocol.storlet_logger.activate(): + self.protocol._send_execute_command() + + with mock.patch('storlets.gateway.gateways.container.runtime.' + 'SBusClient.execute') as execute: + execute.side_effect = SBusClientIOError() + with self.assertRaises(StorletRuntimeException): + with self.protocol.storlet_logger.activate(): + self.protocol._send_execute_command() + + def test_invocation_protocol(self): + # os.pipe will be called 3 times + pipe_called = 3 + + with _mock_os_pipe([''] * pipe_called) as pipes: + with mock.patch.object(self.protocol, + '_wait_for_read_with_timeout'), \ + mock.patch.object(self.protocol, '_send_execute_command'): + self.protocol._invoke() + + self.assertEqual(pipe_called, len(pipes)) + pipes = iter(pipes) + + # data write is not directly closed + # data read is closed + input_data_read_fd, input_data_write_fd = next(pipes) + self.assertTrue(input_data_read_fd.closed) + self.assertFalse(input_data_write_fd.closed) + + # data write is closed but data read is still open + data_read_fd, data_write_fd = next(pipes) + self.assertFalse(data_read_fd.closed) + self.assertTrue(data_write_fd.closed) + + # metadata write fd is closed, metadata read fd is still open. + metadata_read_fd, metadata_write_fd = next(pipes) + self.assertFalse(metadata_read_fd.closed) + self.assertTrue(metadata_write_fd.closed) + + # sanity + self.assertRaises(StopIteration, next, pipes) + + def test_invocation_protocol_remote_fds(self): + # In default, we have 4 fds in remote_fds + storlet_request = ContainerStorletRequest( + self.storlet_id, {}, {}, iter(StringIO()), options=self.options) + protocol = StorletInvocationProtocol( + storlet_request, self.pipe_path, self.log_file, 1, self.logger) + with protocol.storlet_logger.activate(): + self.assertEqual(4, len(protocol.remote_fds)) + + # extra_resources expands the remote_fds + storlet_request = ContainerStorletRequest( + self.storlet_id, {}, {}, iter(StringIO()), options=self.options) + protocol = StorletInvocationProtocol( + storlet_request, self.pipe_path, self.log_file, 1, self.logger, + extra_sources=[storlet_request]) + with protocol.storlet_logger.activate(): + self.assertEqual(5, len(protocol.remote_fds)) + + # 2 more extra_resources expands the remote_fds + storlet_request = ContainerStorletRequest( + self.storlet_id, {}, {}, iter(StringIO()), options=self.options) + protocol = StorletInvocationProtocol( + storlet_request, self.pipe_path, self.log_file, 1, self.logger, + extra_sources=[storlet_request] * 3) + with protocol.storlet_logger.activate(): + self.assertEqual(7, len(protocol.remote_fds)) + + def test_open_writer_with_invalid_fd(self): + invalid_fds = ( + (None, TypeError), (-1, ValueError), ('blah', TypeError)) + + for invalid_fd, expected_error in invalid_fds: + with self.assertRaises(expected_error): + with self.protocol._open_writer(invalid_fd): + pass + + def _test_writer_with_exception(self, exception_cls): + pipes = [os.pipe()] + + def raise_in_the_context(): + with self.protocol._open_writer(pipes[0][1]): + raise exception_cls() + try: + # writer context doesn't suppress any exception + self.assertRaises(exception_cls, raise_in_the_context) + + # since _open_writer closes the write fd, the os.close will fail as + # BadFileDescriptor + with self.assertRaises(OSError) as os_error: + os.close(pipes[0][1]) + self.assertEqual(9, os_error.exception.errno) + + finally: + for fd in pipes[0]: + try: + os.close(fd) + except OSError: + pass + + def test_writer_raise_while_in_writer_context(self): + # basic storlet timeout + self._test_writer_with_exception(StorletTimeout) + # unexpected IOError + self._test_writer_with_exception(IOError) + # else + self._test_writer_with_exception(Exception) + + +class TestStorletInvocationProtocolPython(TestStorletInvocationProtocol): + def setUp(self): + self.pipe_path = tempfile.mktemp() + self.log_file = tempfile.mktemp() + self.logger = FakeLogger() + self.storlet_id = 'Storlet-1.0.py' + self.options = {'storlet_main': 'storlet.Storlet', + 'storlet_dependency': 'dep1,dep2', + 'storlet_language': 'python', + 'language_version': '3.6', + 'file_manager': FakeFileManager('storlet', 'dep')} + storlet_request = ContainerStorletRequest( + self.storlet_id, {}, {}, iter(StringIO()), options=self.options) + self.protocol = StorletInvocationProtocol( + storlet_request, self.pipe_path, self.log_file, 1, self.logger) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/gateway/gateways/docker/test_gateway.py b/tests/unit/gateway/gateways/docker/test_gateway.py index 116584ef..28af68ed 100644 --- a/tests/unit/gateway/gateways/docker/test_gateway.py +++ b/tests/unit/gateway/gateways/docker/test_gateway.py @@ -13,554 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import eventlet -from io import BytesIO, StringIO -import json -from shutil import rmtree -from tempfile import mkdtemp import unittest -from unittest import mock -from swift.common.swob import Request, Response -from swift.common.utils import FileLikeIter +from storlets.gateway.gateways.docker.gateway import \ + DockerStorletGateway +from tests.unit.gateway.gateways.container.test_gateway import \ + ContainerGatewayTestMixin -from storlets.sbus.client import SBusResponse -from tests.unit import FakeLogger -from tests.unit.gateway.gateways import FakeFileManager -from storlets.gateway.gateways.docker.gateway import DockerStorletRequest, \ - StorletGatewayDocker +class TestStorletDockerGateway(ContainerGatewayTestMixin, unittest.TestCase): - -class MockInternalClient(object): - def __init__(self): - pass - - -class TestDockerStorletRequest(unittest.TestCase): - - def test_init(self): - # Java - storlet_id = 'Storlet-1.0.jar' - params = {'Param1': 'Value1', 'Param2': 'Value2'} - metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'} - - # with dependencies - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep')} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - iter(StringIO()), options=options) - self.assertEqual(metadata, dsreq.user_metadata) - self.assertEqual(params, dsreq.params) - self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id) - self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main) - self.assertEqual(['dep1', 'dep2'], dsreq.dependencies) - self.assertEqual('java', dsreq.storlet_language) - self.assertIsNone(dsreq.storlet_language_version) - - # without dependencies - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep')} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - iter(StringIO()), options=options) - self.assertEqual(metadata, dsreq.user_metadata) - self.assertEqual(params, dsreq.params) - self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id) - self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main) - self.assertEqual([], dsreq.dependencies) - self.assertEqual('java', dsreq.storlet_language) - self.assertIsNone(dsreq.storlet_language_version) - - # storlet_language is not given - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'file_manager': FakeFileManager('storlet', 'dep')} - with self.assertRaises(ValueError): - DockerStorletRequest(storlet_id, params, metadata, - iter(StringIO()), options=options) - - # storlet_main is not given - options = {'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep')} - with self.assertRaises(ValueError): - DockerStorletRequest(storlet_id, params, metadata, - iter(StringIO()), options=options) - - # file_manager is not given - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_language': 'java'} - with self.assertRaises(ValueError): - DockerStorletRequest(storlet_id, params, metadata, - iter(StringIO()), options=options) - - # Python - storlet_id = 'storlet.py' - params = {'Param1': 'Value1', 'Param2': 'Value2'} - metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'} - - # without language version - options = {'storlet_main': 'storlet.Storlet', - 'storlet_language': 'python', - 'file_manager': FakeFileManager('storlet', 'dep')} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - iter(StringIO()), options=options) - self.assertEqual(metadata, dsreq.user_metadata) - self.assertEqual(params, dsreq.params) - self.assertEqual('storlet.py', dsreq.storlet_id) - self.assertEqual('storlet.Storlet', dsreq.storlet_main) - self.assertEqual([], dsreq.dependencies) - self.assertEqual('python', dsreq.storlet_language) - self.assertIsNone(dsreq.storlet_language_version) - - # with language version - options = {'storlet_main': 'storlet.Storlet', - 'storlet_language': 'python', - 'storlet_language_version': '3.6', - 'file_manager': FakeFileManager('storlet', 'dep')} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - iter(StringIO()), options=options) - self.assertEqual(metadata, dsreq.user_metadata) - self.assertEqual(params, dsreq.params) - self.assertEqual('storlet.py', dsreq.storlet_id) - self.assertEqual('storlet.Storlet', dsreq.storlet_main) - self.assertEqual([], dsreq.dependencies) - self.assertEqual('python', dsreq.storlet_language) - self.assertEqual('3.6', dsreq.storlet_language_version) - - def test_init_with_range(self): - storlet_id = 'Storlet-1.0.jar' - params = {} - metadata = {} - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep'), - 'range_start': 1, - 'range_end': 6} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - None, 0, options=options) - - self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id) - self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main) - self.assertEqual(['dep1', 'dep2'], dsreq.dependencies) - self.assertEqual('java', dsreq.storlet_language) - self.assertIsNone(dsreq.storlet_language_version) - self.assertEqual(1, dsreq.start) - self.assertEqual(6, dsreq.end) - - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep'), - 'range_start': 0, - 'range_end': 0} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - None, 0, options=options) - - self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id) - self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main) - self.assertEqual(['dep1', 'dep2'], dsreq.dependencies) - self.assertEqual('java', dsreq.storlet_language) - self.assertIsNone(dsreq.storlet_language_version) - self.assertEqual(0, dsreq.start) - self.assertEqual(0, dsreq.end) - - def test_has_range(self): - storlet_id = 'Storlet-1.0.jar' - params = {} - metadata = {} - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep')} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - None, 0, options=options) - self.assertFalse(dsreq.has_range) - - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep'), - 'range_start': 1, - 'range_end': 6} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - None, 0, options=options) - self.assertTrue(dsreq.has_range) - - options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep'), - 'range_start': 0, - 'range_end': 6} - dsreq = DockerStorletRequest(storlet_id, params, metadata, - None, 0, options=options) - self.assertTrue(dsreq.has_range) - - -class TestStorletDockerGateway(unittest.TestCase): - - def setUp(self): - # TODO(takashi): take these values from config file - self.tempdir = mkdtemp() - self.sconf = { - 'host_root': self.tempdir, - 'swift_dir': self.tempdir, - 'storlet_timeout': '9', - 'storlet_container': 'storlet', - 'storlet_dependency': 'dependency', - 'reseller_prefix': 'AUTH' - } - self.logger = FakeLogger() - - self.storlet_container = self.sconf['storlet_container'] - self.storlet_dependency = self.sconf['storlet_dependency'] - - self.version = 'v1' - self.account = 'AUTH_account' - self.container = 'container' - self.obj = 'object' - self.sobj = 'storlet-1.0.jar' - - self.gateway = StorletGatewayDocker( - self.sconf, self.logger, self.account) - - def tearDown(self): - rmtree(self.tempdir) - - @property - def req_path(self): - return self._create_proxy_path( - self.version, self.account, self.container, - self.obj) - - @property - def storlet_path(self): - return self._create_proxy_path( - self.version, self.account, self.storlet_container, - self.sobj) - - def _create_proxy_path(self, version, account, container, obj): - return '/'.join(['', version, account, container, obj]) - - def test_check_mandatory_params(self): - params = {'keyA': 'valueA', - 'keyB': 'valueB', - 'keyC': 'valueC'} - - # all mandatory headers are included - StorletGatewayDocker._check_mandatory_params( - params, ['keyA', 'keyB']) - - # some of mandatory headers are missing - with self.assertRaises(ValueError): - StorletGatewayDocker._check_mandatory_params( - params, ['keyA', 'KeyD']) - - def test_validate_storlet_registration_java(self): - # correct name and headers w/ dependency - obj = 'storlet-1.0.jar' - params = {'Language': 'java', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'path.to.storlet.class'} - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # correct name and headers w/o dependency - obj = 'storlet-1.0.jar' - params = {'Language': 'java', - 'Interface-Version': '1.0', - 'Object-Metadata': 'no', - 'Main': 'path.to.storlet.class'} - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # some header keys are missing - params = {'Language': 'java', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # wrong name - obj = 'storlet.jar' - params = {'Language': 'java', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'path.to.storlet.class'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - def test_validate_storlet_registration_python(self): - # correct name and headers w/ dependency - obj = 'storlet.py' - params = {'Language': 'python', - 'Language-Version': '3.6', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'storlet.Storlet'} - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # wrong version - obj = 'storlet.py' - params = {'Language': 'python', - 'Language-Version': '1.7', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'storlet.Storlet'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # py2 is no more supported - obj = 'storlet.py' - params = {'Language': 'python', - 'Language-Version': '2.7', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'storlet.Storlet'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # wrong name - obj = 'storlet.pyfoo' - params = {'Language': 'python', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'storlet.Storlet'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # wrong main class - obj = 'storlet.py' - params = {'Language': 'python', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'another_storlet.Storlet'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - obj = 'storlet.py' - params = {'Language': 'python', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'storlet'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - obj = 'storlet.py' - params = {'Language': 'python', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'storlet.foo.Storlet'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - def test_validate_storlet_registration_not_suppoeted(self): - # unsupported language - obj = 'storlet.foo' - params = {'Language': 'bar', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file', - 'Object-Metadata': 'no', - 'Main': 'path.to.storlet.class'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # same name for storlet and dependency - obj = 'storlet-1.0.jar' - params = {'Language': 'java', - 'Interface-Version': '1.0', - 'Dependency': 'storlet-1.0.jar', - 'Object-Metadata': 'no', - 'Main': 'path.to.storlet.class'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - # duplicated name in dependencies - obj = 'storlet-1.0.jar' - params = {'Language': 'java', - 'Interface-Version': '1.0', - 'Dependency': 'dep_file,dep_file', - 'Object-Metadata': 'no', - 'Main': 'path.to.storlet.class'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_storlet_registration(params, obj) - - def test_validate_dependency_registration(self): - # w/o dependency parameter - obj = 'dep_file' - params = {'Dependency-Version': '1.0'} - StorletGatewayDocker.validate_dependency_registration(params, obj) - - # w/ correct dependency parameter - params = { - 'Dependency-Permissions': '755', - 'Dependency-Version': '1.0'} - StorletGatewayDocker.validate_dependency_registration(params, obj) - - # w/ wrong dependency parameter - params = { - 'Dependency-Permissions': '400', - 'Dependency-Version': '1.0'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_dependency_registration(params, obj) - - # w/ invalid dependency parameter - params = { - 'Dependency-Permissions': 'foo', - 'Dependency-Version': '1.0'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_dependency_registration(params, obj) - params = { - 'Dependency-Permissions': '888', - 'Dependency-Version': '1.0'} - with self.assertRaises(ValueError): - StorletGatewayDocker.validate_dependency_registration(params, obj) - - def _test_docker_gateway_communicate(self, extra_sources=None): - extra_sources = extra_sources or [] - options = {'generate_log': False, - 'scope': 'AUTH_account', - 'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep')} - - st_req = DockerStorletRequest( - storlet_id=self.sobj, - params={}, - user_metadata={}, - data_iter=iter('body'), options=options) - - # TODO(kota_): need more efficient way for emuration of return value - # from SDaemon - value_generator = iter([ - # first, we get metadata json - json.dumps({'metadata': 'return'}), - # then we get object data - 'something', '', - ]) - - def mock_read(fd, size): - try: - value = next(value_generator) - except StopIteration: - raise Exception('called more then expected') - # NOTE(takashi): Make sure that we return bytes in PY3 - return value.encode('utf-8') - - def mock_close(fd): - pass - - called_fd_and_bodies = [] - invocation_protocol = \ - 'storlets.gateway.gateways.docker.runtime.' \ - 'StorletInvocationProtocol._write_input_data' - - def mock_writer(self, fd, app_iter): - body = '' - for chunk in app_iter: - body += chunk - called_fd_and_bodies.append((fd, body)) - - # prepare nested mock patch - # SBus -> mock SBus.send() for container communication - # os.read -> mock reading the file descriptor from container - # select.select -> mock fd communication which can be readable - @mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient') - @mock.patch('storlets.gateway.gateways.docker.runtime.os.read', - mock_read) - @mock.patch('storlets.gateway.gateways.docker.runtime.os.close', - mock_close) - @mock.patch('storlets.gateway.gateways.docker.runtime.select.select', - lambda r, w, x, timeout=None: (r, w, x)) - @mock.patch('storlets.gateway.common.stob.os.read', mock_read) - @mock.patch(invocation_protocol, mock_writer) - def test_invocation_flow(client): - client.ping.return_value = SBusResponse(True, 'OK') - client.stop_daemon.return_value = SBusResponse(True, 'OK') - client.start_daemon.return_value = SBusResponse(True, 'OK') - client.execute.return_value = SBusResponse(True, 'OK', 'someid') - - sresp = self.gateway.invocation_flow(st_req, extra_sources) - eventlet.sleep(0.1) - file_like = FileLikeIter(sresp.data_iter) - self.assertEqual(b'something', file_like.read()) - - # I hate the decorator to return an instance but to track current - # implementation, we have to make a mock class for this. Need to fix. - - class MockFileManager(object): - def get_storlet(self, req): - return BytesIO(b'mock'), None - - def get_dependency(self, req): - return BytesIO(b'mock'), None - - st_req.file_manager = MockFileManager() - - test_invocation_flow() - - # ensure st_req.app_iter is drawn - self.assertRaises(StopIteration, next, st_req.data_iter) - expected_mock_writer_calls = len(extra_sources) + 1 - self.assertEqual(expected_mock_writer_calls, - len(called_fd_and_bodies)) - self.assertEqual('body', called_fd_and_bodies[0][1]) - return called_fd_and_bodies - - def test_docker_gateway_communicate(self): - self._test_docker_gateway_communicate() - - def test_docker_gateway_communicate_with_extra_sources(self): - options = {'generate_log': False, - 'scope': 'AUTH_account', - 'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep')} - - data_sources = [] - - def generate_extra_st_request(): - # This works similarly with build_storlet_request - # TODO(kota_): think of more generarl way w/o - # build_storlet_request - sw_req = Request.blank( - self.req_path, environ={'REQUEST_METHOD': 'GET'}, - headers={'X-Run-Storlet': self.sobj}) - - sw_resp = Response( - app_iter=iter(['This is a response body']), status=200) - - st_req = DockerStorletRequest( - storlet_id=sw_req.headers['X-Run-Storlet'], - params=sw_req.params, - user_metadata={}, - data_iter=sw_resp.app_iter, options=options) - data_sources.append(sw_resp.app_iter) - return st_req - - extra_request = generate_extra_st_request() - mock_calls = self._test_docker_gateway_communicate( - extra_sources=[extra_request]) - self.assertEqual('This is a response body', mock_calls[1][1]) - - # run all existing eventlet threads - for app_iter in data_sources: - # ensure all app_iters are drawn - self.assertRaises(StopIteration, next, app_iter) + gateway_class = DockerStorletGateway if __name__ == '__main__': diff --git a/tests/unit/gateway/gateways/docker/test_runtime.py b/tests/unit/gateway/gateways/docker/test_runtime.py index 459788a0..7a8bd6ab 100644 --- a/tests/unit/gateway/gateways/docker/test_runtime.py +++ b/tests/unit/gateway/gateways/docker/test_runtime.py @@ -13,12 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from contextlib import contextmanager -import errno -from io import StringIO -import os -from stat import ST_MODE -import tempfile import unittest from unittest import mock @@ -31,266 +25,57 @@ from storlets.sbus.client.exceptions import SBusClientIOError, \ SBusClientMalformedResponse, SBusClientSendError from storlets.gateway.common.exceptions import StorletRuntimeException, \ StorletTimeout -from storlets.gateway.gateways.docker.gateway import DockerStorletRequest -from storlets.gateway.gateways.docker.runtime import RunTimeSandbox, \ - RunTimePaths, StorletInvocationProtocol -from tests.unit import FakeLogger, with_tempdir -from tests.unit.gateway.gateways import FakeFileManager +from storlets.gateway.gateways.docker.runtime import DockerRunTimeSandbox +from tests.unit import FakeLogger -@contextmanager -def _mock_os_pipe(bufs): - class FakeFd(object): - def __init__(self, rbuf=''): - self.rbuf = rbuf.encode('utf-8') - self.closed = False - - def read(self, size): - size = min(len(self.rbuf), size) - ret = self.rbuf[:size] - self.rbuf = self.rbuf[size:] - return ret - - def close(self): - if self.closed: - raise OSError(errno.EBADF, os.strerror(errno.EBADF)) - self.closed = True - - def fake_os_read(fd, size): - return fd.read(size) - - def fake_os_close(fd): - fd.close() - - pipes = [(FakeFd(buf), FakeFd()) for buf in bufs] - pipe_generator = iter(pipes) - - def mock_os_pipe(): - try: - return next(pipe_generator) - except StopIteration: - raise AssertionError('pipe called more than expected') - - with mock.patch('storlets.gateway.gateways.docker.runtime.os.pipe', - mock_os_pipe), \ - mock.patch('storlets.gateway.gateways.docker.runtime.os.read', - fake_os_read) as fake_os_read,\ - mock.patch('storlets.gateway.gateways.docker.runtime.os.close', - fake_os_close) as fake_os_close: - yield pipes - - -class TestRuntimePaths(unittest.TestCase): - - def setUp(self): - self.scope = '0123456789abc' - self._initialize() - - def _initialize(self): - # TODO(takashi): take these values from config file - base_dir = '/var/lib/storlets' - self.script_dir = os.path.join(base_dir, 'scripts') - self.pipes_dir = os.path.join(base_dir, 'pipes', 'scopes') - self.storlets_dir = os.path.join(base_dir, 'storlets', 'scopes') - self.log_dir = os.path.join(base_dir, 'logs', 'scopes') - self.cache_dir = os.path.join(base_dir, 'cache', 'scopes') - - self.conf = {} - self.storlet_id = 'org.openstack.storlet.mystorlet' - self.paths = RunTimePaths(self.scope, self.conf) - - def tearDown(self): - pass - - def test_host_pipe_dir(self): - self.assertEqual( - os.path.join(self.pipes_dir, self.scope), - self.paths.host_pipe_dir) - - def test_create_host_pipe_dir(self): - pipedir = self.paths.host_pipe_dir - - # When the directory exists - with mock.patch('os.path.exists', return_value=True), \ - mock.patch('os.makedirs') as m, \ - mock.patch('os.chmod') as c: - self.assertEqual(os.path.join(self.pipes_dir, self.scope), - self.paths.create_host_pipe_dir()) - self.assertEqual(0, m.call_count) - cargs, ckwargs = c.call_args - # Make sure about the target directory - self.assertEqual(cargs[0], pipedir) - - # When the directory does not exist - with mock.patch('os.path.exists', return_value=False), \ - mock.patch('os.makedirs') as m, \ - mock.patch('os.chmod') as c: - self.assertEqual(os.path.join(self.pipes_dir, self.scope), - self.paths.create_host_pipe_dir()) - self.assertEqual(1, m.call_count) - # Make sure about the target directory - margs, mkwargs = m.call_args - self.assertEqual(margs[0], pipedir) - cargs, ckwargs = c.call_args - self.assertEqual(cargs[0], pipedir) - - def test_host_factory_pipe(self): - self.assertEqual( - self.paths.host_factory_pipe, - os.path.join(self.pipes_dir, self.scope, 'factory_pipe')) - - def test_get_host_storlet_pipe(self): - self.assertEqual( - os.path.join(self.pipes_dir, self.scope, self.storlet_id), - self.paths.get_host_storlet_pipe(self.storlet_id)) - - def test_get_sbox_storlet_pipe(self): - self.assertEqual( - os.path.join('/mnt/channels', self.storlet_id), - self.paths.get_sbox_storlet_pipe(self.storlet_id)) - - def test_get_sbox_storlet_dir(self): - self.assertEqual( - os.path.join('/home/swift', self.storlet_id), - self.paths.get_sbox_storlet_dir(self.storlet_id)) - - def test_host_storlet_base_dir(self): - self.assertEqual( - self.paths.host_storlet_base_dir, - os.path.join(self.storlets_dir, self.scope)) - - def test_get_host_storlet_dir(self): - self.assertEqual( - os.path.join(self.storlets_dir, self.scope, self.storlet_id), - self.paths.get_host_storlet_dir(self.storlet_id)) - - def test_get_host_slog_path(self): - self.assertEqual( - os.path.join(self.log_dir, self.scope, self.storlet_id, - 'storlet_invoke.log'), - self.paths.get_host_slog_path(self.storlet_id)) - - def test_host_storlet_cache_dir(self): - self.assertEqual( - os.path.join(self.cache_dir, self.scope, 'storlet'), - self.paths.host_storlet_cache_dir) - - def test_host_dependency_cache_dir(self): - self.assertEqual( - os.path.join(self.cache_dir, self.scope, 'dependency'), - self.paths.host_dependency_cache_dir) - - def test_runtime_paths_default(self): - # CHECK: docs says we need 4 dirs for communicate - # ==================================================================== - # |1| host_factory_pipe_path | //factory_pipe | - # ==================================================================== - # |2| host_storlet_pipe_path | // | - # ==================================================================== - # |3| sandbox_factory_pipe_path | /mnt/channels/factory_pipe | - # ==================================================================== - # |4| sandbox_storlet_pipe_path | /mnt/channels/ | - # ==================================================================== - # - # With this test, the scope value is "account" and the storlet_id is - # "Storlet-1.0.jar" (app name?) - # ok, let's check for these values - - runtime_paths = RunTimePaths('account', {}) - storlet_id = 'Storlet-1.0.jar' - - # For pipe - self.assertEqual('/var/lib/storlets/pipes/scopes/account', - runtime_paths.host_pipe_dir) - - # 1. host_factory_pipe_path //factory_pipe - self.assertEqual( - '/var/lib/storlets/pipes/scopes/account/factory_pipe', - runtime_paths.host_factory_pipe) - # 2. host_storlet_pipe_path // - self.assertEqual( - '/var/lib/storlets/pipes/scopes/account/Storlet-1.0.jar', - runtime_paths.get_host_storlet_pipe(storlet_id)) - # 3. Yes, right now, we don't have the path for #3 in Python - # 4. sandbox_storlet_pipe_path | /mnt/channels/ - self.assertEqual('/mnt/channels/Storlet-1.0.jar', - runtime_paths.get_sbox_storlet_pipe(storlet_id)) - - # This looks like for jar load? - self.assertEqual('/var/lib/storlets/storlets/scopes/account', - runtime_paths.host_storlet_base_dir) - self.assertEqual( - '/var/lib/storlets/storlets/scopes/account/Storlet-1.0.jar', - runtime_paths.get_host_storlet_dir(storlet_id)) - # And this one is a mount point in sand box? - self.assertEqual('/home/swift/Storlet-1.0.jar', - runtime_paths.get_sbox_storlet_dir(storlet_id)) - - @with_tempdir - def test_create_host_pipe_dir_with_real_dir(self, temp_dir): - runtime_paths = RunTimePaths('account', {'host_root': temp_dir}) - runtime_paths.create_host_pipe_dir() - path = runtime_paths.host_pipe_dir - self.assertTrue(os.path.exists(path)) - self.assertTrue(os.path.isdir(path)) - permission = oct(os.stat(path)[ST_MODE])[-3:] - # TODO(kota_): make sure if this is really acceptable - self.assertEqual('777', permission) - - -class TestRuntimePathsTempauth(TestRuntimePaths): - def setUp(self): - self.scope = 'test' - self._initialize() - - -class TestRunTimeSandbox(unittest.TestCase): +class TestDockerRunTimeSandbox(unittest.TestCase): def setUp(self): self.logger = FakeLogger() # TODO(takashi): take these values from config file self.conf = {'container_image_namespace': 'localhost:5001', 'default_container_image_name': 'defaultimage'} self.scope = '0123456789abc' - self.sbox = RunTimeSandbox(self.scope, self.conf, self.logger) + self.sbox = DockerRunTimeSandbox(self.scope, self.conf, self.logger) def test_ping(self): - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'SBusClient.ping') as ping: ping.return_value = SBusResponse(True, 'OK') self.assertTrue(self.sbox.ping()) - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'SBusClient.ping') as ping: ping.return_value = SBusResponse(False, 'Error') self.assertFalse(self.sbox.ping()) - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'SBusClient.ping') as ping: ping.side_effect = SBusClientSendError() self.assertFalse(self.sbox.ping()) - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'SBusClient.ping') as ping: ping.side_effect = SBusClientMalformedResponse() self.assertFalse(self.sbox.ping()) - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'SBusClient.ping') as ping: ping.side_effect = SBusClientIOError() self.assertFalse(self.sbox.ping()) def test_wait(self): - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'SBusClient.ping') as ping, \ - mock.patch('storlets.gateway.gateways.docker.runtime.' + mock.patch('storlets.gateway.gateways.container.runtime.' 'time.sleep') as sleep: ping.return_value = SBusResponse(True, 'OK') self.sbox.wait() self.assertEqual(sleep.call_count, 0) - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'SBusClient.ping') as ping, \ - mock.patch('storlets.gateway.gateways.docker.runtime.' + mock.patch('storlets.gateway.gateways.container.runtime.' 'time.sleep') as sleep: ping.side_effect = [SBusResponse(False, 'Error'), SBusResponse(True, 'OK')] @@ -433,24 +218,24 @@ class TestRunTimeSandbox(unittest.TestCase): self.assertEqual(0, mock_containers.run.call_count) def test_restart(self): - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'RunTimePaths.create_host_pipe_dir') as pipe_dir, \ mock.patch('storlets.gateway.gateways.docker.runtime.' - 'RunTimeSandbox._restart') as _restart, \ + 'DockerRunTimeSandbox._restart') as _restart, \ mock.patch('storlets.gateway.gateways.docker.runtime.' - 'RunTimeSandbox.wait') as wait: + 'DockerRunTimeSandbox.wait') as wait: self.sbox.restart() self.assertEqual(1, pipe_dir.call_count) self.assertEqual(1, _restart.call_count) self.assertEqual((self.scope,), _restart.call_args.args) self.assertEqual(1, wait.call_count) - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'RunTimePaths.create_host_pipe_dir') as pipe_dir, \ mock.patch('storlets.gateway.gateways.docker.runtime.' - 'RunTimeSandbox._restart') as _restart, \ + 'DockerRunTimeSandbox._restart') as _restart, \ mock.patch('storlets.gateway.gateways.docker.runtime.' - 'RunTimeSandbox.wait') as wait: + 'DockerRunTimeSandbox.wait') as wait: _restart.side_effect = [StorletRuntimeException(), None] self.sbox.restart() self.assertEqual(1, pipe_dir.call_count) @@ -461,12 +246,12 @@ class TestRunTimeSandbox(unittest.TestCase): _restart.call_args_list[1].args) self.assertEqual(1, wait.call_count) - with mock.patch('storlets.gateway.gateways.docker.runtime.' + with mock.patch('storlets.gateway.gateways.container.runtime.' 'RunTimePaths.create_host_pipe_dir') as pipe_dir, \ mock.patch('storlets.gateway.gateways.docker.runtime.' - 'RunTimeSandbox._restart') as _restart, \ + 'DockerRunTimeSandbox._restart') as _restart, \ mock.patch('storlets.gateway.gateways.docker.runtime.' - 'RunTimeSandbox.wait') as wait: + 'DockerRunTimeSandbox.wait') as wait: _restart.side_effect = StorletTimeout() with self.assertRaises(StorletRuntimeException): self.sbox.restart() @@ -487,173 +272,5 @@ class TestRunTimeSandbox(unittest.TestCase): dependencies),) -class TestStorletInvocationProtocol(unittest.TestCase): - def setUp(self): - self.pipe_path = tempfile.mktemp() - self.log_file = tempfile.mktemp() - self.logger = FakeLogger() - self.storlet_id = 'Storlet-1.0.jar' - self.options = {'storlet_main': 'org.openstack.storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'java', - 'file_manager': FakeFileManager('storlet', 'dep')} - storlet_request = DockerStorletRequest( - self.storlet_id, {}, {}, iter(StringIO()), options=self.options) - self.protocol = StorletInvocationProtocol( - storlet_request, self.pipe_path, self.log_file, 1, self.logger) - - def tearDown(self): - for path in [self.pipe_path, self.log_file]: - try: - os.unlink(path) - except OSError: - pass - - def test_send_execute_command(self): - with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.' - 'execute') as execute: - execute.return_value = SBusResponse(True, 'OK', 'someid') - with self.protocol.storlet_logger.activate(): - self.protocol._send_execute_command() - self.assertEqual('someid', self.protocol.task_id) - - with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.' - 'execute') as execute: - execute.return_value = SBusResponse(True, 'OK') - with self.assertRaises(StorletRuntimeException): - with self.protocol.storlet_logger.activate(): - self.protocol._send_execute_command() - - with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.' - 'execute') as execute: - execute.return_value = SBusResponse(False, 'NG', 'someid') - with self.assertRaises(StorletRuntimeException): - with self.protocol.storlet_logger.activate(): - self.protocol._send_execute_command() - - with mock.patch('storlets.gateway.gateways.docker.runtime.SBusClient.' - 'execute') as execute: - execute.side_effect = SBusClientIOError() - with self.assertRaises(StorletRuntimeException): - with self.protocol.storlet_logger.activate(): - self.protocol._send_execute_command() - - def test_invocation_protocol(self): - # os.pipe will be called 3 times - pipe_called = 3 - - with _mock_os_pipe([''] * pipe_called) as pipes: - with mock.patch.object(self.protocol, - '_wait_for_read_with_timeout'), \ - mock.patch.object(self.protocol, '_send_execute_command'): - self.protocol._invoke() - - self.assertEqual(pipe_called, len(pipes)) - pipes = iter(pipes) - - # data write is not directly closed - # data read is closed - input_data_read_fd, input_data_write_fd = next(pipes) - self.assertTrue(input_data_read_fd.closed) - self.assertFalse(input_data_write_fd.closed) - - # data write is closed but data read is still open - data_read_fd, data_write_fd = next(pipes) - self.assertFalse(data_read_fd.closed) - self.assertTrue(data_write_fd.closed) - - # metadata write fd is closed, metadata read fd is still open. - metadata_read_fd, metadata_write_fd = next(pipes) - self.assertFalse(metadata_read_fd.closed) - self.assertTrue(metadata_write_fd.closed) - - # sanity - self.assertRaises(StopIteration, next, pipes) - - def test_invocation_protocol_remote_fds(self): - # In default, we have 4 fds in remote_fds - storlet_request = DockerStorletRequest( - self.storlet_id, {}, {}, iter(StringIO()), options=self.options) - protocol = StorletInvocationProtocol( - storlet_request, self.pipe_path, self.log_file, 1, self.logger) - with protocol.storlet_logger.activate(): - self.assertEqual(4, len(protocol.remote_fds)) - - # extra_resources expands the remote_fds - storlet_request = DockerStorletRequest( - self.storlet_id, {}, {}, iter(StringIO()), options=self.options) - protocol = StorletInvocationProtocol( - storlet_request, self.pipe_path, self.log_file, 1, self.logger, - extra_sources=[storlet_request]) - with protocol.storlet_logger.activate(): - self.assertEqual(5, len(protocol.remote_fds)) - - # 2 more extra_resources expands the remote_fds - storlet_request = DockerStorletRequest( - self.storlet_id, {}, {}, iter(StringIO()), options=self.options) - protocol = StorletInvocationProtocol( - storlet_request, self.pipe_path, self.log_file, 1, self.logger, - extra_sources=[storlet_request] * 3) - with protocol.storlet_logger.activate(): - self.assertEqual(7, len(protocol.remote_fds)) - - def test_open_writer_with_invalid_fd(self): - invalid_fds = ( - (None, TypeError), (-1, ValueError), ('blah', TypeError)) - - for invalid_fd, expected_error in invalid_fds: - with self.assertRaises(expected_error): - with self.protocol._open_writer(invalid_fd): - pass - - def _test_writer_with_exception(self, exception_cls): - pipes = [os.pipe()] - - def raise_in_the_context(): - with self.protocol._open_writer(pipes[0][1]): - raise exception_cls() - try: - # writer context doesn't suppress any exception - self.assertRaises(exception_cls, raise_in_the_context) - - # since _open_writer closes the write fd, the os.close will fail as - # BadFileDescriptor - with self.assertRaises(OSError) as os_error: - os.close(pipes[0][1]) - self.assertEqual(9, os_error.exception.errno) - - finally: - for fd in pipes[0]: - try: - os.close(fd) - except OSError: - pass - - def test_writer_raise_while_in_writer_context(self): - # basic storlet timeout - self._test_writer_with_exception(StorletTimeout) - # unexpected IOError - self._test_writer_with_exception(IOError) - # else - self._test_writer_with_exception(Exception) - - -class TestStorletInvocationProtocolPython(TestStorletInvocationProtocol): - def setUp(self): - self.pipe_path = tempfile.mktemp() - self.log_file = tempfile.mktemp() - self.logger = FakeLogger() - self.storlet_id = 'Storlet-1.0.py' - self.options = {'storlet_main': 'storlet.Storlet', - 'storlet_dependency': 'dep1,dep2', - 'storlet_language': 'python', - 'language_version': '3.6', - 'file_manager': FakeFileManager('storlet', 'dep')} - storlet_request = DockerStorletRequest( - self.storlet_id, {}, {}, iter(StringIO()), options=self.options) - self.protocol = StorletInvocationProtocol( - storlet_request, self.pipe_path, self.log_file, 1, self.logger) - - if __name__ == '__main__': unittest.main() diff --git a/tests/unit/gateway/test_loader.py b/tests/unit/gateway/test_loader.py index 715960ac..ef072236 100644 --- a/tests/unit/gateway/test_loader.py +++ b/tests/unit/gateway/test_loader.py @@ -15,8 +15,8 @@ import unittest from storlets.gateway.common.exceptions import StorletGatewayLoadError from storlets.gateway.loader import load_gateway -from storlets.gateway.gateways.stub import StorletGatewayStub -from storlets.gateway.gateways.docker import StorletGatewayDocker +from storlets.gateway.gateways.stub import StubStorletGateway +from storlets.gateway.gateways.docker import DockerStorletGateway class TestLoader(unittest.TestCase): @@ -26,11 +26,11 @@ class TestLoader(unittest.TestCase): def test_load_gateway_entry_point(self): # existing entry point self.assertEqual( - StorletGatewayStub, + StubStorletGateway, load_gateway('stub')) self.assertEqual( - StorletGatewayDocker, + DockerStorletGateway, load_gateway('docker')) # If the given entry point does not exist @@ -40,18 +40,18 @@ class TestLoader(unittest.TestCase): def test_load_gateway_full_class_path(self): # If the given class path exists self.assertEqual( - StorletGatewayStub, - load_gateway('storlets.gateway.gateways.stub.StorletGatewayStub')) + StubStorletGateway, + load_gateway('storlets.gateway.gateways.stub.StubStorletGateway')) self.assertEqual( - StorletGatewayDocker, + DockerStorletGateway, load_gateway('storlets.gateway.gateways.docker.' - 'StorletGatewayDocker')) + 'DockerStorletGateway')) # If module does not exist with self.assertRaises(StorletGatewayLoadError): load_gateway('storlets.gateway.gateways.another_stub.' - 'StorletGatewayStub') + 'StubStorletGateway') # If class does not exist with self.assertRaises(StorletGatewayLoadError): diff --git a/tests/unit/swift_middleware/handlers/__init__.py b/tests/unit/swift_middleware/handlers/__init__.py index c0d8faf6..5c9320c8 100644 --- a/tests/unit/swift_middleware/handlers/__init__.py +++ b/tests/unit/swift_middleware/handlers/__init__.py @@ -16,7 +16,7 @@ import unittest from unittest import mock -from storlets.gateway.gateways.stub import StorletGatewayStub +from storlets.gateway.gateways.stub import StubStorletGateway from storlets.swift_middleware import storlet_handler from tests.unit import FakeLogger @@ -25,7 +25,7 @@ from tests.unit.swift_middleware import FakeApp def create_handler_config(exec_server): return {'execution_server': exec_server, - 'gateway_module': StorletGatewayStub} + 'gateway_module': StubStorletGateway} class BaseTestStorletMiddleware(unittest.TestCase):