StackUser add _delete_keypair function

Add function which allows a subclass to delete a keypair, required
for migration of User/AccessKey resources

Change-Id: I5f8923fa604a26c47d7a4226e8932dbd367bfbd1
blueprint: instance-users
This commit is contained in:
Steven Hardy 2014-02-11 12:32:59 +00:00
parent 1eaabf672a
commit c7b6a7d5a8
3 changed files with 93 additions and 1 deletions

View File

@ -130,3 +130,22 @@ class StackUser(resource.Resource):
db_api.resource_data_set(self, 'secret_key', kp.secret,
redact=True)
return kp
def _delete_keypair(self):
# Subclasses may optionally call this to delete a keypair created
# via _create_keypair
user_id = self._get_user_id()
credential_id = db_api.resource_data_get(self, 'credential_id')
try:
self.keystone().delete_stack_domain_user_keypair(
user_id=user_id, project_id=self.stack.stack_user_project_id,
credential_id=credential_id)
except ValueError:
self.keystone().delete_ec2_keypair(
user_id=user_id, credential_id=credential_id)
for data_key in ('access_key', 'secret_key', 'credential_id'):
try:
db_api.resource_data_delete(self, data_key)
except exception.NotFound:
pass

View File

@ -114,7 +114,8 @@ class FakeKeystoneClient(object):
if user_id == self.user_id:
return self.creds
def delete_ec2_keypair(self, user_id, access):
def delete_ec2_keypair(self, user_id=None, access=None,
credential_id=None):
if user_id == self.user_id and access == self.creds.access:
self.creds = None
else:
@ -159,3 +160,7 @@ class FakeKeystoneClient(object):
def disable_stack_domain_user(self, user_id, project_id):
pass
def delete_stack_domain_user_keypair(self, user_id, project_id,
credential_id):
pass

View File

@ -281,3 +281,71 @@ class StackUserTest(HeatTestCase):
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.assertRaises(exception.Error, rsrc._create_keypair)
self.m.VerifyAll()
def test_delete_keypair(self):
rsrc = self._user_create(stack_name='user_testdel',
project_id='aprojectdel',
user_id='auserdel')
self.m.StubOutWithMock(fakes.FakeKeystoneClient,
'delete_stack_domain_user_keypair')
fakes.FakeKeystoneClient.delete_stack_domain_user_keypair(
user_id='auserdel', project_id='aprojectdel',
credential_id='acredential').AndReturn(None)
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
db_api.resource_data_set(rsrc, 'credential_id', 'acredential')
db_api.resource_data_set(rsrc, 'access_key', 'access123')
db_api.resource_data_set(rsrc, 'secret_key', 'verysecret')
rsrc._delete_keypair()
rs_data = db_api.resource_data_get_all(rsrc)
self.assertEqual({'user_id': 'auserdel'}, rs_data)
self.m.VerifyAll()
def test_delete_keypair_legacy(self):
rsrc = self._user_create(stack_name='user_testdel',
project_id='aprojectdel',
user_id='auserdel')
self.m.StubOutWithMock(fakes.FakeKeystoneClient,
'delete_stack_domain_user_keypair')
fakes.FakeKeystoneClient.delete_stack_domain_user_keypair(
user_id='auserdel', project_id='aprojectdel',
credential_id='acredential').AndRaise(ValueError())
self.m.StubOutWithMock(fakes.FakeKeystoneClient,
'delete_ec2_keypair')
fakes.FakeKeystoneClient.delete_ec2_keypair(
user_id='auserdel', credential_id='acredential').AndReturn(None)
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
db_api.resource_data_set(rsrc, 'credential_id', 'acredential')
db_api.resource_data_set(rsrc, 'access_key', 'access123')
db_api.resource_data_set(rsrc, 'secret_key', 'verysecret')
rsrc._delete_keypair()
rs_data = db_api.resource_data_get_all(rsrc)
self.assertEqual({'user_id': 'auserdel'}, rs_data)
self.m.VerifyAll()
def test_delete_keypair_notfound(self):
rsrc = self._user_create(stack_name='user_testdel',
project_id='aprojectdel',
user_id='auserdel')
self.m.StubOutWithMock(fakes.FakeKeystoneClient,
'delete_stack_domain_user_keypair')
fakes.FakeKeystoneClient.delete_stack_domain_user_keypair(
user_id='auserdel', project_id='aprojectdel',
credential_id='acredential').AndReturn(None)
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
db_api.resource_data_set(rsrc, 'credential_id', 'acredential')
rsrc._delete_keypair()
rs_data = db_api.resource_data_get_all(rsrc)
self.assertEqual({'user_id': 'auserdel'}, rs_data)
self.m.VerifyAll()