Adds cleanup on v2.2 keypair api and tests

Removes unused arguments in test_keypairs.
Sets the keypair 'type' field to enum in the v2.2 validation
schema.
Removes check if the keypair 'type' is valid, as it will be done
by the validation schema.
Adds functional tests for ssh and x509 keypairs.

Closes-Bug: 1434033

Change-Id: I39f0a363458a17a4011a0661a45dc449f122ba1a
This commit is contained in:
Claudiu Belu 2015-03-05 21:39:08 +02:00
parent ef4c6ee909
commit 6fd652c64b
8 changed files with 83 additions and 36 deletions

View File

@ -41,7 +41,7 @@ class KeypairController(wsgi.Controller):
def _filter_keypair(self, keypair, **attrs):
# TODO(claudiub): After v2 and v2.1 is no longer supported,
# keypair_type can be added to the clean dict below
# keypair.type can be added to the clean dict below
clean = {
'name': keypair.name,
'public_key': keypair.public_key,
@ -61,14 +61,14 @@ class KeypairController(wsgi.Controller):
Sending name will generate a key and return private_key
and fingerprint.
Keypair will have the type ssh or x509, specified by key_type.
Keypair will have the type ssh or x509, specified by type.
You can send a public_key to add an existing ssh/x509 key.
params: keypair object with:
name (required) - string
public_key (optional) - string
key_type (optional) - string
type (optional) - string
"""
return self._create(req, body, type=True)
@ -95,7 +95,7 @@ class KeypairController(wsgi.Controller):
params = body['keypair']
name = params['name']
key_type = params.get('key_type', keypair_obj.KEYPAIR_TYPE_SSH)
key_type = params.get('type', keypair_obj.KEYPAIR_TYPE_SSH)
try:
if 'public_key' in params:

View File

@ -39,7 +39,10 @@ create_v22 = {
'type': 'object',
'properties': {
'name': parameter_types.name,
'type': {'type': 'string'},
'type': {
'type': 'string',
'enum': ['ssh', 'x509']
},
'public_key': {'type': 'string'},
},
'required': ['name'],

View File

@ -3741,11 +3741,6 @@ class KeypairAPI(base.Base):
notify.info(context, 'keypair.%s' % event_suffix, payload)
def _validate_new_key_pair(self, context, user_id, key_name, key_type):
if key_type not in [keypair_obj.KEYPAIR_TYPE_SSH,
keypair_obj.KEYPAIR_TYPE_X509]:
raise exception.InvalidKeypair(
reason=_('Specified Keypair type "%s" is invalid') % key_type)
safe_chars = "_- " + string.digits + string.ascii_letters
clean_value = "".join(x for x in key_name if x in safe_chars)
if clean_value != key_name:

View File

@ -150,7 +150,7 @@ def generate_x509_fingerprint(pem_key):
'-fingerprint', '-noout',
process_input=pem_key)
fingerprint = string.strip(out.rpartition('=')[2])
return fingerprint
return fingerprint.lower()
except processutils.ProcessExecutionError as ex:
raise exception.InvalidKeypair(
reason=_('failed to generate X509 fingerprint. '

View File

@ -260,6 +260,9 @@ class ApiSampleTestBase(integrated_helpers._IntegratedTestBase):
xmltime_re = ('\d{4}-[0,1]\d-[0-3]\d '
'\d{2}:\d{2}:\d{2}'
'(\.\d{6})?(\+00:00)?')
# NOTE(claudiub): the x509 keypairs are different from the
# ssh keypairs. For example, the x509 fingerprint has 40 bytes.
return {
'isotime': isotime_re,
'strtime': strtime_re,
@ -273,12 +276,14 @@ class ApiSampleTestBase(integrated_helpers._IntegratedTestBase):
'uuid': '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}'
'-[0-9a-f]{4}-[0-9a-f]{12}',
'reservation_id': 'r-[0-9a-zA-Z]{8}',
'private_key': '-----BEGIN RSA PRIVATE KEY-----'
'private_key': '(-----BEGIN RSA PRIVATE KEY-----|)'
'[a-zA-Z0-9\n/+=]*'
'-----END RSA PRIVATE KEY-----',
'public_key': 'ssh-rsa[ a-zA-Z0-9/+=]*'
'Generated-by-Nova',
'fingerprint': '([0-9a-f]{2}:){15}[0-9a-f]{2}',
'(-----END RSA PRIVATE KEY-----|)',
'public_key': '(ssh-rsa|-----BEGIN CERTIFICATE-----)'
'[ a-zA-Z0-9\n/+=]*'
'(Generated-by-Nova|-----END CERTIFICATE-----)',
'fingerprint': '(([0-9a-f]{2}:){19}|([0-9a-f]{2}:){15})'
'[0-9a-f]{2}',
'keypair_type': 'ssh|x509',
'host': self._get_host(),
'host_name': '[0-9a-z]{32}',

View File

@ -17,6 +17,7 @@ import uuid
from nova.objects import keypair as keypair_obj
from nova.tests.functional.v3 import api_sample_base
from nova.tests.unit import fake_crypto
class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
@ -29,11 +30,10 @@ class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
subs['keypair_name'] = 'keypair-[0-9a-f-]+'
return subs
def test_keypairs_post(self, public_key=None):
return self._check_keypairs_post(public_key,
api_version=self.request_api_version)
def test_keypairs_post(self):
return self._check_keypairs_post()
def _check_keypairs_post(self, public_key, **kwargs):
def _check_keypairs_post(self, **kwargs):
"""Get api sample of key pairs post request."""
key_name = 'keypair-' + str(uuid.uuid4())
subs = dict(keypair_name=key_name, **kwargs)
@ -50,18 +50,15 @@ class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
return key_name
def test_keypairs_import_key_post(self):
self._check_keypairs_import_key_post()
public_key = fake_crypto.get_ssh_public_key()
self._check_keypairs_import_key_post(public_key)
def _check_keypairs_import_key_post(self, **kwargs):
def _check_keypairs_import_key_post(self, public_key, **kwargs):
# Get api sample of key pairs post to import user's key.
key_name = 'keypair-' + str(uuid.uuid4())
subs = {
'keypair_name': key_name,
'public_key': "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDx8nkQv/zgGg"
"B4rMYmIf+6A4l6Rr+o/6lHBQdW5aYd44bd8JttDCE/F/pNRr0l"
"RE+PiqSPO8nDPHw0010JeMH9gYgnnFlyY3/OcJ02RhIPyyxYpv"
"9FhY+2YiUkpwFOcLImyrxEsYXpD/0d3ac30bNH6Sw9JD9UZHYc"
"pSxsIbECHw== Generated-by-Nova"
'public_key': public_key
}
subs.update(**kwargs)
response = self._do_post('os-keypairs', 'keypairs-import-post-req',
@ -103,10 +100,53 @@ class KeyPairsV22SampleJsonTest(KeyPairsSampleJsonTest):
expected_post_status_code = 201
expected_delete_status_code = 204
def test_keypairs_post(self, public_key=None):
def test_keypairs_post(self):
# NOTE(claudiub): overrides the method with the same name in
# KeypairsSampleJsonTest, as it is used by other tests.
return self._check_keypairs_post(
public_key, keypair_type=keypair_obj.KEYPAIR_TYPE_SSH)
keypair_type=keypair_obj.KEYPAIR_TYPE_SSH)
def test_keypairs_post_x509(self):
return self._check_keypairs_post(
keypair_type=keypair_obj.KEYPAIR_TYPE_X509)
def test_keypairs_post_invalid(self):
key_name = 'keypair-' + str(uuid.uuid4())
subs = dict(keypair_name=key_name, keypair_type='fakey_type')
response = self._do_post('os-keypairs', 'keypairs-post-req', subs,
api_version=self.request_api_version)
self.assertEqual(400, response.status_code)
def test_keypairs_import_key_post(self):
# NOTE(claudiub): overrides the method with the same name in
# KeypairsSampleJsonTest, since the API sample expects a keypair_type.
public_key = fake_crypto.get_ssh_public_key()
self._check_keypairs_import_key_post(
keypair_type=keypair_obj.KEYPAIR_TYPE_SSH)
public_key, keypair_type=keypair_obj.KEYPAIR_TYPE_SSH)
def test_keypairs_import_key_post_x509(self):
public_key = fake_crypto.get_x509_cert_and_fingerprint()[0]
public_key = public_key.replace('\n', '\\n')
self._check_keypairs_import_key_post(
public_key, keypair_type=keypair_obj.KEYPAIR_TYPE_X509)
def _check_keypairs_import_key_post_invalid(self, keypair_type):
key_name = 'keypair-' + str(uuid.uuid4())
subs = {
'keypair_name': key_name,
'keypair_type': keypair_type,
'public_key': fake_crypto.get_ssh_public_key()
}
response = self._do_post('os-keypairs', 'keypairs-import-post-req',
subs, api_version=self.request_api_version)
self.assertEqual(400, response.status_code)
def test_keypairs_import_key_post_invalid_type(self):
self._check_keypairs_import_key_post_invalid(
keypair_type='fakey_type')
def test_keypairs_import_key_post_invalid_combination(self):
self._check_keypairs_import_key_post_invalid(
keypair_type=keypair_obj.KEYPAIR_TYPE_X509)

View File

@ -141,11 +141,6 @@ class CreateImportSharedTestMixIn(object):
msg = "Keypair name contains unsafe characters"
self.assertInvalidKeypair(msg, '* BAD CHARACTERS! *')
def test_invalid_keypair_type(self):
self.keypair_type = 'fakey_type'
msg = 'Specified Keypair type "fakey_type" is invalid'
self.assertInvalidKeypair(msg, 'test')
def test_already_exists(self):
def db_key_pair_create_duplicate(context, keypair):
raise exception.KeyPairExists(key_name=keypair.get('name', ''))

View File

@ -110,7 +110,7 @@ YZhQPOYoNPEOYru116HdHzjGDVifgWf/nDL8Un5tjJFDSf7jSLtA
def get_x509_cert_and_fingerprint():
fingerprint = "A1:6F:6D:EA:A6:36:D0:3A:C6:EB:B6:EE:07:94:3E:2A:90:98:2B:C9"
fingerprint = "a1:6f:6d:ea:a6:36:d0:3a:c6:eb:b6:ee:07:94:3e:2a:90:98:2b:c9"
certif = (
"-----BEGIN CERTIFICATE-----\n"
"MIIDIjCCAgqgAwIBAgIJAIE8EtWfZhhFMA0GCSqGSIb3DQEBCwUAMCQxIjAgBgNV\n"
@ -132,3 +132,12 @@ def get_x509_cert_and_fingerprint():
"ZBcstxwcB4GIwnp1DrPW9L2gw5eLe1Sl6wdz443TW8K/KPV9rWQ=\n"
"-----END CERTIFICATE-----\n")
return certif, fingerprint
def get_ssh_public_key():
public_key = ("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDx8nkQv/zgGg"
"B4rMYmIf+6A4l6Rr+o/6lHBQdW5aYd44bd8JttDCE/F/pNRr0l"
"RE+PiqSPO8nDPHw0010JeMH9gYgnnFlyY3/OcJ02RhIPyyxYpv"
"9FhY+2YiUkpwFOcLImyrxEsYXpD/0d3ac30bNH6Sw9JD9UZHYc"
"pSxsIbECHw== Generated-by-Nova")
return public_key