Add testcase 'Run EDP jobs with many types'

This testcase checks creating and launching many EDP jobs covering all
supported types, such as Hive, Spark, Pig, etc.

Change-Id: I503e525e48a98e0d14af145256d8e65c2c1582b4
Georgy Dyuldin 2016-02-16 10:55:29 +03:00 committed by Vitaly Gridnev
parent 3709705463
commit 900266a320
4 changed files with 109 additions and 34 deletions
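
For orientation before the diffs: below is a condensed sketch of the per-type
handling the new testcase applies, assembled from the run_many_edp_jobs()
logic added later in this change. The EDP_JOB_TYPES name and table layout are
illustrative only and do not appear in the change itself.

```python
# Illustrative summary only (not part of the change): how the new testcase
# drives each EDP job type.
#   libs        -- the job binary is attached as a library, not a main binary
#   datasources -- the job is launched with input/output data sources
#   streaming   -- the job additionally needs mapper/reducer settings
EDP_JOB_TYPES = {
    #  type                  libs   datasources  streaming
    "Hive":                 (False, True,        False),
    "Java":                 (True,  False,       False),
    "MapReduce":            (True,  True,        False),
    "Streaming MapReduce":  (True,  True,        True),
    "Pig":                  (False, True,        False),
    "Shell":                (False, False,       False),
    "Spark":                (False, False,       False),
    "Storm":                (False, False,       False),
}

if __name__ == "__main__":
    # Quick check of the table: which types attach the binary as a library.
    for job_type, (libs, datasources, streaming) in EDP_JOB_TYPES.items():
        print(job_type, "libs" if libs else "main binary")
```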

View File

@@ -38,7 +38,7 @@ http_image=http://download.cirros-cloud.net/0.3.1/cirros-0.3.1-x86_64-uec.tar.gz
[sahara]
# http accessible image (string value)
fake_http_image=http://uec-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
fake_http_image=https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
# ssh username for image (string value)
fake_image_ssh_user=ubuntu
# Floating IP pool name (string value)

View File

@@ -28,11 +28,16 @@ class JobsPage(mixins.DeleteMixin, basepage.BaseDataProcessingPage):
raise Exception('Job {} status is {}'.format(name, status))
return status == "Succeeded"
def delete(self, name):
row = self._get_row_by_template_name(name)
row.mark()
def delete_many(self, names=()):
for name in names:
row = self._get_row_by_template_name(name)
if row is not None:
row.mark()
self.table.get_delete_form().submit()
def delete(self, name):
self.delete_many([name])
def wait_until_job_succeeded(self, name, timeout=None):
self._wait_until(lambda x: self.is_job_succeeded(name),
timeout=timeout)

View File

@@ -12,6 +12,7 @@
from openstack_dashboard.test.integration_tests.regions import forms
from openstack_dashboard.test.integration_tests.regions import tables
from selenium.webdriver.common import by
from sahara_dashboard.test.integration_tests.pages import basepage
from sahara_dashboard.test.integration_tests.pages import mixins
@@ -20,11 +21,13 @@ from sahara_dashboard.test.integration_tests.pages import mixins
class CreateMixin(object):
CREATE_FIELD_MAPPING = (
('job_name', 'job_type', 'main_binary', 'job_description'),
('lib_binaries',),
)
LAUNCH_ON_EXIST_CLUSTER_FIELD_MAPPING = (
('job_input', 'job_output', 'cluster'),
('adapt_spark_swift', 'datasource_substitute'),
('adapt_spark_swift', 'datasource_substitute', 'streaming_mapper',
'streaming_reducer'),
(),
)
@@ -50,23 +53,31 @@ class JobtemplatesPage(mixins.DeleteMixin, basepage.BaseDataProcessingPage):
def get_table_mixins(cls):
return super(JobtemplatesPage, cls).get_table_mixins() + (CreateMixin,)
def create(self, name, job_type, binary_name):
def create(self, name, job_type, binary_name, libs=()):
form = self.table.get_create_form()
form.job_name.text = name
form.job_type.text = job_type
form.main_binary.text = binary_name
if binary_name is not None:
form.main_binary.text = binary_name
form.switch_to(1)
for lib in libs:
form.lib_binaries.text = lib
form.src_elem.find_element_by_id("add_lib_button").click()
form.submit()
def launch_on_exists(self, job_name, input_name, output_name,
cluster_name, adapt_swift=True,
datasource_substitution=True, configuration=None,
parameters=None, arguments=()):
parameters=None, arguments=(),
mapper=None, reducer=None):
configuration = configuration or {}
parameters = parameters or {}
row = self._get_row_with_name(job_name)
form = self.table.get_launch_on_exists_form(row)
form.job_input.text = input_name
form.job_output.text = output_name
if input_name is not None:
form.job_input.text = input_name
if output_name is not None:
form.job_output.text = output_name
form.cluster.text = cluster_name
form.switch_to(1)
@@ -79,29 +90,40 @@ class JobtemplatesPage(mixins.DeleteMixin, basepage.BaseDataProcessingPage):
else:
form.datasource_substitute.unmark()
config_block = form.src_elem.find_element_by_id('configs')
add_btn = config_block.find_element_by_link_text('Add')
for key, value in configuration.items():
add_btn.click()
inputs = config_block.find_elements_by_css_selector(
'input[type=text]')[-2:]
inputs[0].send_keys(key)
inputs[1].send_keys(value)
if mapper is not None:
form.streaming_mapper.text = mapper
if reducer is not None:
form.streaming_reducer.text = reducer
config_block = form.src_elem.find_element_by_id('params')
add_btn = config_block.find_element_by_link_text('Add')
for key, value in parameters.items():
add_btn.click()
inputs = config_block.find_elements_by_css_selector(
'input[type=text]')[-2:]
inputs[0].send_keys(key)
inputs[1].send_keys(value)
locator = (by.By.ID, 'configs')
if form._is_element_visible(*locator):
config_block = form.src_elem.find_element(*locator)
add_btn = config_block.find_element_by_link_text('Add')
for key, value in configuration.items():
add_btn.click()
inputs = config_block.find_elements_by_css_selector(
'input[type=text]')[-2:]
inputs[0].send_keys(key)
inputs[1].send_keys(value)
config_block = form.src_elem.find_element_by_id('args_array')
add_btn = config_block.find_element_by_link_text('Add')
for value in arguments:
add_btn.click()
input_el = config_block.find_elements_by_css_selector(
'input[type=text]')[-1]
input_el.send_keys(value)
locator = (by.By.ID, 'params')
if form._is_element_visible(*locator):
params_block = form.src_elem.find_element(*locator)
add_btn = params_block.find_element_by_link_text('Add')
for key, value in parameters.items():
add_btn.click()
inputs = params_block.find_elements_by_css_selector(
'input[type=text]')[-2:]
inputs[0].send_keys(key)
inputs[1].send_keys(value)
locator = (by.By.ID, 'args_array')
if form._is_element_visible(*locator):
args_block = form.src_elem.find_element(*locator)
add_btn = args_block.find_element_by_link_text('Add')
for value in arguments:
add_btn.click()
input_el = args_block.find_elements_by_css_selector(
'input[type=text]')[-1]
input_el.send_keys(value)
form.submit()

View File

@@ -55,7 +55,9 @@ class TestCRUDBase(SaharaTestCase):
image_pg = self.home_pg.go_to_compute_imagespage()
image_pg.create_image(self.image_name,
location=self.CONFIG.sahara.fake_http_image)
image_pg.wait_until_image_active(self.image_name)
image_pg._wait_until(
lambda x: image_pg.is_image_active(self.image_name),
timeout=10 * 60)
def delete_image(self):
image_pg = self.home_pg.go_to_compute_imagespage()
@@ -254,6 +256,51 @@ class TestCRUD(TestCRUDBase):
self.create_image()
self.register_image()
def run_many_edp_jobs(self):
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
job_names = {}
for job_type in ("Hive", "Java", "MapReduce", "Streaming MapReduce",
"Pig", "Shell", "Spark", "Storm"):
job_name = "{0}-{1}".format(self.jobtemplate_name, job_type)
job_name = job_name.replace(' ', '-')
job_names[job_type] = job_name
binary_name = self.job_binary_name
libs = []
if job_type in ("Java", "MapReduce", "Streaming MapReduce"):
binary_name = None
libs.append(self.job_binary_name)
jobtemplates_pg.create(name=job_name, job_type=job_type,
binary_name=binary_name, libs=libs)
for job_type, job_name in job_names.items():
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
input_name = self.ds_input_name
output_name = self.ds_output_name
mapper = None
reducer = None
if job_type in ("Java", "Shell", "Storm", "Spark"):
input_name = None
output_name = None
if job_type == "Streaming MapReduce":
mapper = "mapper"
reducer = "reducer"
jobtemplates_pg.launch_on_exists(job_name=job_name,
input_name=input_name,
output_name=output_name,
cluster_name=self.cluster_name,
mapper=mapper,
reducer=reducer)
jobs_pg = self.home_pg.go_to_dataprocessing_jobs_jobspage()
jobs_pg.wait_until_job_succeeded(job_name)
jobs_pg.delete_many(job_names.values())
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
jobtemplates_pg.delete_many(job_names.values())
def test_cluster_operate(self):
self.create_cluster()
self.create_datasources()
@@ -263,6 +310,7 @@
self.delete_job()
self.run_edp_job_with_parameters()
self.delete_job()
self.run_many_edp_jobs()
self.delete_job_template()
self.delete_job_binary()
self.delete_datasources()