Merge "Prohibit certificate order resource"

This commit is contained in:
Zuul 2024-03-08 16:18:58 +00:00 committed by Gerrit Code Review
commit 33d188e0af
6 changed files with 8 additions and 480 deletions

View File

@ -26,9 +26,6 @@ from barbican.queue import client as async_client
LOG = utils.getLogger(__name__)
_DEPRECATION_MSG = '%s has been deprecated in the Newton release. ' \
'It will be removed in the Pike release.'
def _order_not_found():
"""Throw exception indicating order not found."""

View File

@ -20,7 +20,6 @@ import re
import jsonschema as schema
from ldap3.core import exceptions as ldap_exceptions
from ldap3.utils.dn import parse_dn
from OpenSSL import crypto
from oslo_utils import timeutils
from barbican.api import controllers
@ -449,7 +448,7 @@ class TypeOrderValidator(ValidatorBase, CACommonHelpersMixin):
"type": {
"type": "string",
"required": True,
"enum": ['key', 'asymmetric', 'certificate']
"enum": ['key', 'asymmetric']
}
}
}
@ -461,11 +460,7 @@ class TypeOrderValidator(ValidatorBase, CACommonHelpersMixin):
order_type = json_data.get('type').lower()
if order_type == models.OrderType.CERTIFICATE:
certificate_meta = json_data.get('meta')
self._validate_certificate_meta(certificate_meta, schema_name)
elif order_type == models.OrderType.ASYMMETRIC:
if order_type == models.OrderType.ASYMMETRIC:
asymmetric_meta = json_data.get('meta')
self._validate_asymmetric_meta(asymmetric_meta, schema_name)
@ -515,122 +510,6 @@ class TypeOrderValidator(ValidatorBase, CACommonHelpersMixin):
raise exception.MissingMetadataField(required=key)
return data
def _validate_certificate_meta(self, certificate_meta, schema_name):
"""Validation specific to meta for certificate type order."""
self._assert_validity(certificate_meta.get('payload') is None,
schema_name,
u._("'payload' not allowed "
"for certificate type order"), "meta")
if 'profile' in certificate_meta:
if 'ca_id' not in certificate_meta:
raise exception.MissingMetadataField(required='ca_id')
jump_table = {
'simple-cmc': self._validate_simple_cmc_request,
'full-cmc': self._validate_full_cmc_request,
'stored-key': self._validate_stored_key_request,
'custom': self._validate_custom_request
}
request_type = certificate_meta.get("request_type", "custom")
if request_type not in jump_table:
raise exception.InvalidCertificateRequestType(request_type)
jump_table[request_type](certificate_meta)
def _validate_simple_cmc_request(self, certificate_meta):
"""Validates simple CMC (which are PKCS10 requests)."""
request_data = self._get_required_metadata_value(
certificate_meta, "request_data")
self._validate_pkcs10_data(request_data)
def _validate_full_cmc_request(self, certificate_meta):
"""Validate full CMC request.
:param certificate_meta: request data from the order
:raises: FullCMCNotSupported
"""
raise exception.FullCMCNotSupported()
def _validate_stored_key_request(self, certificate_meta):
"""Validate stored-key cert request."""
self._get_required_metadata_value(
certificate_meta, "container_ref")
subject_dn = self._get_required_metadata_value(
certificate_meta, "subject_dn")
self._validate_subject_dn_data(subject_dn)
# container will be validated by validate_stored_key_rsa_container()
extensions = certificate_meta.get("extensions", None)
if extensions:
self._validate_extensions_data(extensions)
def _validate_custom_request(self, certificate_meta):
"""Validate custom data request
We cannot do any validation here because the request
parameters are custom. Validation will be done by the
plugin. We may choose to select the relevant plugin and
call the supports() method to raise validation errors.
"""
pass
def _validate_pkcs10_data(self, request_data):
"""Confirm that the request_data is valid base64 encoded PKCS#10.
Base64 decode the request, if it fails raise PayloadDecodingError.
Then parse data into the ASN.1 structure defined by PKCS10 and
verify the signing information.
If parsing of verifying fails, raise InvalidPKCS10Data.
"""
try:
csr_pem = base64.b64decode(request_data)
except Exception:
raise exception.PayloadDecodingError()
try:
csr = crypto.load_certificate_request(crypto.FILETYPE_PEM,
csr_pem)
except Exception:
reason = u._("Bad format")
raise exception.InvalidPKCS10Data(reason=reason)
try:
pubkey = csr.get_pubkey()
csr.verify(pubkey)
except Exception:
reason = u._("Signing key incorrect")
raise exception.InvalidPKCS10Data(reason=reason)
def _validate_full_cmc_data(self, request_data):
"""Confirm that request_data is valid Full CMC data."""
"""
TODO(alee-3) complete this function
Parse data into the ASN.1 structure defined for full CMC.
If parsing fails, raise InvalidCMCData
"""
pass
def _validate_extensions_data(self, extensions):
"""Confirm that the extensions data is valid.
:param extensions: base 64 encoded ASN.1 string of extension data
:raises: CertificateExtensionsNotSupported
"""
"""
TODO(alee-3) complete this function
Parse the extensions data into the correct ASN.1 structure.
If the parsing fails, throw InvalidExtensionsData.
For now, fail this validation because extensions parsing is not
supported.
"""
raise exception.CertificateExtensionsNotSupported()
def _validate_meta_parameters(self, meta, order_type, schema_name):
self._assert_validity(meta.get('algorithm'),
schema_name,

View File

@ -17,11 +17,9 @@ import datetime
import unittest
from oslo_serialization import base64
import testtools
from barbican.common import exception as excep
from barbican.common import validators
from barbican.tests import certificate_utils as certs
from barbican.tests import keys
from barbican.tests import utils
@ -1191,11 +1189,6 @@ class WhenTestingKeyTypeOrderValidator(utils.BaseTestCase):
self.validator = validators.TypeOrderValidator()
def test_should_pass_with_certificate_type_in_order_refs(self):
self.key_order_req['type'] = 'certificate'
result = self.validator.validate(self.key_order_req)
self.assertEqual('certificate', result['type'])
def test_should_pass_with_null_content_type_in_meta(self):
self.key_order_req['meta']['payload_content_type'] = None
result = self.validator.validate(self.key_order_req)
@ -1365,238 +1358,6 @@ class WhenTestingAsymmetricTypeOrderValidator(utils.BaseTestCase):
self.assertEqual("bit_length", exception.invalid_property)
class WhenTestingSimpleCMCOrderValidator(utils.BaseTestCase):
def setUp(self):
super(WhenTestingSimpleCMCOrderValidator, self).setUp()
self.type = 'certificate'
request_data = base64.encode_as_text(certs.create_good_csr())
self.meta = {'request_type': 'simple-cmc',
'request_data': request_data,
'requestor_name': 'Barbican User',
'requestor_email': 'barbican_user@example.com',
'requestor_phone': '555-1212'}
self._set_order()
self.validator = validators.TypeOrderValidator()
def _set_order(self):
self.order_req = {'type': self.type,
'meta': self.meta}
def test_should_pass_good_data(self):
self.validator.validate(self.order_req)
def test_should_raise_with_no_metadata(self):
self.order_req = {'type': self.type}
self.assertRaises(excep.InvalidObject,
self.validator.validate,
self.order_req)
def test_should_raise_with_bad_request_type(self):
self.meta['request_type'] = 'bad_request_type'
self._set_order()
self.assertRaises(excep.InvalidCertificateRequestType,
self.validator.validate,
self.order_req)
def test_should_raise_with_no_request_data(self):
del self.meta['request_data']
self._set_order()
self.assertRaises(excep.MissingMetadataField,
self.validator.validate,
self.order_req)
def test_should_raise_with_pkcs10_data_with_bad_base64(self):
self.meta['request_data'] = certs.create_bad_csr()
self._set_order()
self.assertRaises(excep.PayloadDecodingError,
self.validator.validate,
self.order_req)
def test_should_raise_with_bad_pkcs10_data(self):
request_data = base64.encode_as_text(certs.create_bad_csr())
self.meta['request_data'] = request_data
self._set_order()
self.assertRaises(excep.InvalidPKCS10Data,
self.validator.validate,
self.order_req)
def test_should_raise_with_signed_wrong_key_pkcs10_data(self):
self.meta['request_data'] = base64.encode_as_text(
certs.create_csr_signed_with_wrong_key())
self._set_order()
self.assertRaises(excep.InvalidPKCS10Data,
self.validator.validate,
self.order_req)
def test_should_raise_with_unsigned_pkcs10_data(self):
self.meta['request_data'] = base64.encode_as_text(
certs.create_csr_that_has_not_been_signed())
self._set_order()
self.assertRaises(excep.InvalidPKCS10Data,
self.validator.validate,
self.order_req)
def test_should_raise_with_payload_in_order(self):
self.meta['payload'] = 'payload'
self.assertRaises(excep.InvalidObject,
self.validator.validate,
self.order_req)
class WhenTestingFullCMCOrderValidator(utils.BaseTestCase):
def setUp(self):
super(WhenTestingFullCMCOrderValidator, self).setUp()
self.type = 'certificate'
self.meta = {'request_type': 'full-cmc',
'request_data': VALID_FULL_CMC,
'requestor_name': 'Barbican User',
'requestor_email': 'barbican_user@example.com',
'requestor_phone': '555-1212'}
self._set_order()
self.validator = validators.TypeOrderValidator()
def _set_order(self):
self.order_req = {'type': self.type,
'meta': self.meta}
def test_should_raise_not_yet_implemented(self):
self.assertRaises(excep.FullCMCNotSupported,
self.validator.validate,
self.order_req)
@testtools.skip("Feature not yet implemented")
def test_should_pass_good_data(self):
self.validator.validate(self.order_req)
@testtools.skip("Feature not yet implemented")
def test_should_raise_with_no_request_data(self):
del self.meta['request_data']
self._set_order()
self.assertRaises(excep.MissingMetadataField,
self.validator.validate,
self.order_req)
@testtools.skip("Not yet implemented")
def test_should_raise_with_bad_cmc_data(self):
self.meta['request_data'] = 'Bad CMC Data'
self._set_order()
self.assertRaises(excep.InvalidCMCData,
self.validator.validate,
self.order_req)
class WhenTestingCustomOrderValidator(utils.BaseTestCase):
def setUp(self):
super(WhenTestingCustomOrderValidator, self).setUp()
self.type = 'certificate'
self.meta = {'request_type': 'custom',
'ca_param_1': 'value_1',
'ca_param_2': 'value_2',
'requestor_name': 'Barbican User',
'requestor_email': 'barbican_user@example.com',
'requestor_phone': '555-1212'}
self._set_order()
self.validator = validators.TypeOrderValidator()
def _set_order(self):
self.order_req = {'type': self.type,
'meta': self.meta}
def test_should_pass_good_data(self):
self.validator.validate(self.order_req)
def test_should_pass_with_no_request_type(self):
# defaults to custom
del self.meta['request_type']
self._set_order()
self.validator.validate(self.order_req)
class WhenTestingStoredKeyOrderValidator(utils.BaseTestCase):
def setUp(self):
super(WhenTestingStoredKeyOrderValidator, self).setUp()
self.type = 'certificate'
self.meta = {'request_type': 'stored-key',
'container_ref':
'https://localhost/v1/containers/good_container_ref',
'subject_dn': 'cn=barbican-server,o=example.com',
'requestor_name': 'Barbican User',
'requestor_email': 'barbican_user@example.com',
'requestor_phone': '555-1212'}
self.order_req = {'type': self.type,
'meta': self.meta}
self.validator = validators.TypeOrderValidator()
def test_should_pass_good_data(self):
self.validator.validate(self.order_req)
def test_should_raise_with_no_container_ref(self):
del self.meta['container_ref']
self.assertRaises(excep.MissingMetadataField,
self.validator.validate,
self.order_req)
def test_should_raise_with_no_subject_dn(self):
del self.meta['subject_dn']
self.assertRaises(excep.MissingMetadataField,
self.validator.validate,
self.order_req)
def test_should_pass_with_profile_and_ca_id(self):
self.meta['ca_id'] = 'my_ca_id'
self.meta['profile'] = 'my_profile'
self.validator.validate(self.order_req)
def test_should_raise_with_profile_and_no_ca_id(self):
self.meta['profile'] = 'my_profile'
self.assertRaises(excep.MissingMetadataField,
self.validator.validate,
self.order_req)
def test_should_raise_with_extensions_data(self):
self.meta['extensions'] = VALID_EXTENSIONS
self.assertRaises(excep.CertificateExtensionsNotSupported,
self.validator.validate,
self.order_req)
@testtools.skip("Not yet implemented")
def test_should_raise_with_bad_extensions_data(self):
self.meta['extensions'] = 'Bad extensions data'
self.assertRaises(excep.InvalidExtensionsData,
self.validator.validate,
self.order_req)
def test_should_pass_with_one_cn_in_dn(self):
self.meta['subject_dn'] = "CN=example1"
self.validator.validate(self.order_req)
def test_should_pass_with_two_cn_in_dn(self):
self.meta['subject_dn'] = "CN=example1,CN=example2"
self.validator.validate(self.order_req)
def test_should_raise_with_blank_dn(self):
self.meta['subject_dn'] = ""
self.assertRaises(excep.InvalidSubjectDN,
self.validator.validate,
self.order_req)
def test_should_raise_with_bad_subject_dn(self):
self.meta['subject_dn'] = "Bad subject DN data"
self.assertRaises(excep.InvalidSubjectDN,
self.validator.validate,
self.order_req)
def test_should_raise_with_payload_in_order(self):
self.meta['payload'] = 'payload'
self.assertRaises(excep.InvalidObject,
self.validator.validate,
self.order_req)
@utils.parameterized_test_case
class WhenTestingAclValidator(utils.BaseTestCase):
def setUp(self):

View File

@ -2,11 +2,6 @@
Orders API - Reference
**********************
.. warning::
DEPRECATION WARNING: The Certificates Order resource has been deprecated
and will be removed in the P release.
.. _get_orders:
GET /v1/orders
@ -144,7 +139,7 @@ Parameters
| Attribute Name | Type | Description | Default |
+============================+=========+==============================================+============+
| type | string | The type of key to be generated. Valid types | None |
| | | are key, asymmetric, and certificate | |
| | | are key and asymmetric | |
+----------------------------+---------+----------------------------------------------+------------+
| meta | | Dictionary containing the secret metadata | None |
| | dict | used to generate the secret. | |

View File

@ -95,25 +95,6 @@ def get_order_rsa_container_with_passphrase():
"mode": "cbc"}}
def get_order_certificate(container_ref):
return {'type': 'certificate',
'meta': {'request_type': 'stored-key',
'container_ref': container_ref,
'subject_dn': 'cn=server.example.com,o=example.com',
'requestor_name': 'Barbican User',
'requestor_email': 'user@example.com',
'requestor_phone': '555-1212'}}
def get_order_certificate_simple_cmc(csr):
return {'type': 'certificate',
'meta': {'request_type': 'simple-cmc',
'requestor_name': 'Barbican User',
'requestor_email': 'user@example.com',
'requestor_phone': '555-1212',
'request_data': csr}}
@utils.parameterized_test_case
class RSATestCase(base.TestCase):
"""Positive test cases for all ways of working with RSA keys
@ -267,70 +248,6 @@ class RSATestCase(base.TestCase):
secrets = self.get_container(container_ref)
self.verify_container_keys_equal(secrets)
@testcase.attr('positive')
@testtools.skipIf(utils.is_pkcs11_enabled(),
"PKCS11 does not support this operation")
def test_rsa_order_certificate_from_ordered_container(self):
"""Post an order for a certificate"""
order_ref = self.order_container()
container_ref = self.get_container_order(order_ref)
secrets = self.get_container(container_ref)
self.verify_container_keys_valid(secrets)
order_ref = self.order_certificate(container_ref)
order_status = self.get_certificate_order(order_ref)
self.verify_certificate_order_status(order_status)
@testcase.attr('positive')
@testtools.skipIf(utils.is_kmip_enabled() or utils.is_vault_enabled()
or utils.is_pkcs11_enabled(),
"PyKMIP does not support this operation")
def test_rsa_order_certificate_from_ordered_container_with_pass(self):
"""Post an order for a certificate"""
order_ref = self.order_container(with_passphrase=True)
container_ref = self.get_container_order(order_ref)
secrets = self.get_container(container_ref)
self.verify_container_keys_valid(secrets, with_passphrase=True)
order_ref = self.order_certificate(container_ref)
order_status = self.get_certificate_order(order_ref)
self.verify_certificate_order_status(order_status)
@testcase.attr('positive')
def test_rsa_order_certificate_from_stored_container(self):
"""Post an order for a certificate"""
public_ref = self.create_public_key()
self.update_public_key(public_ref)
private_ref = self.create_private_key()
self.update_private_key(private_ref)
container_ref = self.store_container(public_ref, private_ref)
secrets = self.get_container(container_ref)
self.verify_container_keys_equal(secrets)
order_ref = self.order_certificate(container_ref)
order_status = self.get_certificate_order(order_ref)
self.verify_certificate_order_status(order_status)
@testcase.attr('positive')
@testtools.skipIf(utils.is_kmip_enabled(),
"PyKMIP does not support this operation")
def test_rsa_order_certificate_from_stored_container_with_pass(self):
"""Post an order for a certificate"""
public_ref = self.store_public_key()
private_ref = self.store_encrypted_private_key()
phrase_ref = self.store_passphrase()
container_ref = self.store_container(
public_ref, private_ref, phrase_ref)
secrets = self.get_container(container_ref)
self.verify_container_keys_equal(secrets, with_passphrase=True)
order_ref = self.order_certificate(container_ref)
order_status = self.get_certificate_order(order_ref)
self.verify_certificate_order_status(order_status)
@testcase.attr('positive')
def test_rsa_order_certificate_from_csr(self):
"""Post an order for a certificate"""
order_ref = self.order_certificate_from_csr()
order_status = self.get_certificate_order(order_ref)
self.verify_certificate_order_status(order_status)
# ----------------------- Helper Functions ---------------------------
def store_private_key(self):
pem = keys.get_private_key_pem()
@ -583,29 +500,3 @@ class RSATestCase(base.TestCase):
resp = self.order_behaviors.get_order(order_ref)
self.assertEqual(200, resp.status_code)
return resp.model.container_ref
def order_certificate(self, container_ref):
test_model = order_models.OrderModel(
**get_order_certificate(container_ref))
resp, order_ref = self.order_behaviors.create_order(test_model)
self.assertEqual(202, resp.status_code)
return order_ref
def get_certificate_order(self, order_ref):
resp = self.order_behaviors.get_order(order_ref)
self.assertEqual(200, resp.status_code)
order_status = (resp.model.status,
resp.model.sub_status)
return order_status
def verify_certificate_order_status(self, order_status):
self.assertEqual(("PENDING", "cert_request_pending"),
order_status)
def order_certificate_from_csr(self):
csr = keys.get_csr_pem()
test_model = order_models.OrderModel(
**get_order_certificate_simple_cmc(base64.b64encode(csr)))
resp, order_ref = self.order_behaviors.create_order(test_model)
self.assertEqual(202, resp.status_code)
return order_ref

View File

@ -0,0 +1,5 @@
---
upgrade:
- |
The deprecated certificate order resource was removed. Because of this,
create order API no longer accepts ``certificate`` type.