Remove integration tests from saharaclient

These tests are no longer gated; the sahara-tests framework is now
used for testing both sahara and the client. Also remove several
test requirements, since they are no longer used anywhere in the
code: they were only needed by these integration tests.

Change-Id: I861d5528a7b0f66a7df14d1456b639724a620301
Vitaly Gridnev 2016-02-27 18:20:34 +03:00
parent 6a5f3e1289
commit c279df9d8a
23 changed files with 0 additions and 1888 deletions

@@ -1,209 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
from oslo_config import cfg
def singleton(cls):
instances = {}
def get_instance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return get_instance
COMMON_CONFIG_GROUP = cfg.OptGroup(name='COMMON')
COMMON_CONFIG_OPTS = [
cfg.StrOpt('OS_USERNAME',
default='admin',
help='Username for OpenStack.'),
cfg.StrOpt('OS_PASSWORD',
default='admin',
help='Password for OpenStack.'),
cfg.StrOpt('OS_TENANT_NAME',
default='admin',
help='Tenant name for OpenStack.'),
cfg.StrOpt('OS_AUTH_URL',
default='http://127.0.0.1:5000/v2.0',
help='URL for OpenStack.'),
cfg.StrOpt('OS_PROJECT_NAME',
default='admin',
help='Project name for OpenStack.'),
cfg.StrOpt('BYPASS_URL',
help='The BYPASS_URL value to pass to the cli'),
cfg.StrOpt('SWIFT_AUTH_VERSION',
default=2,
help='OpenStack auth version for Swift.'),
cfg.IntOpt('CLUSTER_CREATION_TIMEOUT',
default=10,
help='Cluster creation timeout (in minutes); '
'minimal value is 1.'),
cfg.StrOpt('USER_KEYPAIR_ID',
help='A keypair id to use during cluster launch. '
'If the id is left blank, an id will be generated. '
'If the id names an existing keypair, that keypair will '
'be used. If the named keypair does not exist, it will be '
'created and deleted after the test.'),
cfg.IntOpt('DELAY_AFTER_ACTIVE',
default=2,
help='Length of time (in minutes) to '
'wait after cluster is active before running jobs.'),
cfg.StrOpt('FLOATING_IP_POOL',
help='Pool name for floating IPs. If Sahara uses Nova '
'management network and auto assignment of IPs was '
'enabled then you should leave default value of this '
'parameter. If auto assignment was not enabled, then you '
'should specify value (floating IP pool name) of this '
'parameter. If Sahara uses Neutron management network, '
'then you should always specify value (floating IP pool '
'name) of this parameter.'),
cfg.BoolOpt('NEUTRON_ENABLED',
default=False,
help='If Sahara uses Nova management network, then you '
'should leave default value of this flag. If Sahara '
'uses Neutron management network, then you should set '
'this flag to True and specify values of the following '
'parameters: FLOATING_IP_POOL and '
'INTERNAL_NEUTRON_NETWORK.'),
cfg.StrOpt('INTERNAL_NEUTRON_NETWORK',
default='private',
help='Name for internal Neutron network.'),
cfg.IntOpt('JOB_LAUNCH_TIMEOUT',
default=10,
help='Job launch timeout (in minutes); '
'minimal value is 1.'),
cfg.BoolOpt('INTERNAL_JOB_BINARIES',
default=True,
help='Store job binary data in the sahara '
'internal database. If this option is set to '
'False, job binary data will be stored in swift.'),
cfg.StrOpt('CLUSTER_NAME',
default='test',
help='Name for cluster.')
]
def general_cluster_config_opts(plugin_text, plugin_name, hadoop_ver,
skip_all=False):
return [
cfg.StrOpt('EXISTING_CLUSTER_ID',
help='The id of an existing active cluster '
'to use for the test instead of building one. '
'Cluster teardown will be skipped. This has priority '
'over EXISTING_CLUSTER_NAME'),
cfg.StrOpt('EXISTING_CLUSTER_NAME',
help='The name of an existing active cluster '
'to use for the test instead of building one. '
'Cluster teardown will be skipped. This is superseded '
'by EXISTING_CLUSTER_ID'),
cfg.StrOpt('IMAGE_ID',
help='ID for image which is used for cluster creation. '
'You can also specify image name or tag of image instead '
'of image ID. If you do not specify image related '
'parameters then the image for cluster creation will be '
'chosen by tag "sahara_i_tests".'),
cfg.StrOpt('IMAGE_NAME',
help='Name for image which is used for cluster creation. '
'You can also specify image ID or tag of image instead of '
'image name. If you do not specify image related '
'parameters then the image for cluster creation will be '
'chosen by tag "sahara_i_tests".'),
cfg.StrOpt('IMAGE_TAG',
help='Tag for image which is used for cluster creation. '
'You can also specify image ID or image name instead of '
'the image tag. If you do not specify image related '
'parameters, then the image for cluster creation will be '
'chosen by the tag "sahara_i_tests".'),
cfg.StrOpt('SSH_USERNAME',
help='Username used to log into a cluster node via SSH.'),
cfg.StrOpt('HADOOP_VERSION',
default='%s' % hadoop_ver,
help='Version of Hadoop'),
cfg.StrOpt('PLUGIN_NAME',
default='%s' % plugin_name,
help='Name of plugin'),
cfg.BoolOpt('SKIP_ALL_TESTS_FOR_PLUGIN',
default=skip_all,
help='If this flag is True, then all tests for the %s '
'plugin will be skipped.' % plugin_text),
cfg.BoolOpt('SKIP_CLUSTER_TEARDOWN',
default=False,
help='Skip tearing down the cluster. If an existing '
'cluster is used it will never be torn down by the test.'),
cfg.BoolOpt('SKIP_JAVA_EDP_TEST', default=False),
cfg.BoolOpt('SKIP_MAPREDUCE_EDP_TEST', default=False),
cfg.BoolOpt('SKIP_MAPREDUCE_STREAMING_EDP_TEST', default=False),
cfg.BoolOpt('SKIP_PIG_EDP_TEST', default=False)
]
VANILLA_CONFIG_GROUP = cfg.OptGroup(name='VANILLA')
VANILLA_CONFIG_OPTS = general_cluster_config_opts("Vanilla",
"vanilla", "1.2.1")
VANILLA2_CONFIG_GROUP = cfg.OptGroup(name='VANILLA2')
VANILLA2_CONFIG_OPTS = general_cluster_config_opts("Vanilla2",
"vanilla", "2.3.0",
skip_all=True)
HDP_CONFIG_GROUP = cfg.OptGroup(name='HDP')
HDP_CONFIG_OPTS = general_cluster_config_opts("HDP",
"hdp", "1.3.2",
skip_all=True)
def register_config(config, config_group, config_opts):
config.register_group(config_group)
config.register_opts(config_opts, config_group)
@singleton
class ITConfig(object):
def __init__(self):
config = 'itest.conf'
config_files = []
config_path = '%s/saharaclient/tests/integration/configs/%s'
if not os.path.exists(config_path % (os.getcwd(), config)):
message = ('\n**************************************************'
'\nINFO: Configuration file "%s" not found *\n'
'**************************************************'
% config)
print(message, file=sys.stderr)
else:
config = os.path.join(
config_path % (os.getcwd(), config)
)
config_files.append(config)
register_config(cfg.CONF, COMMON_CONFIG_GROUP, COMMON_CONFIG_OPTS)
register_config(cfg.CONF, VANILLA_CONFIG_GROUP, VANILLA_CONFIG_OPTS)
register_config(cfg.CONF, VANILLA2_CONFIG_GROUP, VANILLA2_CONFIG_OPTS)
register_config(cfg.CONF, HDP_CONFIG_GROUP, HDP_CONFIG_OPTS)
cfg.CONF(
[], project='Sahara_client_integration_tests',
default_config_files=config_files
)
self.common_config = cfg.CONF.COMMON
self.vanilla_config = cfg.CONF.VANILLA
self.hdp_config = cfg.CONF.HDP
self.vanilla2_config = cfg.CONF.VANILLA2
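For context, the removed config module above amounted to a process-wide singleton around oslo.config; a minimal sketch of how the other removed tests consumed it (module path and defaults as defined above):

    from saharaclient.tests.integration.configs import config

    # ITConfig is wrapped by @singleton, so every call returns the same object.
    conf_a = config.ITConfig()
    conf_b = config.ITConfig()
    assert conf_a is conf_b

    # Option groups registered in __init__ become attributes of the instance;
    # oslo.config groups allow both attribute and item access, and the removed
    # tests used both styles.
    common = conf_a.common_config
    print(common.OS_AUTH_URL)      # default: http://127.0.0.1:5000/v2.0
    print(common['CLUSTER_NAME'])  # default: test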

@@ -1,13 +0,0 @@
[COMMON]
OS_USERNAME = 'admin'
OS_PASSWORD = 'admin'
OS_TENANT_NAME = 'admin'
OS_PROJECT_NAME = 'admin'
OS_AUTH_URL = 'http://127.0.0.1:5000/v2.0'
[VANILLA]
[VANILLA2]
[HDP]
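As a note, ITConfig only read this file from one hard-coded location relative to the current working directory (so the suite had to be run from the repository root); if the file was missing it fell back to option defaults with a notice on stderr. A small sketch of the path it checked, mirroring config.py above:

    import os

    # The path ITConfig.__init__ looked for before loading itest.conf.
    path = os.path.join(
        os.getcwd(), 'saharaclient/tests/integration/configs/itest.conf')
    print(path, os.path.exists(path))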

@@ -1,273 +0,0 @@
[COMMON]
# Username for OpenStack (string value)
#OS_USERNAME = 'admin'
# Password for OpenStack (string value)
#OS_PASSWORD = 'admin'
# Tenant name for OpenStack (string value)
#OS_TENANT_NAME = 'admin'
# URL for OpenStack (string value)
#OS_AUTH_URL = 'http://127.0.0.1:5000/v2.0'
# OpenStack auth version for Swift (string value)
#SWIFT_AUTH_VERSION = 2
# Project name for OpenStack (string value)
#OS_PROJECT_NAME = 'admin'
# The BYPASS_URL value to pass to the cli (string value)
#BYPASS_URL = <None>
# Cluster creation timeout (in minutes); minimal value is 1 (integer value)
#CLUSTER_CREATION_TIMEOUT = 10
# A keypair id to use during cluster launch.
# If the id is left blank, an id will be generated.
# If the id names an existing keypair, that keypair will
# be used. If the named keypair does not exist, it will be
# created and deleted after the test (string value)
#USER_KEYPAIR_ID = <None>
# Length of time (in minutes) to wait after cluster is active
# before running jobs (integer value)
#DELAY_AFTER_ACTIVE = 2
# Pool name for floating IPs. If Sahara uses Nova
# management network and auto assignment of IPs was
# enabled then you should leave default value of this
# parameter. If auto assignment was not enabled, then you
# should specify value (floating IP pool name) of this
# parameter. If Sahara uses Neutron management network,
# then you should always specify value (floating IP pool
# name) of this parameter (string value)
#FLOATING_IP_POOL = <None>
# If Sahara uses Nova management network, then you
# should leave default value of this flag. If Sahara
# uses Neutron management network, then you should set
# this flag to True and specify values of the following
# parameters: FLOATING_IP_POOL and INTERNAL_NEUTRON_NETWORK
# (boolean value)
#NEUTRON_ENABLED = False
# Name for internal Neutron network (string value)
#INTERNAL_NEUTRON_NETWORK = 'private'
# Job launch timeout (in minutes); minimal value is 1
# (integer value)
#JOB_LAUNCH_TIMEOUT = 10
# Store job binary data in the sahara internal database.
# If this option is set to False, job binary data will be stored in swift
# (boolean value)
#INTERNAL_JOB_BINARIES = True
# Name for cluster (string value)
#CLUSTER_NAME = 'test'
[VANILLA]
# The id of an existing active cluster
# to use for the test instead of building one.
# Cluster teardown will be skipped. This has priority
# over EXISTING_CLUSTER_NAME (string value)
#EXISTING_CLUSTER_ID = <None>
# The name of an existing active cluster
# to use for the test instead of building one.
# Cluster teardown will be skipped. This is superseded
# by EXISTING_CLUSTER_ID (string value)
#EXISTING_CLUSTER_NAME = <None>
# ID for image which is used for cluster creation.
# You can also specify image name or tag of image instead
# of image ID. If you do not specify image related
# parameters then the image for cluster creation will be
# chosen by tag "sahara_i_tests" (string value)
#IMAGE_ID = <None>
# Name for image which is used for cluster creation.
# You can also specify image ID or tag of image instead of
# image name. If you do not specify image related
# parameters then the image for cluster creation will be
# chosen by tag "sahara_i_tests" (string value)
#IMAGE_NAME = <None>
# Tag for image which is used for cluster creation.
# You can also specify image ID or image name instead of
# the image tag. If you do not specify image related
# parameters, then the image for cluster creation will be
# chosen by the tag "sahara_i_tests" (string value)
#IMAGE_TAG = <None>
# Username used to log into a cluster node via SSH (string value)
#SSH_USERNAME = <None>
# Skip tearing down the cluster. If an existing
# cluster is used it will never be torn down by the test (boolean value)
#SKIP_CLUSTER_TEARDOWN = False
# Version of Hadoop (string value)
#HADOOP_VERSION = '1.2.1'
# Name of plugin (string value)
#PLUGIN_NAME = 'vanilla'
# If this option is set True no tests for this plugin will be run
# (boolean value)
#SKIP_ALL_TESTS_FOR_PLUGIN = False
# If this option is set True no Java EDP job will be submitted
# (boolean value)
#SKIP_JAVA_EDP_TEST = False
# If this option is set True no MapReduce EDP job will be submitted
# (boolean value)
#SKIP_MAPREDUCE_EDP_TEST = False
# If this option is set True no Streaming MapReduce EDP job
# will be submitted (boolean value)
#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False
# If this option is set True no Pig EDP job will be submitted
# (boolean value)
#SKIP_PIG_EDP_TEST = False
[VANILLA2]
# The id of an existing active cluster
# to use for the test instead of building one.
# Cluster teardown will be skipped. This has priority
# over EXISTING_CLUSTER_NAME (string value)
#EXISTING_CLUSTER_ID = <None>
# The name of an existing active cluster
# to use for the test instead of building one.
# Cluster teardown will be skipped. This is superseded
# by EXISTING_CLUSTER_ID (string value)
#EXISTING_CLUSTER_NAME = <None>
# ID for image which is used for cluster creation.
# You can also specify image name or tag of image instead
# of image ID. If you do not specify image related
# parameters then the image for cluster creation will be
# chosen by tag "sahara_i_tests" (string value)
#IMAGE_ID = <None>
# Name for image which is used for cluster creation.
# You can also specify image ID or tag of image instead of
# image name. If you do not specify image related
# parameters then the image for cluster creation will be
# chosen by tag "sahara_i_tests" (string value)
#IMAGE_NAME = <None>
# Tag for image which is used for cluster creation.
# You can also specify image ID or image name instead of
# the image tag. If you do not specify image related
# parameters, then the image for cluster creation will be
# chosen by the tag "sahara_i_tests" (string value)
#IMAGE_TAG = <None>
# Username used to log into a cluster node via SSH (string value)
#SSH_USERNAME = <None>
# Skip tearing down the cluster. If an existing
# cluster is used it will never be torn down by the test (boolean value)
#SKIP_CLUSTER_TEARDOWN = False
# Version of Hadoop (string value)
#HADOOP_VERSION = '2.3.0'
# Name of plugin (string value)
#PLUGIN_NAME = 'vanilla'
# If this option is set True, no tests for this plugin will be run
# (boolean value)
#SKIP_ALL_TESTS_FOR_PLUGIN = True
# If this option is set True no Java EDP job will be submitted
# (boolean value)
#SKIP_JAVA_EDP_TEST = False
# If this option is set True no MapReduce EDP job will be submitted
# (boolean value)
#SKIP_MAPREDUCE_EDP_TEST = False
# If this option is set True no Streaming MapReduce EDP job
# will be submitted (boolean value)
#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False
# If this option is set True no Pig EDP job will be submitted
# (boolean value)
#SKIP_PIG_EDP_TEST = False
[HDP]
# The id of an existing active cluster
# to use for the test instead of building one.
# Cluster teardown will be skipped. This has priority
# over EXISTING_CLUSTER_NAME (string value)
#EXISTING_CLUSTER_ID = <None>
# The name of an existing active cluster
# to use for the test instead of building one.
# Cluster teardown will be skipped. This is superseded
# by EXISTING_CLUSTER_ID (string value)
#EXISTING_CLUSTER_NAME = <None>
# ID for image which is used for cluster creation.
# You can also specify image name or tag of image instead
# of image ID. If you do not specify image related
# parameters then the image for cluster creation will be
# chosen by tag "sahara_i_tests" (string value)
#IMAGE_ID = <None>
# Name for image which is used for cluster creation.
# You can also specify image ID or tag of image instead of
# image name. If you do not specify image related
# parameters then the image for cluster creation will be
# chosen by tag "sahara_i_tests" (string value)
#IMAGE_NAME = <None>
# Tag for image which is used for cluster creation.
# You can also specify image ID or image name instead of
# the image tag. If you do not specify image related
# parameters, then the image for cluster creation will be
# chosen by the tag "sahara_i_tests" (string value)
#IMAGE_TAG = <None>
# Username used to log into a cluster node via SSH (string value)
#SSH_USERNAME = <None>
# Skip tearing down the cluster. If an existing
# cluster is used it will never be torn down by the test (boolean value)
#SKIP_CLUSTER_TEARDOWN = False
# Version of Hadoop (string value)
#HADOOP_VERSION = '1.3.2'
# Name of plugin (string value)
#PLUGIN_NAME = 'hdp'
# If this option is set True, no tests for this plugin will be run
# (boolean value)
#SKIP_ALL_TESTS_FOR_PLUGIN = True
# If this option is set True no Java EDP job will be submitted
# (boolean value)
#SKIP_JAVA_EDP_TEST = False
# If this option is set True no MapReduce EDP job will be submitted
# (boolean value)
#SKIP_MAPREDUCE_EDP_TEST = False
# If this option is set True no Streaming MapReduce EDP job
# will be submitted (boolean value)
#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False
# If this option is set True no Pig EDP job will be submitted
# (boolean value)
#SKIP_PIG_EDP_TEST = False

@@ -1,28 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
import saharaclient.tests.integration.tests.clidriver as clidriver
import saharaclient.tests.integration.tests.utils as utils
class ITestBase(testtools.TestCase, utils.AssertionWrappers):
def setUp(self):
super(ITestBase, self).setUp()
# The client is for readonly operations to check results
self.util = utils.Utils()
self.cli = clidriver.CLICommands()

@@ -1,174 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import shlex
import subprocess
from saharaclient.tests.integration.configs import config as cfg
cfg = cfg.ITConfig()
LOG = logging.getLogger(__name__)
# This is modeled after the client interface in tempest cli tests.
class CommandBase(object):
def sahara(self, action, flags='', params='', fail_ok=False):
return self.cmd_with_bypass('sahara', action, flags, params, fail_ok)
def cmd_with_bypass(self, cmd, action, flags='', params='', fail_ok=False):
if cfg.common_config['BYPASS_URL']:
bypass = '--bypass-url %s' % cfg.common_config['BYPASS_URL']
flags = bypass + ' ' + flags
return self.cmd_with_auth(cmd, action, flags, params, fail_ok)
def cmd_with_auth(self, cmd, action, flags='', params='', fail_ok=False):
"""Executes given command with auth attributes appended."""
creds = ('--os-username %s --os-tenant-name %s --os-password %s '
'--os-auth-url %s ' %
(cfg.common_config['OS_USERNAME'],
cfg.common_config['OS_TENANT_NAME'],
cfg.common_config['OS_PASSWORD'],
cfg.common_config['OS_AUTH_URL']))
flags = creds + ' ' + flags
return self.cmd(cmd, action, flags, params, fail_ok)
def cmd(self, cmd, action, flags='', params='', fail_ok=False,
merge_stderr=False):
"""Executes specified command for the given action."""
cmd = ' '.join([cmd, flags, action, params])
LOG.info("running: '%s'", cmd)
cmd_str = cmd
cmd = shlex.split(cmd)
result = ''
result_err = ''
try:
stdout = subprocess.PIPE
stderr = subprocess.STDOUT if merge_stderr else subprocess.PIPE
proc = subprocess.Popen(
cmd, stdout=stdout, stderr=stderr)
result, result_err = proc.communicate()
if not fail_ok and proc.returncode != 0:
raise CommandFailed(proc.returncode,
cmd,
result)
finally:
LOG.debug('output of %s:\n%s', cmd_str, result)
if not merge_stderr and result_err:
LOG.debug('error output of %s:\n%s', cmd_str, result_err)
return result
class CommandFailed(subprocess.CalledProcessError):
# adds output attribute for python2.6
def __init__(self, returncode, cmd, output):
super(CommandFailed, self).__init__(returncode, cmd)
self.output = output
class CLICommands(CommandBase):
def register_image(self, image_id, username='', description=''):
params = '--id %s' % image_id
if username:
params += ' --username %s' % username
if description:
params += ' --description %s' % description
return self.sahara('image-register', params=params)
def unregister_image(self, id):
params = '--id %s' % id
return self.sahara('image-unregister', params=params)
def tag_image(self, id, tag):
params = '--id %s --tag %s' % (id, tag)
return self.sahara('image-add-tag', params=params)
def node_group_template_create(self, filename):
params = '--json %s' % filename
return self.sahara('node-group-template-create', params=params)
def node_group_template_delete(self, id):
params = '--id %s' % id
return self.sahara('node-group-template-delete', params=params)
def cluster_template_create(self, filename):
params = '--json %s' % filename
return self.sahara('cluster-template-create', params=params)
def cluster_template_delete(self, id):
params = '--id %s' % id
return self.sahara('cluster-template-delete', params=params)
def cluster_create(self, filename):
params = '--json %s' % filename
return self.sahara('cluster-create', params=params)
def cluster_delete(self, id):
params = '--id %s' % id
return self.sahara('cluster-delete', params=params)
def job_binary_create(self, name, url, desc='', username='', password=''):
params = '--name %s --url %s' % (name, url)
if desc:
params += ' --description %s' % desc
if username:
params += ' --user %s' % username
if password:
params += ' --password %s' % password
return self.sahara('job-binary-create', params=params)
def job_binary_delete(self, id):
params = '--id %s' % id
return self.sahara('job-binary-delete', params=params)
def job_binary_data_create(self, fname):
params = '--file %s' % fname
return self.sahara('job-binary-data-create', params=params)
def job_binary_data_delete(self, id):
params = '--id %s' % id
return self.sahara('job-binary-data-delete', params=params)
def job_template_create(self, filename):
params = '--json %s' % (filename)
return self.sahara('job-template-create', params=params)
def job_template_delete(self, id):
params = '--id %s' % id
return self.sahara('job-template-delete', params=params)
def job_create(self, job_template_id, filename):
params = '--job-template %s --json %s' % (job_template_id, filename)
return self.sahara('job-create', params=params)
def job_delete(self, id):
params = '--id %s' % id
return self.sahara('job-delete', params=params)
def data_source_create(self, name, datatype, url,
desc='', username='', password=''):
params = '--name %s --type %s --url %s' % (name, datatype, url)
if desc:
params += ' --description %s' % desc
if username:
params += ' --user %s' % username
if password:
params += ' --password %s' % password
return self.sahara('data-source-create', params=params)
def data_source_delete(self, id):
params = '--id %s' % id
return self.sahara('data-source-delete', params=params)
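For reference, the removed clidriver module simply assembled sahara command lines and shelled out via subprocess. A rough sketch of what one helper call expanded to, assuming the defaults from the sample itest.conf above and a made-up image id:

    from saharaclient.tests.integration.tests import clidriver

    cli = clidriver.CLICommands()
    # cmd_with_auth() prefixes credentials from the COMMON group, so this call
    # runs roughly:
    #   sahara --os-username admin --os-tenant-name admin --os-password admin
    #       --os-auth-url http://127.0.0.1:5000/v2.0 image-register
    #       --id <image-id> --username ubuntu --description test-image
    out = cli.register_image('<image-id>', username='ubuntu',
                             description='test-image')
    print(out)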

@@ -1,332 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import saharaclient.api.base as api_base
from saharaclient.tests.integration.configs import config as cfg
import saharaclient.tests.integration.tests.base as base
import saharaclient.tests.integration.tests.utils as ut
from neutronclient.v2_0 import client as neutron_client
import novaclient.exceptions
from novaclient.v1_1 import client as nova_client
cfg = cfg.ITConfig()
common = cfg.common_config
PROP_DESCR = '_sahara_description'
PROP_USERNAME = '_sahara_username'
PROP_TAG = '_sahara_tag_'
class ClusterTest(base.ITestBase):
def setUp(self):
super(ClusterTest, self).setUp()
self.cluster = None
self.cluster_template = None
self.image = None
self.created_key = False
self.node_group_templates = []
self.nova = nova_client.Client(
username=common.OS_USERNAME,
api_key=common.OS_PASSWORD,
project_id=common.OS_PROJECT_NAME,
auth_url=common.OS_AUTH_URL)
# Get the network ids for the management network and the
# floating ip pool if we are using neutron
if common.NEUTRON_ENABLED:
self.neutron = neutron_client.Client(
username=common.OS_USERNAME,
password=common.OS_PASSWORD,
tenant_name=common.OS_TENANT_NAME,
auth_url=common.OS_AUTH_URL)
self.floating_ip_pool = self.find_network_id(
common.FLOATING_IP_POOL)
self.neutron_mgmt_net = self.find_network_id(
common.INTERNAL_NEUTRON_NETWORK)
else:
self.neutron = None
self.floating_ip_pool = common.FLOATING_IP_POOL
self.neutron_mgmt_net = None
def find_network_id(self, netname):
try:
net = self.neutron.list_networks(name=netname)
net_id = net['networks'][0]['id']
return net_id
except IndexError:
raise Exception(
'\nNetwork \'%s\' not found in network list. '
'Please make sure you specified right network name.' % netname)
def tearDown(self):
super(ClusterTest, self).tearDown()
if self.created_key:
self.nova.keypairs.delete(self.keypair)
def init_keypair(self):
# Figure out what keypair to use, and track whether
# or not we created it for this test
self.keypair = common.USER_KEYPAIR_ID
if not self.keypair:
self.keypair = 'key%s' % os.getpid()
self.created_key = False
try:
self.nova.keypairs.get(self.keypair)
except novaclient.exceptions.NotFound:
self.nova.keypairs.create(self.keypair)
self.created_key = True
def find_image_id(self, config):
basic_msg = ('\nImage with %s "%s" was found in image list but it was '
'possibly not registered for Sahara. Please, make sure '
'image was correctly registered.')
long_msg = ('\nNone of parameters of image (ID, name, tag)'
' was specified in configuration file of '
'integration tests. That is why there was '
'attempt to choose image by tag '
'"sahara_i_tests" and image with such tag '
'was found in image list but it was possibly '
'not registered for Sahara. Please, make '
'sure image was correctly registered.')
images = self.nova.images.list()
def try_get_image_id_and_ssh_username(image, msg):
try:
if not config.SSH_USERNAME:
return image.id, image.metadata[PROP_USERNAME]
return image.id, config.SSH_USERNAME
except KeyError as e:
print(msg)
raise(e)
# If config.IMAGE_ID is not None then find corresponding image
# and return its ID and username. If image not found then handle error
if config.IMAGE_ID:
for image in images:
if image.id == config.IMAGE_ID:
return try_get_image_id_and_ssh_username(
image,
basic_msg % ('ID', image.id))
self.fail(
'\n\nImage with ID "%s" not found in image list. '
'Please, make sure you specified right image ID.\n' %
config.IMAGE_ID)
# If config.IMAGE_NAME is not None then find corresponding image
# and return its ID and username. If image not found then handle error
if config.IMAGE_NAME:
for image in images:
if image.name == config.IMAGE_NAME:
return try_get_image_id_and_ssh_username(
image,
basic_msg % ('name', config.IMAGE_NAME))
self.fail(
'\n\nImage with name "%s" not found in image list. Please, '
'make sure you specified right image name.\n' %
config.IMAGE_NAME)
# If config.IMAGE_TAG is not None then find corresponding image
# and return its ID and username. If image not found then handle error
if config.IMAGE_TAG:
for image in images:
if (image.metadata.get(PROP_TAG + '%s'
% config.IMAGE_TAG)) and (
image.metadata.get(PROP_TAG + (
'%s' % config.PLUGIN_NAME))):
return try_get_image_id_and_ssh_username(
image,
basic_msg % ('tag', config.IMAGE_TAG))
self.fail(
'\n\nImage with tag "%s" not found in list of registered '
'images for Sahara. Please, make sure tag "%s" was added to '
'image and image was correctly registered.\n' %
(config.IMAGE_TAG, config.IMAGE_TAG))
# If config.IMAGE_ID, config.IMAGE_NAME and
# config.IMAGE_TAG are None then image is chosen
# by tag "sahara_i_tests". If image has tag "sahara_i_tests"
# (at the same time image ID, image name and image tag were not
# specified in configuration file of integration tests) then return
# its ID and username. Found image will be chosen as image for tests.
# If image with tag "sahara_i_tests" not found then handle error
for image in images:
if (image.metadata.get(PROP_TAG + 'sahara_i_tests')) and (
image.metadata.get(PROP_TAG + (
'%s' % config.PLUGIN_NAME))):
return try_get_image_id_and_ssh_username(image, long_msg)
self.fail(
'\n\nNone of parameters of image (ID, name, tag) was specified in '
'configuration file of integration tests. That is why there was '
'attempt to choose image by tag "sahara_i_tests" but image with '
'such tag not found in list of registered images for Sahara. '
'Please, make sure image was correctly registered. Please, '
'specify one of parameters of image (ID, name or tag) in '
'configuration file of integration tests.\n'
)
def build_cluster(self, config, node_group_info):
self.init_keypair()
cluster_name = "%s-%s-%s" % (common.CLUSTER_NAME,
config.PLUGIN_NAME,
config.HADOOP_VERSION.replace(".", ""))
# Create and tag an image
image_id, username = self.find_image_id(config)
self.cli.register_image(image_id, username, cluster_name)
self.image = self.util.find_image_by_id(image_id)
self.assertEqual(cluster_name, self.image.description)
for t in (config.PLUGIN_NAME, config.HADOOP_VERSION):
self.cli.tag_image(self.image.id, t)
self.image = self.util.find_image_by_id(self.image.id)
self.assertIn(t, self.image.tags)
for ng_info in node_group_info:
# Create node group templates
f = self.util.generate_json_file(ng_info["values"])
self.cli.node_group_template_create(f.name)
t = self.util.find_node_group_template_by_name(
ng_info["values"]["name"])
self.assertIsNotNone(t)
self.check_dict_elems_in_obj(ng_info["values"], t)
self.node_group_templates.append(
{
"name": t.name,
"node_group_template_id": t.id,
"count": ng_info["count"]
}
)
# Create cluster template
cluster_temp_dict = {"name": cluster_name,
"plugin_name": config.PLUGIN_NAME,
"hadoop_version": config.HADOOP_VERSION,
"node_groups": self.node_group_templates}
f = self.util.generate_json_file(cluster_temp_dict)
self.cli.cluster_template_create(f.name)
self.cluster_template = self.util.find_cluster_template_by_name(
cluster_name)
self.assertIsNotNone(self.cluster_template)
self.check_dict_elems_in_obj(cluster_temp_dict,
self.cluster_template,
exclude=['node_groups'])
for idx in range(len(self.node_group_templates)):
self.check_dict_is_subset(self.node_group_templates[idx],
self.cluster_template.node_groups[idx])
# Launch it
cluster_dict = {"name": self.cluster_template.name,
"cluster_template_id": self.cluster_template.id,
"hadoop_version": config.HADOOP_VERSION,
"default_image_id": self.image.id,
"plugin_name": config.PLUGIN_NAME,
"user_keypair_id": self.keypair,
"neutron_management_network": self.neutron_mgmt_net}
f = self.util.generate_json_file(cluster_dict)
self.cli.cluster_create(f.name)
self.cluster = self.util.find_cluster_by_name(
self.cluster_template.name)
self.assertIsNotNone(self.cluster)
self.check_dict_elems_in_obj(cluster_dict, self.cluster)
def launch_cluster_or_use_existing(self, config, ng_templates):
# If existing cluster is set, find it and set self.cluster.
skip_teardown = config.SKIP_CLUSTER_TEARDOWN
if config.EXISTING_CLUSTER_ID:
self.cluster = self.util.find_cluster_by_id(
config.EXISTING_CLUSTER_ID)
elif config.EXISTING_CLUSTER_NAME:
self.cluster = self.util.find_cluster_by_name(
config.EXISTING_CLUSTER_NAME)
if self.cluster:
# Always skip teardown if we used an existing cluster
skip_teardown = True
status = self.util.poll_cluster_state(self.cluster.id)
self.assertEqual(ut.CLUSTER_STATUS_ACTIVE, status)
else:
try:
self.build_cluster(config, ng_templates)
status = self.util.poll_cluster_state(self.cluster.id)
self.assertEqual(ut.CLUSTER_STATUS_ACTIVE, status)
except Exception as e:
if not skip_teardown:
self.teardown_via_client()
raise(e)
# A delay here seems necessary to make sure Oozie is active
time.sleep(common.DELAY_AFTER_ACTIVE * 60)
return skip_teardown
def teardown_cluster(self):
if self.cluster:
self.cli.cluster_delete(self.cluster.id)
self.assertRaises(api_base.APIException,
self.util.find_cluster_by_id,
self.cluster.id)
self.cluster = None
if self.cluster_template:
self.cli.cluster_template_delete(self.cluster_template.id)
self.assertRaises(api_base.APIException,
self.util.find_cluster_template_by_id,
self.cluster_template.id)
self.cluster_template = None
for ng in self.node_group_templates:
self.cli.node_group_template_delete(ng["node_group_template_id"])
self.assertRaises(api_base.APIException,
self.util.find_node_group_template_by_id,
ng["node_group_template_id"])
self.node_group_templates = []
if self.image:
# After we unregister the image, the description should
# be None
self.cli.unregister_image(self.image.id)
self.image = self.util.find_image_by_id(self.image.id)
self.assertIsNone(self.image.description)
def teardown_via_client(self):
# This is a last attempt to clean up, not part of the test.
# Try the cleanup and exit if something goes wrong.
try:
if self.cluster:
self.util.client.clusters.delete(self.cluster.id)
self.cluster = None
if self.cluster_template:
self.util.client.cluster_templates.delete(
self.cluster_template.id)
self.cluster_template = None
if self.worker:
self.util.client.node_group_templates.delete(self.worker.id)
self.worker = None
if self.master:
self.util.client.node_group_templates.delete(self.master.id)
self.master = None
if self.image:
self.util.client.images.unregister_image(self.image.id)
self.image = None
except Exception:
pass
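For context, the removed cluster test drove every create operation through temporary JSON files rather than the Python API. A hypothetical payload for 'sahara cluster-template-create --json <file>' (field names mirror cluster_temp_dict in build_cluster above; ids and the marker suffix are placeholders):

    import json
    import tempfile

    cluster_template = {
        "name": "test-vanilla-121",          # CLUSTER_NAME-plugin-version
        "plugin_name": "vanilla",
        "hadoop_version": "1.2.1",
        "node_groups": [
            {"name": "w-1234", "node_group_template_id": "<worker-ngt-id>",
             "count": 1},
            {"name": "m-1234", "node_group_template_id": "<master-ngt-id>",
             "count": 1},
        ],
    }

    # Equivalent of util.generate_json_file(); f.name was then handed to the
    # 'sahara cluster-template-create --json <f.name>' call shown above.
    f = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
    json.dump(cluster_template, f)
    f.flush()
    print(f.name)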

@@ -1,326 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import saharaclient.api.base as api_base
from saharaclient.tests.integration.configs import config as cfg
import saharaclient.tests.integration.tests.base as base
import saharaclient.tests.integration.tests.utils as utils
cfg = cfg.ITConfig()
common = cfg.common_config
class EDPTest(base.ITestBase):
def setUp(self):
super(EDPTest, self).setUp()
self.swift_utils = utils.SwiftUtils()
self.path = 'saharaclient/tests/integration/tests/resources/'
self.job = None
self.job_template = None
self.lib_binary = None
self.main_binary = None
self.input_source = None
self.output_source = None
def tearDown(self):
super(EDPTest, self).tearDown()
self.swift_utils.delete_containers()
def _create_binary(self, marker, info, url):
# Use the marker value to distinguish the object, but
# the name must start with a letter and include a file
# extension.
binary_name = '%s-%s' % (marker, info['name'])
self.cli.job_binary_create(binary_name,
url,
username=common.OS_USERNAME,
password=common.OS_PASSWORD)
binary_obj = self.util.find_job_binary_by_name(binary_name)
self.assertIsNotNone(binary_obj)
self.assertEqual(binary_name, binary_obj.name)
self.assertEqual(url, binary_obj.url)
return binary_obj
def _create_swift_binary(self, marker, container, info):
if not info:
return None
self.swift_utils.upload(container, info['name'], info['data'])
url = 'swift://%s/%s' % (container, info['name'])
return self._create_binary(marker, info, url)
def _create_internal_binary(self, marker, info):
if not info:
return None, None
output = self.cli.job_binary_data_create(info['path'])
id = self.util.find_binary_internal_id(output)
url = 'internal-db://%s' % id
return self._create_binary(marker, info, url), id
def _create_data_source(self, name, url):
self.cli.data_source_create(name, 'swift', url,
username=common.OS_USERNAME,
password=common.OS_PASSWORD)
source = self.util.find_data_source_by_name(name)
self.assertIsNotNone(source)
return source
def _binary_info(self, fname, relative_path=""):
# Binaries need to be named, and the name must include
# the file extension since Oozie depends on it. So, we
# just store the filename for the name here.
info = {'name': fname}
if common.INTERNAL_JOB_BINARIES:
# We will use the cli to upload the file by path
info['path'] = self.path + relative_path + fname
else:
# We will upload the binary data to swift
info['data'] = open(self.path + relative_path + fname).read()
return info
def edp_common(self, job_type, lib=None, main=None, configs=None,
add_data_sources=True, pass_data_sources_as_args=False,
job_interface=None, execution_interface=None):
# Generate a new marker for this so we can keep containers separate
# and create some input data
job_interface = job_interface or []
execution_interface = execution_interface or {}
marker = "%s-%s" % (job_type.replace(".", ""), os.getpid())
container = self.swift_utils.create_container(marker)
self.swift_utils.generate_input(container, 'input')
input_url = 'swift://%s.sahara/input' % container
output_url = 'swift://%s.sahara/output' % container
# Create binaries
if common.INTERNAL_JOB_BINARIES:
(self.lib_binary,
self.lib_data_id) = self._create_internal_binary(marker, lib)
(self.main_binary,
self.main_data_id) = self._create_internal_binary(marker, main)
else:
self.lib_data_id = None
self.main_data_id = None
self.lib_binary = self._create_swift_binary(marker, container, lib)
self.main_binary = self._create_swift_binary(marker,
container, main)
# Create data sources
if add_data_sources:
self.input_source = self._create_data_source('input-%s' % marker,
input_url)
self.output_source = self._create_data_source('output-%s' % marker,
output_url)
else:
self.input_source = self.output_source = None
# Create a job template
job_template_name = marker
job_template_dict = {
"name": job_template_name,
"type": job_type,
"mains": [self.main_binary.id] if (self.main_binary and
self.main_binary.id) else [],
"libs": [self.lib_binary.id] if (self.lib_binary and
self.lib_binary.id) else [],
"interface": job_interface
}
f = self.util.generate_json_file(job_template_dict)
self.cli.job_template_create(f.name)
self.job_template = self.util.find_job_template_by_name(
job_template_name)
self.assertIsNotNone(self.job_template)
self.assertEqual(job_template_name, self.job_template.name)
self.assertEqual(job_type, self.job_template.type)
if self.lib_binary:
self.assertEqual(1, len(self.job_template.libs))
self.assertEqual(self.job_template.libs[0]['id'],
self.lib_binary.id)
if self.main_binary:
self.assertEqual(1, len(self.job_template.mains))
self.assertEqual(self.job_template.mains[0]['id'],
self.main_binary.id)
# Launch the job
if pass_data_sources_as_args:
args = [input_url, output_url]
else:
args = None
job_dict = {
"cluster_id": self.cluster.id,
"input_id": self.input_source and (
self.input_source.id or None),
"output_id": self.output_source and (
self.output_source.id or None),
"job_configs": {"configs": configs,
"args": args},
"interface": execution_interface
}
f = self.util.generate_json_file(job_dict)
self.cli.job_create(self.job_template.id, f.name)
# Find the job using the job_template_id
self.job = self.util.find_job_by_job_template_id(self.job_template.id)
self.assertIsNotNone(self.job)
self.assertEqual(self.cluster.id, self.job.cluster_id)
# poll for status
status = self.util.poll_job_execution(self.job.id)
self.assertEqual('SUCCEEDED', status)
# follow up with a deletion of the stuff we made from a util function
self.delete_job_objects()
def pig_edp(self):
self.edp_common('Pig',
lib=self._binary_info('edp-lib.jar'),
main=self._binary_info('edp-job.pig'))
def mapreduce_edp(self):
configs = {
"mapred.mapper.class": "org.apache.oozie.example.SampleMapper",
"mapred.reducer.class": "org.apache.oozie.example.SampleReducer"
}
self.edp_common('MapReduce',
lib=self._binary_info('edp-mapreduce.jar'),
configs=configs)
def mapreduce_streaming_edp(self):
configs = {
"edp.streaming.mapper": "/bin/cat",
"edp.streaming.reducer": "/usr/bin/wc"
}
self.edp_common('MapReduce.Streaming',
configs=configs)
def java_edp(self):
configs = {
'fs.swift.service.sahara.password': common.OS_PASSWORD,
'edp.java.main_class': 'org.openstack.sahara.examples.WordCount'
}
job_interface = [{
"name": "Swift Username",
"mapping_type": "configs",
"location": "fs.swift.service.sahara.username",
"value_type": "string",
"required": True
}]
execution_interface = {"Swift Username": common.OS_USERNAME}
self.edp_common('Java',
lib=self._binary_info('edp-java.jar',
relative_path='edp-java/'),
configs=configs,
add_data_sources=False,
pass_data_sources_as_args=True,
job_interface=job_interface,
execution_interface=execution_interface)
def run_edp_jobs(self, config):
try:
if not config.SKIP_JAVA_EDP_TEST:
self.java_edp()
if not config.SKIP_MAPREDUCE_EDP_TEST:
self.mapreduce_edp()
if not config.SKIP_MAPREDUCE_STREAMING_EDP_TEST:
self.mapreduce_streaming_edp()
if not config.SKIP_PIG_EDP_TEST:
self.pig_edp()
except Exception as e:
# Something went wrong, try to clean up what might be left
self.delete_job_objects_via_client()
raise(e)
def delete_job_objects(self):
if self.job:
self.cli.job_delete(self.job.id)
self.assertRaises(api_base.APIException,
self.util.find_job_by_id,
self.job.id)
self.job = None
if self.job_template:
self.cli.job_template_delete(self.job_template.id)
self.assertRaises(api_base.APIException,
self.util.find_job_template_by_id,
self.job_template.id)
self.job_template = None
if self.lib_binary:
self.cli.job_binary_delete(self.lib_binary.id)
self.assertRaises(api_base.APIException,
self.util.find_job_binary_by_id,
self.lib_binary.id)
self.lib_binary = None
if self.lib_data_id:
self.cli.job_binary_data_delete(self.lib_data_id)
self.assertRaises(api_base.APIException,
self.util.find_binary_internal_by_id,
self.lib_data_id)
self.lib_data_id = None
if self.main_binary:
self.cli.job_binary_delete(self.main_binary.id)
self.assertRaises(api_base.APIException,
self.util.find_job_binary_by_id,
self.main_binary.id)
self.main_binary = None
if self.main_data_id:
self.cli.job_binary_data_delete(self.main_data_id)
self.assertRaises(api_base.APIException,
self.util.find_binary_internal_by_id,
self.main_data_id)
self.main_data_id = None
if self.input_source:
self.cli.data_source_delete(self.input_source.id)
self.assertRaises(api_base.APIException,
self.util.find_data_source_by_id,
self.input_source.id)
self.input_source = None
if self.output_source:
self.cli.data_source_delete(self.output_source.id)
self.assertRaises(api_base.APIException,
self.util.find_data_source_by_id,
self.output_source.id)
self.output_source = None
def delete_job_objects_via_client(self):
try:
if self.job:
self.util.client.job_executions.delete(self.job.id)
self.job = None
if self.job_template:
self.util.client.jobs.delete(self.job_template.id)
self.job_template = None
if self.lib_binary:
self.util.client.job_binaries.delete(self.lib_binary.id)
self.lib_binary = None
if self.main_binary:
self.util.client.job_binaries.delete(self.main_binary.id)
self.main_binary = None
if self.input_source:
self.util.client.data_sources.delete(self.input_source.id)
self.input_source = None
if self.output_source:
self.util.client.data_sources.delete(self.output_source.id)
self.output_source = None
except Exception:
pass
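Similarly, the removed EDP test followed the same JSON-file pattern. A hypothetical pair of payloads for a Pig run (structure taken from edp_common above; ids are placeholders):

    # Job template: references the uploaded main/lib binaries by id; written
    # with generate_json_file() and passed to
    # 'sahara job-template-create --json <file>'.
    job_template = {
        "name": "Pig-1234",
        "type": "Pig",
        "mains": ["<main-binary-id>"],   # edp-job.pig
        "libs": ["<lib-binary-id>"],     # edp-lib.jar
        "interface": [],
    }

    # Job: ties the template to a cluster and the swift data sources; passed
    # to 'sahara job-create --job-template <template-id> --json <file>'.
    job = {
        "cluster_id": "<cluster-id>",
        "input_id": "<input-data-source-id>",
        "output_id": "<output-data-source-id>",
        "job_configs": {"configs": None, "args": None},
        "interface": {},
    }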

@@ -1,39 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import saharaclient.tests.integration.tests.cluster as cluster
import saharaclient.tests.integration.tests.edp as edp
class FullTestDriver(edp.EDPTest, cluster.ClusterTest):
def drive_full_test(self, config, ng_templates):
# If we get an exception during cluster launch, the cluster has already
# been cleaned up and we don't have to do anything here
skip_teardown = self.launch_cluster_or_use_existing(config,
ng_templates)
try:
self.run_edp_jobs(config)
if not skip_teardown:
self.teardown_cluster()
except Exception as e:
# Oops. Teardown via CLI is part of the test,
# but something went wrong early. Try tear down via the client.
# TODO(tmckay): use excutils from openstack/common
import traceback
traceback.print_exc()
if not skip_teardown:
self.teardown_via_client()
raise(e)

@@ -1,4 +0,0 @@
Compiled against Hadoop 1.2.1
$ mkdir wordcount_classes
$ javac -classpath /usr/share/hadoop/hadoop-core-1.2.1.jar:/usr/share/hadoop/lib/commons-cli-1.2.jar -d wordcount_classes WordCount.java
$ jar -cvf edp-java.jar -C wordcount_classes/ .

@@ -1,95 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openstack.sahara.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
// ---- Begin modifications for EDP ----
// This will add properties from the <configuration> tag specified
// in the Oozie workflow. For java actions, Oozie writes the
// configuration values to a file pointed to by oozie.action.conf.xml
conf.addResource(new Path("file:///",
System.getProperty("oozie.action.conf.xml")));
// ---- End modifications for EDP ----
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

@@ -1,3 +0,0 @@
A = load '$INPUT' using PigStorage(':') as (fruit: chararray);
B = foreach A generate com.hadoopbook.pig.Trim(fruit);
store B into '$OUTPUT' USING PigStorage();

@@ -1,63 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import testtools
from saharaclient.tests.integration.configs import config as cfg
import saharaclient.tests.integration.tests.full_test_driver as driver
cfg = cfg.ITConfig()
hdp = cfg.hdp_config
class ClusterHDP(driver.FullTestDriver):
@testtools.skipIf(hdp.SKIP_ALL_TESTS_FOR_PLUGIN,
'All tests for Hdp plugin were skipped')
def test_cluster_hdp(self):
marker = str(os.getpid())
ng_templates = [
{
"values": {
"name": "w-%s" % marker,
"flavor_id": "2",
"plugin_name": hdp.PLUGIN_NAME,
"hadoop_version": hdp.HADOOP_VERSION,
"node_processes": ['TASKTRACKER', 'DATANODE',
'HDFS_CLIENT',
'MAPREDUCE_CLIENT',
'OOZIE_CLIENT', 'PIG'],
"floating_ip_pool": self.floating_ip_pool
},
"count": 1
},
{
"values": {
"name": "m-%s" % marker,
"flavor_id": "2",
"plugin_name": hdp.PLUGIN_NAME,
"hadoop_version": hdp.HADOOP_VERSION,
"node_processes": [
'JOBTRACKER', 'NAMENODE', 'SECONDARY_NAMENODE',
'GANGLIA_SERVER', 'NAGIOS_SERVER',
'AMBARI_SERVER', 'OOZIE_SERVER'],
"floating_ip_pool": self.floating_ip_pool
},
"count": 1
}
]
self.drive_full_test(hdp, ng_templates)

@@ -1,58 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import testtools
from saharaclient.tests.integration.configs import config as cfg
import saharaclient.tests.integration.tests.full_test_driver as driver
cfg = cfg.ITConfig()
vanilla = cfg.vanilla_config
class ClusterVanilla(driver.FullTestDriver):
@testtools.skipIf(vanilla.SKIP_ALL_TESTS_FOR_PLUGIN,
'All tests for Vanilla plugin were skipped')
def test_cluster_vanilla(self):
marker = str(os.getpid())
ng_templates = [
{
"values": {
"name": "w-%s" % marker,
"flavor_id": "2",
"plugin_name": vanilla.PLUGIN_NAME,
"hadoop_version": vanilla.HADOOP_VERSION,
"node_processes": ["tasktracker", "datanode"],
"floating_ip_pool": self.floating_ip_pool
},
"count": 1
},
{
"values": {
"name": "m-%s" % marker,
"flavor_id": "2",
"plugin_name": vanilla.PLUGIN_NAME,
"hadoop_version": vanilla.HADOOP_VERSION,
"node_processes": ["jobtracker", "namenode",
"oozie"],
"floating_ip_pool": self.floating_ip_pool
},
"count": 1
}
]
self.drive_full_test(vanilla, ng_templates)

@@ -1,68 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import testtools
from saharaclient.tests.integration.configs import config as cfg
import saharaclient.tests.integration.tests.full_test_driver as driver
cfg = cfg.ITConfig()
vanilla = cfg.vanilla2_config
class ClusterVanilla2(driver.FullTestDriver):
@testtools.skipIf(vanilla.SKIP_ALL_TESTS_FOR_PLUGIN,
'All tests for Vanilla2 plugin were skipped')
def test_cluster_vanilla(self):
marker = str(os.getpid())
ng_templates = [
{
"values": {
"name": "m-nr-%s" % marker,
"flavor_id": "2",
"plugin_name": vanilla.PLUGIN_NAME,
"hadoop_version": vanilla.HADOOP_VERSION,
"node_processes": ["namenode", "resourcemanager"],
"floating_ip_pool": self.floating_ip_pool
},
"count": 1
},
{
"values": {
"name": "m-oh-%s" % marker,
"flavor_id": "2",
"plugin_name": vanilla.PLUGIN_NAME,
"hadoop_version": vanilla.HADOOP_VERSION,
"node_processes": ["oozie", "historyserver"],
"floating_ip_pool": self.floating_ip_pool
},
"count": 1,
},
{
"values": {
"name": "w-%s" % marker,
"flavor_id": "2",
"plugin_name": vanilla.PLUGIN_NAME,
"hadoop_version": vanilla.HADOOP_VERSION,
"node_processes": ["nodemanager", "datanode"],
"floating_ip_pool": self.floating_ip_pool
},
"count": 2
}
]
self.drive_full_test(vanilla, ng_templates)

@@ -1,193 +0,0 @@
# Copyright (c) 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import random
import re
import string
import tempfile
import time
import six
import saharaclient.api.client as client
from saharaclient.tests.integration.configs import config as cfg
from swiftclient import client as swift_client
cfg = cfg.ITConfig()
common = cfg.common_config
# cluster status
CLUSTER_STATUS_ACTIVE = "Active"
CLUSTER_STATUS_ERROR = "Error"
class Utils(object):
def __init__(self):
self.client = client.Client(username=common['OS_USERNAME'],
api_key=common['OS_PASSWORD'],
auth_url=common['OS_AUTH_URL'],
project_name=common['OS_PROJECT_NAME'])
def find_image_by_id(self, id):
return self.client.images.get(id)
def find_cluster_by_id(self, id):
return self.client.clusters.get(id)
def find_node_group_template_by_id(self, id):
return self.client.node_group_templates.get(id)
def find_cluster_template_by_id(self, id):
return self.client.cluster_templates.get(id)
def find_object_by_field(self, val, obj_list, field="name"):
for obj in obj_list:
if getattr(obj, field) == val:
return obj
def find_node_group_template_by_name(self, name):
return self.find_object_by_field(
name,
self.client.node_group_templates.list())
def find_cluster_template_by_name(self, name):
return self.find_object_by_field(name,
self.client.cluster_templates.list())
def find_cluster_by_name(self, name):
return self.find_object_by_field(name,
self.client.clusters.list())
def find_job_by_id(self, id):
return self.client.job_executions.get(id)
def find_job_binary_by_id(self, id):
return self.client.job_binaries.get(id)
def find_job_template_by_id(self, id):
return self.client.jobs.get(id)
def find_data_source_by_id(self, id):
return self.client.data_sources.get(id)
def find_binary_internal_by_id(self, id):
return self.client.job_binary_internals.get(id)
def find_job_binary_by_name(self, name):
return self.find_object_by_field(name,
self.client.job_binaries.list())
def find_job_template_by_name(self, name):
return self.find_object_by_field(name,
self.client.jobs.list())
def find_data_source_by_name(self, name):
return self.find_object_by_field(name,
self.client.data_sources.list())
def find_job_by_job_template_id(self, id):
return self.find_object_by_field(id,
self.client.job_executions.list(),
"job_id")
def find_binary_internal_id(self, output):
pattern = '\|\s*%s\s*\|\s*%s' # match '| id | name'
internals = [(i.id,
i.name) for i in self.client.job_binary_internals.list()]
for i in internals:
prog = re.compile(pattern % i)
if prog.search(output):
return i[0]
def generate_json_file(self, temp):
f = tempfile.NamedTemporaryFile(delete=True)
f.write(json.dumps(temp))
f.flush()
return f
def poll_cluster_state(self, id):
cluster = self.client.clusters.get(id)
# TODO(tmckay): this should use timeutils but we need
# to add it to openstack/common
timeout = common['CLUSTER_CREATION_TIMEOUT'] * 60
while str(cluster.status) != CLUSTER_STATUS_ACTIVE:
if str(cluster.status) == CLUSTER_STATUS_ERROR or timeout <= 0:
break
time.sleep(10)
timeout -= 10
cluster = self.client.clusters.get(id)
return str(cluster.status)
def poll_job_execution(self, id):
# TODO(tmckay): this should use timeutils but we need
# to add it to openstack/common
timeout = common['JOB_LAUNCH_TIMEOUT'] * 60
status = self.client.job_executions.get(id).info['status']
while status != 'SUCCEEDED':
if status == 'KILLED' or timeout <= 0:
break
time.sleep(10)
timeout -= 10
status = self.client.job_executions.get(id).info['status']
return status
class SwiftUtils(object):
def __init__(self):
self.containers = []
self.client = swift_client.Connection(
authurl=common['OS_AUTH_URL'],
user=common['OS_USERNAME'],
key=common['OS_PASSWORD'],
tenant_name=common['OS_TENANT_NAME'],
auth_version=common['SWIFT_AUTH_VERSION'])
def create_container(self, marker):
container_name = 'cli-test-%s' % marker
self.client.put_container(container_name)
self.containers.append(container_name)
return container_name
def generate_input(self, container_name, input_name):
self.client.put_object(
container_name, input_name, ''.join(
random.choice(':' + ' ' + '\n' + string.ascii_lowercase)
for x in range(10000)
)
)
def upload(self, container_name, obj_name, data):
self.client.put_object(container_name, obj_name, data)
def delete_containers(self):
for container in self.containers:
objects = [obj['name'] for obj in (
self.client.get_container(container)[1])]
for obj in objects:
self.client.delete_object(container, obj)
self.client.delete_container(container)
class AssertionWrappers(object):
def check_dict_elems_in_obj(self, d, obj, exclude=[]):
for key, val in six.iteritems(d):
if key not in exclude:
self.assertEqual(val, getattr(obj, key))
def check_dict_is_subset(self, dict1, dict2):
# There is an assert for this in Python 2.7 but not 2.6
self.assertTrue(all(
k in dict2 and dict2[k] == v for k, v in six.iteritems(dict1)))
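As a note, find_binary_internal_id above scraped the old CLI's prettytable output with a regular expression. A small self-contained check of that pattern (table contents made up):

    import re

    # Prettytable-style output of the kind 'sahara job-binary-data-create'
    # printed.
    output = (
        "+----------------------+-------------+\n"
        "|          id          |     name    |\n"
        "+----------------------+-------------+\n"
        "| 0123456789abcdef0123 | edp-job.pig |\n"
        "+----------------------+-------------+\n"
    )
    pattern = r'\|\s*%s\s*\|\s*%s'   # match '| id | name', as in Utils above
    binary_id, name = '0123456789abcdef0123', 'edp-job.pig'
    assert re.search(pattern % (binary_id, name), output) is not None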

@@ -7,13 +7,9 @@ hacking<0.11,>=0.10.0
coverage>=3.6 # Apache-2.0
discover # BSD
mock>=1.2 # BSD
oslo.config>=3.7.0 # Apache-2.0
oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
os-testr>=0.4.1 # Apache-2.0
python-neutronclient!=4.1.0,>=2.6.0 # Apache-2.0
python-novaclient!=2.33.0,>=2.29.0 # Apache-2.0
python-swiftclient>=2.2.0 # Apache-2.0
reno>=0.1.1 # Apache2
requests-mock>=0.7.0 # Apache-2.0
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD

@@ -18,12 +18,6 @@ commands = find . -type f -name "*.pyc" -delete
whitelist_externals = find
passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
[testenv:integration]
setenv =
VIRTUAL_ENV={envdir}
DISCOVER_DIRECTORY=saharaclient/tests/integration/
commands = ostestr {posargs}
[testenv:debug]
commands = oslo_debug_helper -t saharaclient/tests/unit {posargs}