Add testcases 'Update EDP resources'

Testcases check updating data sources, job binaries, passing extra
parameters to job templates on launching it.

Change-Id: Id443d6a716309df0add9235634c27c2e12876b8e
This commit is contained in:
Georgy Dyuldin 2016-01-26 17:53:02 +03:00
parent eb627941d0
commit 10476765e7
6 changed files with 438 additions and 115 deletions

View File

@ -10,11 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack_dashboard.test.integration_tests.regions import forms
from openstack_dashboard.test.integration_tests.regions import tables
from sahara_dashboard.test.integration_tests.pages import basepage
from sahara_dashboard.test.integration_tests.pages import mixins
from sahara_dashboard.test.integration_tests.regions import forms
CREATE_FIELD_MAPPING = (
@ -51,6 +51,36 @@ class NodegrouptemplatesPage(mixins.PluginSelectMixin, mixins.DeleteMixin,
return forms.TabbedFormRegion(
self.driver, self.conf, field_mappings=CREATE_FIELD_MAPPING)
def _set_checkbox_group(self, form, group_name, values=()):
for el in form.src_elem.find_elements_by_xpath(
'.//input[@name="{}"]'.format(group_name)
):
elem_id = el.get_attribute('id')
label_text = form.src_elem.find_element_by_css_selector(
'label[for={}]'.format(elem_id)).text
if (label_text in values) != el.is_selected():
el.click()
def _fill_form(self, form, **kwargs):
for tab_num, fields in enumerate(CREATE_FIELD_MAPPING):
form.switch_to(tab_num)
for key in fields:
value = kwargs.get(key)
if value is None:
continue
if isinstance(value, (list, tuple)):
self._set_checkbox_group(form, key, value)
continue
field = getattr(form, key, None)
if (field.src_elem.get_attribute('type') in
('checkbox', 'radio')):
if value:
field.mark()
else:
field.unmark()
else:
field.text = value
def create(self, plugin_name, plugin_version, nodegroup_name, flavor,
floating_ip_pool=None, availability_zone='nova',
proxygateway=False, processes=(), **kwargs):
@ -64,13 +94,13 @@ class NodegrouptemplatesPage(mixins.PluginSelectMixin, mixins.DeleteMixin,
'proxygateway': proxygateway,
'processes': processes,
})
form.set_values(**kwargs)
self._fill_form(form, **kwargs)
form.submit()
def update(self, group_name, **kwargs):
row = self._get_row_with_name(group_name)
form = self.table.get_edit_form(row)
form.set_values(**kwargs)
self._fill_form(form, **kwargs)
form.submit()
def get_details(self, name):

View File

@ -18,14 +18,14 @@ from sahara_dashboard.test.integration_tests.pages import mixins
class CreateMixin(object):
CREATE_FIELD_MAPPING = {
"data_source_name": "data_source_name",
"data_source_type": "data_source_type",
"data_source_url": "data_source_url",
"data_source_credential_user": "data_source_credential_user",
"data_source_credential_pass": "data_source_credential_pass",
"data_source_description": "data_source_description"
}
CREATE_FIELD_MAPPING = (
"data_source_name",
"data_source_type",
"data_source_url",
"data_source_credential_user",
"data_source_credential_pass",
"data_source_description"
)
@tables.bind_table_action('create data source')
def get_create_form(self, button):
@ -33,6 +33,13 @@ class CreateMixin(object):
return forms.FormRegion(self.driver, self.conf,
field_mappings=self.CREATE_FIELD_MAPPING)
@tables.bind_row_action('edit data source')
def get_update_form(self, button, row):
button.click()
return forms.TabbedFormRegion(
self.driver, self.conf, field_mappings=(self.CREATE_FIELD_MAPPING,)
)
class DatasourcesPage(mixins.DeleteMixin, basepage.BaseDataProcessingPage):
@ -48,3 +55,24 @@ class DatasourcesPage(mixins.DeleteMixin, basepage.BaseDataProcessingPage):
form.data_source_type.text = source_type
form.data_source_url.text = url
form.submit()
def update(self, name, **kwargs):
row = self._get_row_with_name(name)
form = self.table.get_update_form(row)
for key in CreateMixin.CREATE_FIELD_MAPPING:
if key in kwargs:
getattr(form, key).text = kwargs[key]
form.submit()
def get_details(self, name):
details = {}
self.table.src_elem.find_element_by_link_text(name).click()
items = self.driver.find_elements_by_css_selector('div.detail dt')
for item in items:
key = item.text
value_elem = item.find_element_by_xpath('./following-sibling::*')
if value_elem.tag_name != "dd":
continue
value = value_elem.text
details[key] = value
return details

View File

@ -12,36 +12,44 @@
from openstack_dashboard.test.integration_tests.pages import basepage
from openstack_dashboard.test.integration_tests.regions import forms
from openstack_dashboard.test.integration_tests.regions import messages
from openstack_dashboard.test.integration_tests.regions import tables
class JobBinariesTable(tables.TableRegion):
name = 'job_binaries'
CREATE_BINARY_FORM_FIELDS = {
"name": "job_binary_name",
"type": "job_binary_type",
"url": "job_binary_url",
"internal": "job_binary_internal",
"file": "job_binary_file",
"script_name": "job_binary_script_name",
"script": "job_binary_script",
"username": "job_binary_username",
"password": "job_binary_password",
"description": "job_binary_description"
}
CREATE_BINARY_FORM_FIELDS = (
"job_binary_name",
"job_binary_type",
"job_binary_url",
"job_binary_username",
"job_binary_password",
"job_binary_internal",
"job_binary_file",
"job_binary_script_name",
"job_binary_script",
"job_binary_description"
)
@tables.bind_table_action('create')
def create_job(self, create_button):
create_button.click()
def get_create_form(self, button):
button.click()
return forms.FormRegion(
self.driver, self.conf,
field_mappings=self.CREATE_BINARY_FORM_FIELDS)
@tables.bind_table_action('delete')
def delete_job(self, delete_button):
delete_button.click()
def get_delete_form(self, button):
button.click()
return forms.BaseFormRegion(self.driver, self.conf)
@tables.bind_row_action('edit_job_binary')
def get_update_form(self, button, row):
button.click()
return forms.FormRegion(
self.driver, self.conf,
field_mappings=self.CREATE_BINARY_FORM_FIELDS)
class JobbinariesPage(basepage.BaseNavigationPage):
@ -51,40 +59,67 @@ class JobbinariesPage(basepage.BaseNavigationPage):
super(JobbinariesPage, self).__init__(driver, conf)
self._page_title = "Data Processing"
def _get_row_with_job_binary_name(self, name):
return self.job_binaries_table.get_row(
def _get_row_with_name(self, name):
return self.table.get_row(
self.JOB_BINARIES_TABLE_NAME_COLUMN, name)
@property
def job_binaries_table(self):
def table(self):
return JobBinariesTable(self.driver, self.conf)
def delete_job_binary(self, name):
row = self._get_row_with_job_binary_name(name)
row = self._get_row_with_name(name)
row.mark()
confirm_delete_form = self.job_binaries_table.delete_job()
confirm_delete_form = self.table.get_delete_form()
confirm_delete_form.submit()
def create_job_binary(self, binary_name, script_name):
create_job_binary_form = self.job_binaries_table.create_job()
form = self.table.get_create_form()
create_job_binary_form.name.text = binary_name
create_job_binary_form.type.text = "Internal database"
create_job_binary_form.internal.text = "*Create a script"
create_job_binary_form.script_name.text = script_name
create_job_binary_form.script.text = "test_script_text"
create_job_binary_form.description.text = "test description"
create_job_binary_form.submit()
def create_job_binary_from_file(self, binary_name, path):
form = self.job_binaries_table.create_job()
form.name.text = binary_name
form.type.text = "Internal database"
form.internal.text = "*Upload a new file"
form.file.src_elem.send_keys(path)
form.description.text = "test description"
form.job_binary_name.text = binary_name
form.job_binary_type.text = "Internal database"
form.job_binary_internal.text = "*Create a script"
form.job_binary_script_name.text = script_name
form.job_binary_script.text = "test_script_text"
form.job_binary_description.text = "test description"
form.submit()
def create_job_binary_from_file(self, binary_name, path):
form = self.table.get_create_form()
form.job_binary_name.text = binary_name
form.job_binary_type.text = "Internal database"
form.job_binary_internal.text = "*Upload a new file"
form.job_binary_file.src_elem.send_keys(path)
form.job_binary_description.text = "test description"
form.submit()
def update_job_binary(self, name, **kwargs):
row = self._get_row_with_name(name)
form = self.table.get_update_form(row)
for key in JobBinariesTable.CREATE_BINARY_FORM_FIELDS:
if key in kwargs:
getattr(form, key).text = kwargs[key]
form.submit()
def get_details(self, name):
details = {}
self.table.src_elem.find_element_by_link_text(name).click()
items = self.driver.find_elements_by_css_selector('div.detail dt')
for item in items:
key = item.text
value_elem = item.find_element_by_xpath('./following-sibling::*')
if value_elem.tag_name != "dd":
continue
value = value_elem.text
details[key] = value
return details
def is_job_binary_present(self, name):
return bool(self._get_row_with_job_binary_name(name))
return bool(self._get_row_with_name(name))
def has_success_message(self):
return self.find_message_and_dismiss(messages.SUCCESS)
def has_error_message(self):
return self.find_message_and_dismiss(messages.ERROR)

View File

@ -36,3 +36,31 @@ class JobsPage(mixins.DeleteMixin, basepage.BaseDataProcessingPage):
def wait_until_job_succeeded(self, name, timeout=None):
self._wait_until(lambda x: self.is_job_succeeded(name),
timeout=timeout)
def get_details(self, template_name):
row = self._get_row_by_template_name(template_name)
row.cells['name'].find_element_by_tag_name('a').click()
details = {}
self.driver.implicitly_wait(0)
items = self.driver.find_elements_by_css_selector('div.detail dt')
for item in items:
key = item.text
value_elem = item.find_element_by_xpath('./following-sibling::*')
if value_elem.tag_name != "dd":
continue
sub_elements = value_elem.find_elements_by_css_selector(
'li > span')
if sub_elements:
value = {}
for sub_element in sub_elements:
sub_key = sub_element.text.strip(':')
value[sub_key] = set()
for elem in sub_element.find_elements_by_xpath(
'./following-sibling::ul/li'
):
value[sub_key].add(elem.text)
else:
value = value_elem.text
details[key] = value
self.driver.implicitly_wait(30)
return details

View File

@ -24,6 +24,8 @@ class CreateMixin(object):
LAUNCH_ON_EXIST_CLUSTER_FIELD_MAPPING = (
('job_input', 'job_output', 'cluster'),
('adapt_spark_swift', 'datasource_substitute'),
(),
)
@tables.bind_table_action('create job')
@ -56,10 +58,50 @@ class JobtemplatesPage(mixins.DeleteMixin, basepage.BaseDataProcessingPage):
form.submit()
def launch_on_exists(self, job_name, input_name, output_name,
cluster_name):
cluster_name, adapt_swift=True,
datasource_substitution=True, configuration=None,
parameters=None, arguments=()):
configuration = configuration or {}
parameters = parameters or {}
row = self._get_row_with_name(job_name)
form = self.table.get_launch_on_exists_form(row)
form.job_input.text = input_name
form.job_output.text = output_name
form.cluster.text = cluster_name
form.switch_to(1)
if adapt_swift:
form.adapt_spark_swift.mark()
else:
form.adapt_spark_swift.unmark()
if datasource_substitution:
form.datasource_substitute.mark()
else:
form.datasource_substitute.unmark()
config_block = form.src_elem.find_element_by_id('configs')
add_btn = config_block.find_element_by_link_text('Add')
for key, value in configuration.items():
add_btn.click()
inputs = config_block.find_elements_by_css_selector(
'input[type=text]')[-2:]
inputs[0].send_keys(key)
inputs[1].send_keys(value)
config_block = form.src_elem.find_element_by_id('params')
add_btn = config_block.find_element_by_link_text('Add')
for key, value in parameters.items():
add_btn.click()
inputs = config_block.find_elements_by_css_selector(
'input[type=text]')[-2:]
inputs[0].send_keys(key)
inputs[1].send_keys(value)
config_block = form.src_elem.find_element_by_id('args_array')
add_btn = config_block.find_element_by_link_text('Add')
for value in arguments:
add_btn.click()
input_el = config_block.find_elements_by_css_selector(
'input[type=text]')[-1]
input_el.send_keys(value)
form.submit()

View File

@ -19,18 +19,24 @@ PLUGIN_VERSION = '0.1'
IMAGE_TAGS = ('fake', PLUGIN_VERSION)
class TestCRUD(SaharaTestCase):
class TestCRUDBase(SaharaTestCase):
def setUp(self):
super(TestCRUD, self).setUp()
super(TestCRUDBase, self).setUp()
self.flavor_name = self.gen_name('flavor')
self.image_name = self.gen_name("image")
self.master_name = self.gen_name("master")
self.worker_name = self.gen_name("worker")
self.template_name = self.gen_name("template")
self.cluster_name = self.gen_name("cluster")
self.ds_input_name = self.gen_name('input')
self.ds_output_name = self.gen_name('output')
self.job_binary_name = self.gen_name('function')
self.jobtemplate_name = self.gen_name('test-job')
def create_flavor(self):
flavors_page = self.home_pg.go_to_system_flavorspage()
flavors_page.create_flavor(
name=self.flavor_name,
vcpus=self.CONFIG.sahara.flavor_vcpus,
@ -38,12 +44,24 @@ class TestCRUD(SaharaTestCase):
root_disk=self.CONFIG.sahara.flavor_root_disk,
ephemeral_disk=self.CONFIG.sahara.flavor_ephemeral_disk,
swap_disk=self.CONFIG.sahara.flavor_swap_disk)
self.assertTrue(flavors_page.is_flavor_present(self.flavor_name))
def delete_flavor(self):
flavors_page = self.home_pg.go_to_system_flavorspage()
flavors_page.delete_flavor(self.flavor_name)
self.assertFalse(flavors_page.is_flavor_present(self.flavor_name))
def create_image(self):
image_pg = self.home_pg.go_to_compute_imagespage()
image_pg.create_image(self.image_name,
location=self.CONFIG.sahara.fake_http_image)
image_pg.wait_until_image_active(self.image_name)
def delete_image(self):
image_pg = self.home_pg.go_to_compute_imagespage()
image_pg.delete_image(self.image_name)
def register_image(self):
image_reg_pg = (
self.home_pg.go_to_dataprocessing_clusters_imageregistrypage())
image_ssh_user = self.CONFIG.sahara.fake_image_ssh_user
@ -51,6 +69,11 @@ class TestCRUD(SaharaTestCase):
"Test description", tags=IMAGE_TAGS)
image_reg_pg.wait_until_image_registered(self.image_name)
def unregister_image(self):
image_reg_pg = (
self.home_pg.go_to_dataprocessing_clusters_imageregistrypage())
image_reg_pg.unregister_image(self.image_name)
def create_cluster(self):
nodegrouptpls_pg = (
self.home_pg.go_to_dataprocessing_clusters_nodegrouptemplatespage()
@ -113,60 +136,93 @@ class TestCRUD(SaharaTestCase):
cluster_pg.get_cluster_instances_count(self.cluster_name), 3,
"Cluster was not scaled")
def run_edp_job(self):
def create_datasources(self):
datasource_pg = (
self.home_pg.go_to_dataprocessing_jobs_datasourcespage())
input_name = self.gen_name('input')
datasource_pg.create(name=input_name, source_type="HDFS",
datasource_pg.create(name=self.ds_input_name, source_type="HDFS",
url="hdfs://user/input")
output_name = self.gen_name('output')
datasource_pg.create(name=output_name, source_type="Swift",
datasource_pg.create(name=self.ds_output_name, source_type="Swift",
url="swift://container/output")
# create job binary
job_binary_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobbinariespage())
job_binary_name = self.gen_name('function')
job_binary_pg.create_job_binary_from_file(job_binary_name, __file__)
self.assertTrue(job_binary_pg.is_job_binary_present(job_binary_name),
"Job binary is not in the binaries job table after"
" its creation.")
# create job template
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
jobtemplate_name = self.gen_name('test-job')
jobtemplates_pg.create(name=jobtemplate_name, job_type='Pig',
binary_name=job_binary_name)
# launch job
jobtemplates_pg.launch_on_exists(job_name=jobtemplate_name,
input_name=input_name,
output_name=output_name,
cluster_name=self.cluster_name)
jobs_pg = self.home_pg.go_to_dataprocessing_jobs_jobspage()
jobs_pg.wait_until_job_succeeded(jobtemplate_name)
# delete job
jobs_pg.delete(jobtemplate_name)
# delete job_template
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
jobtemplates_pg.delete(jobtemplate_name)
# delete job binary
job_binary_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobbinariespage())
job_binary_pg.delete_job_binary(job_binary_name)
self.assertFalse(job_binary_pg.is_job_binary_present(job_binary_name),
"Job binary was not removed from binaries job table.")
def delete_datasources(self):
datasource_pg = (
self.home_pg.go_to_dataprocessing_jobs_datasourcespage())
datasource_pg.delete(input_name)
datasource_pg.delete(output_name)
datasource_pg.delete(self.ds_input_name)
datasource_pg.delete(self.ds_output_name)
def create_job_binary(self):
job_binary_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobbinariespage())
job_binary_pg.create_job_binary_from_file(
self.job_binary_name, __file__)
self.assertTrue(
job_binary_pg.is_job_binary_present(self.job_binary_name),
"Job binary is not in the binaries job table after its creation.")
def delete_job_binary(self):
job_binary_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobbinariespage())
job_binary_pg.delete_job_binary(self.job_binary_name)
self.assertFalse(
job_binary_pg.is_job_binary_present(self.job_binary_name),
"Job binary was not removed from binaries job table.")
def create_job_template(self):
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
jobtemplates_pg.create(name=self.jobtemplate_name, job_type='Pig',
binary_name=self.job_binary_name)
def delete_job_template(self):
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
jobtemplates_pg.delete(self.jobtemplate_name)
def run_edp_job(self):
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
jobtemplates_pg.launch_on_exists(job_name=self.jobtemplate_name,
input_name=self.ds_input_name,
output_name=self.ds_output_name,
cluster_name=self.cluster_name)
jobs_pg = self.home_pg.go_to_dataprocessing_jobs_jobspage()
jobs_pg.wait_until_job_succeeded(self.jobtemplate_name)
def run_edp_job_with_parameters(self):
jobtemplates_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobtemplatespage())
jobtemplates_pg.launch_on_exists(job_name=self.jobtemplate_name,
input_name=self.ds_input_name,
output_name=self.ds_output_name,
cluster_name=self.cluster_name,
adapt_swift=False,
datasource_substitution=False,
configuration={'foo': 'bar'},
parameters={'key': 'value'},
arguments=('one', 'two'))
jobs_pg = self.home_pg.go_to_dataprocessing_jobs_jobspage()
details = jobs_pg.get_details(self.jobtemplate_name)
expected = {
'Job Configuration': {
'configs': set([
'edp.substitute_data_source_for_name = False',
'foo = bar',
'edp.substitute_data_source_for_uuid = False'
]),
'params': set(['key = value']),
'args': set(['one', 'two']),
'job_execution_info': set([]),
},
'Job Template': self.jobtemplate_name,
}
details = {k: v for k, v in details.items() if k in expected}
self.assertEqual(expected, details)
def delete_job(self):
jobs_pg = self.home_pg.go_to_dataprocessing_jobs_jobspage()
jobs_pg.delete(self.jobtemplate_name)
def delete_cluster(self):
cluster_pg = self.home_pg.go_to_dataprocessing_clusters_clusterspage()
@ -177,12 +233,6 @@ class TestCRUD(SaharaTestCase):
self.assertFalse(cluster_pg.is_present(self.cluster_name),
"Cluster was not deleted.")
def test_cluster_operate(self):
self.create_cluster()
self.run_edp_job()
self.cluster_scale()
self.delete_cluster()
def tearDown(self):
clustertpls_pg = (
self.home_pg.go_to_dataprocessing_clusters_clustertemplatespage())
@ -193,17 +243,36 @@ class TestCRUD(SaharaTestCase):
self.home_pg.go_to_dataprocessing_clusters_nodegrouptemplatespage()
)
nodegrouptpls_pg.delete_many((self.worker_name, self.master_name))
super(TestCRUDBase, self).tearDown()
image_reg_pg = (
self.home_pg.go_to_dataprocessing_clusters_imageregistrypage())
image_reg_pg.unregister_image(self.image_name)
image_pg = self.home_pg.go_to_compute_imagespage()
image_pg.delete_image(self.image_name)
class TestCRUD(TestCRUDBase):
flavors_page = self.home_pg.go_to_system_flavorspage()
flavors_page.delete_flavor(self.flavor_name)
self.assertFalse(flavors_page.is_flavor_present(self.flavor_name))
def setUp(self):
super(TestCRUD, self).setUp()
self.create_flavor()
self.create_image()
self.register_image()
def test_cluster_operate(self):
self.create_cluster()
self.create_datasources()
self.create_job_binary()
self.create_job_template()
self.run_edp_job()
self.delete_job()
self.run_edp_job_with_parameters()
self.delete_job()
self.delete_job_template()
self.delete_job_binary()
self.delete_datasources()
self.cluster_scale()
self.delete_cluster()
def tearDown(self):
self.unregister_image()
self.delete_image()
self.delete_flavor()
super(TestCRUD, self).tearDown()
@ -415,3 +484,94 @@ class TestUpdateClusterTemplate(SaharaTestCase):
nodegrouptpls_pg.delete_many((self.worker_name, self.master_name,
self.new_worker_name))
super(TestUpdateClusterTemplate, self).tearDown()
class TestUpdateDataSource(SaharaTestCase):
def setUp(self):
super(TestUpdateDataSource, self).setUp()
datasource_pg = (
self.home_pg.go_to_dataprocessing_jobs_datasourcespage())
self.name = self.gen_name('input')
self.new_name = '{}-new'.format(self.name)
datasource_pg.create(name=self.name, source_type="HDFS",
url="hdfs://user/input")
def test_update(self):
datasource_pg = (
self.home_pg.go_to_dataprocessing_jobs_datasourcespage())
kwargs = {
'data_source_name': self.new_name,
'data_source_type': 'Swift',
'data_source_url': 'swift://container.sahara/object',
'data_source_credential_user': 'test-user',
'data_source_credential_pass': 'test-password',
'data_source_description': '{} description'.format(self.new_name),
}
datasource_pg.update(self.name, **kwargs)
self.assertTrue(datasource_pg.has_success_message())
self.assertFalse(datasource_pg.has_error_message())
self.assertTrue(datasource_pg.is_present(self.new_name),
"Node group template was not updated.")
details = datasource_pg.get_details(self.new_name)
expected = {
'Description': kwargs['data_source_description'],
'Name': self.new_name,
'Type': 'swift',
'URL': 'swift://container.sahara/object'
}
details = {k: v for k, v in details.items() if k in expected}
self.assertEqual(expected, details)
def tearDown(self):
datasource_pg = (
self.home_pg.go_to_dataprocessing_jobs_datasourcespage())
datasource_pg.delete_many([self.name, self.new_name])
super(TestUpdateDataSource, self).tearDown()
class TestUpdateJobBinaries(SaharaTestCase):
def setUp(self):
super(TestUpdateJobBinaries, self).setUp()
# create job binary
job_binary_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobbinariespage())
self.name = self.gen_name('function')
self.new_name = '{}-new'.format(self.name)
job_binary_pg.create_job_binary_from_file(self.name, __file__)
def test_update(self):
job_binary_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobbinariespage())
kwargs = {
"job_binary_name": self.new_name,
"job_binary_type": "Internal database",
"job_binary_internal": "*Create a script",
"job_binary_script_name": "script-name",
"job_binary_script": "script-text",
"job_binary_description": "{} description".format(self.new_name),
}
job_binary_pg.update_job_binary(self.name, **kwargs)
self.assertTrue(job_binary_pg.has_success_message())
self.assertFalse(job_binary_pg.has_error_message())
self.assertTrue(job_binary_pg.is_job_binary_present(self.new_name),
"Job binary was not updated.")
details = job_binary_pg.get_details(self.new_name)
expected = {
'Description': kwargs['job_binary_description'],
'Name': self.new_name,
}
details = {k: v for k, v in details.items() if k in expected}
self.assertEqual(expected, details)
def tearDown(self):
job_binary_pg = (
self.home_pg.go_to_dataprocessing_jobs_jobbinariespage())
for name in (self.name, self.new_name):
try:
job_binary_pg.delete_job_binary(name)
except Exception:
pass
super(TestUpdateJobBinaries, self).tearDown()