Modify Tempest tests of resource_management.

Currently, Kingbird uses curl requests to run Tempest tests
This commit replaces curl requests with kingbird python sdk
(python-kingbirdclient)for resource_management only.
Add kingbirdclient to the test-requriements.txt
Partially Implements: kingbirdclient-tempest
Change-Id: I7fbeec5376dc3a2333a7cc9ad51108feb79a7b03
This commit is contained in:
Goutham Pratapa 2017-06-07 16:03:15 +05:30
parent c357203af5
commit 3c2bc63a9f
4 changed files with 271 additions and 380 deletions

View File

@ -1,151 +0,0 @@
# Copyright (c) 2017 Ericsson AB.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import requests
from keystoneclient.auth.identity import v3
from keystoneclient import session
from keystoneclient.v3 import client as ks_client
from novaclient import client as nv_client
from oslo_log import log as logging
from tempest import config
CONF = config.CONF
NOVA_API_VERSION = "2.37"
KEYPAIRS = ["kb_test_keypair1", "kb_test_keypair2"]
resource_sync_url = "/os-sync/"
LOG = logging.getLogger(__name__)
def get_session():
return get_current_session(
CONF.auth.admin_username,
CONF.auth.admin_password,
CONF.auth.admin_project_name
)
def get_current_session(username, password, tenant_name):
auth = v3.Password(
auth_url=CONF.identity.uri_v3,
username=username,
password=password,
project_name=tenant_name,
user_domain_name=CONF.auth.admin_domain_name,
project_domain_name=CONF.auth.admin_domain_name)
sess = session.Session(auth=auth)
return sess
def get_openstack_drivers(keystone_client, region, project_name,
user_name, password):
resources = dict()
# Create Project, User and assign role to new user
project = keystone_client.projects.create(project_name,
CONF.auth.admin_domain_name)
user = keystone_client.users.create(user_name, CONF.auth.admin_domain_name,
project.id, password)
sess = get_current_session(user_name, password, project_name)
# Create new client to form session
new_key_client = ks_client.Client(session=sess)
# Create member role to which we create keypair and Sync
mem_role = [current_role.id for current_role in
keystone_client.roles.list()
if current_role.name == 'Member'][0]
keystone_client.roles.grant(mem_role, user=user, project=project)
token = new_key_client.session.get_token()
nova_client = nv_client.Client(NOVA_API_VERSION,
session=sess,
region_name=region)
resources = {"user_id": user.id, "project_id": project.id,
"session": sess, "token": token,
"nova_version": NOVA_API_VERSION,
"keypairs": KEYPAIRS,
"os_drivers": {'keystone': keystone_client,
'nova': nova_client}}
return resources
def get_keystone_client(session):
return ks_client.Client(session=session)
def get_resource_sync_url_and_headers(token, project_id, api_url):
headers = {
'Content-Type': 'application/json',
'X-Auth-Token': token,
}
url_string = CONF.kingbird.endpoint_url + CONF.kingbird.api_version + \
"/" + project_id + api_url
return headers, url_string
def sync_resource(token, project_id, post_body):
body = json.dumps(post_body)
headers, url_string = get_resource_sync_url_and_headers(token, project_id,
resource_sync_url)
response = requests.post(url_string, headers=headers, data=body)
return response
def get_sync_job_list(token, project_id, action=None):
headers, url_string = get_resource_sync_url_and_headers(token, project_id,
resource_sync_url)
if action:
url_string = url_string + action
response = requests.get(url_string, headers=headers)
return response
def delete_db_entries(token, project_id, job_id):
headers, url_string = get_resource_sync_url_and_headers(token, project_id,
resource_sync_url)
url_string = url_string + job_id
response = requests.delete(url_string, headers=headers)
return response.status_code
def get_regions(key_client):
return [current_region.id for current_region in
key_client.regions.list()]
def cleanup_resources(openstack_drivers, resource_ids):
keystone_client = openstack_drivers['keystone']
keystone_client.projects.delete(resource_ids['project_id'])
keystone_client.users.delete(resource_ids['user_id'])
def cleanup_keypairs(regions, resource_ids,
current_sess):
for current_region in regions:
for keypair in KEYPAIRS:
nova_client = nv_client.Client(NOVA_API_VERSION,
session=current_sess,
region_name=current_region)
nova_client.keypairs.delete(keypair,
user_id=resource_ids['user_id'])
def create_keypairs(openstack_drivers):
nova_client = openstack_drivers['nova']
resources = dict()
for keypair in KEYPAIRS:
resources[keypair] = nova_client.keypairs.create(keypair)
return resources

View File

@ -1,4 +1,4 @@
# Copyright 2017 Ericsson AB
# Copyright 2017 Ericsson AB.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,86 +13,182 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common import api_version_utils
from keystoneclient.auth.identity import v3
from keystoneclient import session
from keystoneclient.v3 import client as ks_client
from kingbirdclient.api.v1 import client as kb_client
from tempest.api.compute import base as kp_base
from tempest import config
from tempest.lib.common.utils import data_utils
import tempest.test
from kingbird.tests.tempest.scenario.resource_management \
import resource_sync_client
from kingbird.tests.tempest.scenario import consts
from novaclient import client as nv_client
CONF = config.CONF
KINGBIRD_URL = CONF.kingbird.endpoint_url + CONF.kingbird.api_version
NOVA_API_VERSION = "2.37"
class BaseKingbirdTest(api_version_utils.BaseMicroversionTest,
tempest.test.BaseTestCase):
"""Base test case class for all Kingbird Resource sync API tests."""
class BaseKingbirdClass(object):
@classmethod
def skip_checks(cls):
super(BaseKingbirdTest, cls).skip_checks()
def _keypair_sync_job_create(self, force, keys=None):
result = dict()
admin_client = self._get_admin_keystone()
regions = self._get_regions(admin_client)
target_regions = regions
target_regions.remove(self.client.region)
if keys:
create_response = self._sync_job_create(
consts.KEYPAIR_RESOURCE_TYPE, target_regions, keys,
force=force)
result['keys'] = keys
else:
key_list = self._create_keypairs()
# Now sync all the keypairs in other regions.
create_response = self._sync_job_create(
consts.KEYPAIR_RESOURCE_TYPE, target_regions, key_list,
force=force)
result['keys'] = key_list
job_id = create_response.get('job_status').get('id')
result['job_id'] = job_id
result['admin'] = admin_client
result['target'] = target_regions
return result
def setUp(self):
super(BaseKingbirdTest, self).setUp()
def _check_job_status(self):
# Wait until the status of the job is not "IN_PROGRESS"
job_list_resp = self.get_sync_job_list()
status = job_list_resp.get('job_set')[0].get('sync_status')
return status != consts.JOB_PROGRESS
@classmethod
def setup_credentials(cls):
super(BaseKingbirdTest, cls).setup_credentials()
session = resource_sync_client.get_session()
cls.keystone_client = resource_sync_client.get_keystone_client(session)
cls.regions = resource_sync_client.get_regions(cls.keystone_client)
def _sync_job_create(self, resource_type, target_regions, resource_list,
force):
# JSON body used to sync resource.
body = {"resource_type": resource_type,
"resources": resource_list,
"source": self.client.region, "force": force,
"target": target_regions}
response = self.sync_resource(body)
return response
def _get_admin_keystone(self):
auth = v3.Password(
auth_url=CONF.identity.uri_v3,
username=CONF.auth.admin_username,
password=CONF.auth.admin_password,
project_name=CONF.auth.admin_project_name,
user_domain_name=CONF.auth.admin_domain_name,
project_domain_name=CONF.auth.admin_domain_name)
self.sess = session.Session(auth=auth)
keystone_admin = ks_client.Client(session=self.sess)
return keystone_admin
def _get_regions(self, keystone_admin):
regions = [current_region.id for current_region in
keystone_admin.regions.list()]
return regions
def sync_resource(self, post_body):
response = self.kingbird_client.sync_manager.\
sync_resources(**post_body)
result = dict()
for i in response:
result['id'] = i.id
result['status'] = i.status
result['created_at'] = i.created_at
res = dict()
res['job_status'] = result
return res
def delete_db_entries(self, job_id):
self.kingbird_client.sync_manager.delete_sync_job(job_id)
def get_sync_job_list(self, action=None):
response = self.kingbird_client.sync_manager.list_sync_jobs(action)
result = dict()
job_set = list()
res = dict()
for i in response:
result['id'] = i.id
result['sync_status'] = i.status
result['created_at'] = i.created_at
result['updated_at'] = i.updated_at
job_set.append(result.copy())
res['job_set'] = job_set
return res
def get_sync_job_detail(self, job_id):
response = self.kingbird_client.sync_manager.sync_job_detail(job_id)
result = dict()
job_set = list()
res = dict()
for i in response:
result['resource'] = i.resource_name
result['target_region'] = i.target_region
result['sync_status'] = i.status
result['created_at'] = i.created_at
result['updated_at'] = i.updated_at
result['source_region'] = i.source_region
result['resource_type'] = i.resource_type
job_set.append(result.copy())
res['job_set'] = job_set
return res
class BaseKBKeypairTest(kp_base.BaseV2ComputeTest):
"""Base test case class for all keypair API tests."""
@classmethod
def setup_clients(cls):
super(BaseKingbirdTest, cls).setup_clients()
super(BaseKBKeypairTest, cls).setup_clients()
cls.client = cls.keypairs_client
@classmethod
def create_resources(cls):
# Create Project, User, flavor, subnet & network for test
project_name = data_utils.rand_name('kb-project')
user_name = data_utils.rand_name('kb-user')
password = data_utils.rand_name('kb-password')
cls.openstack_details = resource_sync_client.get_openstack_drivers(
cls.keystone_client,
cls.regions[0],
project_name,
user_name,
password)
cls.openstack_drivers = cls.openstack_details['os_drivers']
cls.token = cls.openstack_details['token']
cls.session = cls.openstack_details['session']
def _create_keypairs(self):
key_list = list()
for _ in range(2):
keypair = self.create_keypair()
key_list.append(keypair['name'])
return key_list
@classmethod
def resource_cleanup(cls):
super(BaseKingbirdTest, cls).resource_cleanup()
def _check_keypairs_in_target_region(self, target_regions, key_list):
for region in target_regions:
for keypair in key_list:
nova_client = nv_client.Client(NOVA_API_VERSION,
session=self.sess,
region_name=region)
source_keypair = nova_client.keypairs.get(
keypair, self.client.user_id)
self.assertEqual(source_keypair.name, keypair)
@classmethod
def create_keypairs(cls):
cls.resource_ids = resource_sync_client.\
create_keypairs(cls.openstack_drivers)
cls.resource_ids.update(cls.openstack_details)
def _cleanup_resources(self, key_list, regions, user_id):
for region in regions:
for key in key_list:
nova_client = nv_client.Client(NOVA_API_VERSION,
session=self.sess,
region_name=region)
nova_client.keypairs.delete(key, user_id)
@classmethod
def delete_keypairs(cls):
resource_sync_client.cleanup_keypairs(cls.regions,
cls.resource_ids, cls.session)
def _delete_keypair(self, keypair_name, **params):
self.client.delete_keypair(keypair_name, **params)
@classmethod
def delete_resources(cls):
resource_sync_client.cleanup_resources(cls.openstack_drivers,
cls.resource_ids)
@classmethod
def sync_keypair(cls, project_id, post_body):
response = resource_sync_client.sync_resource(cls.token, project_id,
post_body)
return response
@classmethod
def get_sync_list(cls, project_id, action=None):
response = resource_sync_client.get_sync_job_list(
cls.token, project_id, action)
return response
@classmethod
def delete_db_entries(cls, project_id, job_id):
response = resource_sync_client.delete_db_entries(
cls.token, project_id, job_id)
return response
def create_keypair(self, keypair_name=None,
pub_key=None, keypair_type=None,
user_id=None):
if keypair_name is None:
keypair_name = data_utils.rand_name('kb-keypair')
kwargs = {'name': keypair_name}
delete_params = {}
if pub_key:
kwargs.update({'public_key': pub_key})
if keypair_type:
kwargs.update({'type': keypair_type})
if user_id:
kwargs.update({'user_id': user_id})
delete_params['user_id'] = user_id
body = self.client.create_keypair(**kwargs)['keypair']
self.kingbird_client = kb_client.Client(
kingbird_url=KINGBIRD_URL, auth_token=self.client.token,
project_id=self.client.tenant_id)
self.addCleanup(self._delete_keypair, keypair_name, **delete_params)
return body

View File

@ -1,4 +1,4 @@
# Copyright 2017 Ericsson AB
# Copyright 2017 Ericsson AB.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,222 +13,167 @@
# License for the specific language governing permissions and limitations
# under the License.
from kingbird.tests.tempest.scenario.resource_management \
import resource_sync_client
from kingbird.tests.tempest.scenario.resource_management. \
sync_tests import base
from kingbird.tests.tempest.scenario import consts
from kingbird.tests import utils
import kingbirdclient
from novaclient import client as nv_client
from tempest.lib import decorators
from kingbird.tests.tempest.scenario import consts
from kingbird.tests.tempest.scenario.resource_management.sync_tests \
import base
from kingbird.tests import utils
FORCE = "True"
DEFAULT_FORCE = "False"
class KingbirdKPTest(base.BaseKingbirdTest):
class KingbirdKeyPairSyncTest(base.BaseKBKeypairTest,
base.BaseKingbirdClass):
@classmethod
def setup_clients(self):
super(KingbirdKPTest, self).setup_clients()
def tearDown(self):
super(KingbirdKPTest, self).tearDown()
@classmethod
def resource_cleanup(self):
super(KingbirdKPTest, self).resource_cleanup()
self.delete_keypairs()
self.delete_resources()
@classmethod
def resource_setup(self):
super(KingbirdKPTest, self).resource_setup()
self.create_resources()
self.create_keypairs()
@classmethod
def setup_credentials(self):
super(KingbirdKPTest, self).setup_credentials()
self.session = resource_sync_client.get_session()
self.key_client = resource_sync_client.\
get_keystone_client(self.session)
self.regions = resource_sync_client.get_regions(self.key_client)
def _check_job_status(self):
# Wait until the status of the job is not "IN_PROGRESS"
job_list_resp = self.get_sync_list(self.resource_ids["project_id"])
status = job_list_resp.json().get('job_set')[0].get('sync_status')
return status != consts.JOB_PROGRESS
def _sync_job_create(self, force):
body = {"resource_set": {"resource_type": consts.KEYPAIR_RESOURCE_TYPE,
"resources": self.resource_ids["keypairs"],
"source": self.regions[0], "force": force,
"target": self.regions[1:]}}
response = self.sync_keypair(self.resource_ids["project_id"], body)
return response
def _delete_entries_in_db(self, project, job):
response = self.delete_db_entries(self.resource_ids["project_id"], job)
return response
def test_keypair_sync(self):
create_response = self._sync_job_create(force=FORCE)
job_id = create_response.json().get('job_status').get('id')
self.assertEqual(create_response.status_code, 200)
@decorators.idempotent_id('5024d38b-a46a-4ebb-84be-40c9929eb864')
def test_kingbird_keypair_sync(self):
# Keypairs created should be available in the response list
# Create 2 keypairs:
job_details = self._keypair_sync_job_create(FORCE)
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id))
target_regions = self.regions[1:]
for region in target_regions:
for keypair in self.resource_ids["keypairs"]:
nova_client = nv_client.Client(
self.resource_ids["nova_version"], session=self.session,
region_name=region)
source_keypair = nova_client.keypairs.get(
keypair, self.resource_ids["user_id"])
self.assertEqual(source_keypair.name, keypair)
# Clean_up the database entries
delete_response = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id)
self.assertEqual(delete_response, 200)
exception=RuntimeError("Timed out waiting for job %s " %
job_details['job_id']))
# Check for resources in target_regions
self._check_keypairs_in_target_region(job_details['target'],
job_details['keys'])
# Clean_up the database entries and resources
self.delete_db_entries(job_details['job_id'])
self._cleanup_resources(job_details['keys'], job_details['target'],
self.client.user_id)
def test_get_sync_list(self):
create_response = self._sync_job_create(force=FORCE)
self.assertEqual(create_response.status_code, 200)
job_id = create_response.json().get('job_status').get('id')
@decorators.idempotent_id('8eeb04d1-6371-4834-b2e1-0d2dbed98cd5')
def test_get_kingbird_sync_list(self):
job_details = self._keypair_sync_job_create(FORCE)
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id))
job_list_resp = self.get_sync_list(self.resource_ids["project_id"])
self.assertEqual(job_list_resp.status_code, 200)
# Clean_up the database entries
delete_response = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id)
self.assertEqual(delete_response, 200)
exception=RuntimeError("Timed out waiting for job %s " %
job_details['job_id']))
job_list_resp = self.get_sync_job_list()
self.assertEqual(job_list_resp['job_set'][0]['id'],
job_details['job_id'])
# Clean_up the database entries and resources
self.delete_db_entries(job_details['job_id'])
self._cleanup_resources(job_details['keys'], job_details['target'],
self.client.user_id)
@decorators.idempotent_id('fed7e0b3-0d47-4729-9959-8b6b14230f48')
def test_get_sync_job_details(self):
create_response = self._sync_job_create(force=FORCE)
self.assertEqual(create_response.status_code, 200)
job_id = create_response.json().get('job_status').get('id')
job_details = self._keypair_sync_job_create(FORCE)
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id))
job_list_resp = self.get_sync_list(self.resource_ids["project_id"],
job_id)
self.assertEqual(job_list_resp.status_code, 200)
exception=RuntimeError("Timed out waiting for job %s " %
job_details['job_id']))
job_list_resp = self.get_sync_job_detail(job_details['job_id'])
key_list = job_details['keys']
for i in range(len(key_list)):
for j in range(len(job_list_resp.get('job_set'))):
if key_list[i] in job_list_resp.get('job_set')[j].values():
self.assertEqual(
job_list_resp.get('job_set')[j].get('resource'),
key_list[i])
self.assertEqual(
job_list_resp.json().get('job_set')[0].get('resource'),
self.resource_ids["keypairs"][0])
self.assertEqual(
job_list_resp.json().get('job_set')[1].get('resource'),
self.resource_ids["keypairs"][1])
self.assertEqual(
job_list_resp.json().get('job_set')[0].get('resource_type'),
job_list_resp.get('job_set')[0].get('resource_type'),
consts.KEYPAIR_RESOURCE_TYPE)
# Clean_up the database entries
delete_response = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id)
self.assertEqual(delete_response, 200)
self.delete_db_entries(job_details['job_id'])
self._cleanup_resources(job_details['keys'], job_details['target'],
self.client.user_id)
@decorators.idempotent_id('f5701f6a-183b-41fe-b0ab-e0ddef3fbd86')
def test_get_active_jobs(self):
create_response = self._sync_job_create(force=FORCE)
self.assertEqual(create_response.status_code, 200)
job_id = create_response.json().get('job_status').get('id')
active_job = self.get_sync_list(self.resource_ids["project_id"],
consts.JOB_ACTIVE)
status = active_job.json().get('job_set')[0].get('sync_status')
self.assertEqual(active_job.status_code, 200)
job_details = self._keypair_sync_job_create(FORCE)
active_job = self.get_sync_job_list(consts.JOB_ACTIVE)
status = active_job.get('job_set')[0].get('sync_status')
self.assertEqual(status, consts.JOB_PROGRESS)
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id))
exception=RuntimeError("Timed out waiting for job %s " %
job_details['job_id']))
# Clean_up the database entries
delete_response = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id)
self.assertEqual(delete_response, 200)
self.delete_db_entries(job_details['job_id'])
self._cleanup_resources(job_details['keys'], job_details['target'],
self.client.user_id)
@decorators.idempotent_id('9e3fd15b-e170-4fa6-877c-ad4c22a137af')
def test_delete_active_jobs(self):
create_response = self._sync_job_create(force=FORCE)
self.assertEqual(create_response.status_code, 200)
job_id = create_response.json().get('job_status').get('id')
response = self._delete_entries_in_db(self.resource_ids["project_id"],
job_id)
job_details = self._keypair_sync_job_create(FORCE)
self.assertRaisesRegexp(kingbirdclient.exceptions.APIException,
"406 *",
self.delete_db_entries,
job_details['job_id'])
# Actual result when we try and delete an active_job
self.assertEqual(response, 406)
# Clean_up the database entries
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id))
delete_response = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id)
self.assertEqual(delete_response, 200)
exception=RuntimeError("Timed out waiting for job %s " %
job_details['job_id']))
# Clean_up the database entries
self.delete_db_entries(job_details['job_id'])
self._cleanup_resources(job_details['keys'], job_details['target'],
self.client.user_id)
@decorators.idempotent_id('adf565b1-c076-4273-b7d2-305cc144d0e2')
def test_delete_already_deleted_job(self):
create_response = self._sync_job_create(force=FORCE)
self.assertEqual(create_response.status_code, 200)
job_id = create_response.json().get('job_status').get('id')
job_details = self._keypair_sync_job_create(FORCE)
# Clean_up the database entries
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id))
delete_response = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id)
self.assertEqual(delete_response, 200)
delete_response2 = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id)
self.assertEqual(delete_response2, 404)
exception=RuntimeError("Timed out waiting for job %s " %
job_details['job_id']))
self.delete_db_entries(job_details['job_id'])
self.assertRaisesRegexp(kingbirdclient.exceptions.APIException,
"404 *",
self.delete_db_entries, job_details['job_id'])
self._cleanup_resources(job_details['keys'], job_details['target'],
self.client.user_id)
@decorators.idempotent_id('fcdf50e0-1f7f-4844-8806-23de0fb221d6')
def test_keypair_sync_with_force_true(self):
create_response_1 = self._sync_job_create(force=FORCE)
self.assertEqual(create_response_1.status_code, 200)
job_id_1 = create_response_1.json().get('job_status').get('id')
job_details_1 = self._keypair_sync_job_create(FORCE)
job_id_1 = job_details_1['job_id']
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id_1))
delete_response = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id_1)
self.assertEqual(delete_response, 200)
create_response_2 = self._sync_job_create(force=FORCE)
self.assertEqual(create_response_2.status_code, 200)
job_id_2 = create_response_2.json().get('job_status').get('id')
self.delete_db_entries(job_id_1)
job_details_2 = self._keypair_sync_job_create(FORCE)
job_id_2 = job_details_2['job_id']
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id_2))
# Clean_up the database entries
delete_response_2 = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id_2)
self.assertEqual(delete_response_2, 200)
self.delete_db_entries(job_id_2)
self._cleanup_resources(job_details_2['keys'], job_details_2['target'],
self.client.user_id)
@decorators.idempotent_id('a340a910-d367-401e-b79a-0a979a3ce65b')
def test_keypair_sync_with_force_false(self):
create_response_1 = self._sync_job_create(force=DEFAULT_FORCE)
self.assertEqual(create_response_1.status_code, 200)
job_id_1 = create_response_1.json().get('job_status').get('id')
job_details_1 = self._keypair_sync_job_create(DEFAULT_FORCE)
job_id_1 = job_details_1['job_id']
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id_1))
delete_response_1 = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id_1)
self.assertEqual(delete_response_1, 200)
create_response_2 = self._sync_job_create(force=DEFAULT_FORCE)
self.assertEqual(create_response_2.status_code, 200)
job_id_2 = create_response_2.json().get('job_status').get('id')
self.delete_db_entries(job_id_1)
job_details_2 = self._keypair_sync_job_create(DEFAULT_FORCE,
job_details_1['keys'])
job_id_2 = job_details_2['job_id']
utils.wait_until_true(
lambda: self._check_job_status(),
exception=RuntimeError("Timed out waiting for job %s " % job_id_2))
job_list_resp = self.get_sync_list(self.resource_ids["project_id"],
job_id_2)
self.assertEqual(job_list_resp.status_code, 200)
job_list_resp = self.get_sync_job_detail(job_id_2)
# This job fail because resoruce is already created.
# We can use force to recreate that resource.
self.assertEqual(
job_list_resp.json().get('job_set')[0].get('sync_status'),
job_list_resp.get('job_set')[0].get('sync_status'),
consts.JOB_FAILURE)
self.assertEqual(
job_list_resp.json().get('job_set')[1].get('sync_status'),
job_list_resp.get('job_set')[1].get('sync_status'),
consts.JOB_FAILURE)
# Clean_up the database entries
delete_response_2 = self._delete_entries_in_db(
self.resource_ids["project_id"], job_id_2)
self.assertEqual(delete_response_2, 200)
self.delete_db_entries(job_id_2)
self._cleanup_resources(job_details_2['keys'], job_details_2['target'],
self.client.user_id)

View File

@ -20,3 +20,4 @@ os-testr>=0.8.0 # Apache-2.0
tempest-lib>=0.14.0 # Apache-2.0
ddt>=1.0.1 # MIT
pylint==1.7.1 # GPLv2
python-kingbirdclient>=0.1.0 # Apache-2.0