Replace CRLF by LF

... because LF is now commonly used as newline code.

Change-Id: Iaebf9330fb1305d9314d9fec05f9923efd7e61c6
This commit is contained in:
Takashi Kajinami 2024-01-27 12:40:51 +09:00
parent 31176777c7
commit def6216576
16 changed files with 2099 additions and 2099 deletions

View File

@ -1,6 +1,6 @@
---
features:
- Support Tacker service to obtain an OAuth 2.0 access token from an
external authorization server, and then use the access token to access
related OpenStack services that uses the external_oauth2_token filter
provided by the keystone middleware for permission authentication.
---
features:
- Support Tacker service to obtain an OAuth 2.0 access token from an
external authorization server, and then use the access token to access
related OpenStack services that uses the external_oauth2_token filter
provided by the keystone middleware for permission authentication.

View File

@ -1,11 +1,11 @@
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: oidc-cluster-admin-binding
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io
subjects:
- kind: User
name: end-user
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: oidc-cluster-admin-binding
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io
subjects:
- kind: User
name: end-user

View File

@ -1,12 +1,12 @@
[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name
[req_distinguished_name]
[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
[alt_names]
[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name
[req_distinguished_name]
[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
[alt_names]
IP.1 = 127.0.0.1

View File

@ -1,35 +1,35 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tacker.vnfm.lcm_user_data.utils as UserDataUtil
from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData
class SampleUserData(AbstractUserData):
@staticmethod
def instantiate(base_hot_dict=None,
vnfd_dict=None,
inst_req_info=None,
grant_info=None):
api_param = UserDataUtil.get_diff_base_hot_param_from_api(
base_hot_dict, inst_req_info)
initial_param_dict = \
UserDataUtil.create_initial_param_server_port_dict(base_hot_dict)
vdu_flavor_dict = \
UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict)
vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict)
cpd_vl_dict = UserDataUtil.create_network_dict(inst_req_info,
initial_param_dict)
final_param_dict = UserDataUtil.create_final_param_dict(
initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict)
return {**final_param_dict, **api_param}
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tacker.vnfm.lcm_user_data.utils as UserDataUtil
from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData
class SampleUserData(AbstractUserData):
@staticmethod
def instantiate(base_hot_dict=None,
vnfd_dict=None,
inst_req_info=None,
grant_info=None):
api_param = UserDataUtil.get_diff_base_hot_param_from_api(
base_hot_dict, inst_req_info)
initial_param_dict = \
UserDataUtil.create_initial_param_server_port_dict(base_hot_dict)
vdu_flavor_dict = \
UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict)
vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict)
cpd_vl_dict = UserDataUtil.create_network_dict(inst_req_info,
initial_param_dict)
final_param_dict = UserDataUtil.create_final_param_dict(
initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict)
return {**final_param_dict, **api_param}

View File

@ -1,72 +1,72 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF template definition
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
node_types:
Sample.VNF.Node:
derived_from: tosca.nodes.nfv.VNF
properties:
descriptor_id:
type: string
default: '3b3c61e4-26b6-4686-80fc-e9ff83010c08'
descriptor_version:
type: string
constraints: [ valid_values: [ '1.0' ] ]
default: '1.0'
provider:
type: string
constraints: [ valid_values: [ Sample ] ]
default: Sample
product_name:
type: string
constraints: [ valid_values: [ Node ] ]
default: Node
software_version:
type: string
constraints: [ valid_values: [ '10.1' ] ]
default: '10.1'
vnfm_info:
type: list
entry_schema:
type: string
constraints: [ valid_values: [ Tacker ] ]
default: [ Tacker ]
flavour_id:
type: string
constraints: [ valid_values: [ ha, scalable ] ]
default: ha
flavour_description:
type: string
default: 'vnf'
requirements:
- VNF0_extnet0:
capability: tosca.capabilities.nfv.VirtualLinkable
relationship: tosca.relationships.nfv.VirtualLinksTo
occurrences: [ 0, 1 ]
- VNF1_extnet0:
capability: tosca.capabilities.nfv.VirtualLinkable
relationship: tosca.relationships.nfv.VirtualLinksTo
occurrences: [ 0, 1 ]
interfaces:
Vnflcm:
type: tosca.interfaces.nfv.Vnflcm
instantiate: []
instantiate_start: []
instantiate_end: []
scale: []
scale_start: []
scale_end: []
heal: []
heal_start: []
heal_end: []
terminate: []
terminate_start: []
terminate_end: []
modify_information: []
modify_information_start: []
modify_information_end: []
tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF template definition
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
node_types:
Sample.VNF.Node:
derived_from: tosca.nodes.nfv.VNF
properties:
descriptor_id:
type: string
default: '3b3c61e4-26b6-4686-80fc-e9ff83010c08'
descriptor_version:
type: string
constraints: [ valid_values: [ '1.0' ] ]
default: '1.0'
provider:
type: string
constraints: [ valid_values: [ Sample ] ]
default: Sample
product_name:
type: string
constraints: [ valid_values: [ Node ] ]
default: Node
software_version:
type: string
constraints: [ valid_values: [ '10.1' ] ]
default: '10.1'
vnfm_info:
type: list
entry_schema:
type: string
constraints: [ valid_values: [ Tacker ] ]
default: [ Tacker ]
flavour_id:
type: string
constraints: [ valid_values: [ ha, scalable ] ]
default: ha
flavour_description:
type: string
default: 'vnf'
requirements:
- VNF0_extnet0:
capability: tosca.capabilities.nfv.VirtualLinkable
relationship: tosca.relationships.nfv.VirtualLinksTo
occurrences: [ 0, 1 ]
- VNF1_extnet0:
capability: tosca.capabilities.nfv.VirtualLinkable
relationship: tosca.relationships.nfv.VirtualLinksTo
occurrences: [ 0, 1 ]
interfaces:
Vnflcm:
type: tosca.interfaces.nfv.Vnflcm
instantiate: []
instantiate_start: []
instantiate_end: []
scale: []
scale_start: []
scale_end: []
heal: []
heal_start: []
heal_end: []
terminate: []
terminate_start: []
terminate_end: []
modify_information: []
modify_information_start: []
modify_information_end: []

View File

@ -1,31 +1,31 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF definitions
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml
- df_ha.yaml
- df_scalable.yaml
topology_template:
inputs:
selected_flavour:
type: string
description: VNF deployment flavour selected by the consumer. Itis provided in the API.
node_templates:
VNF:
type: Sample.VNF.Node
properties:
flavour_id: { get_input: selected_flavour }
flavour_description: 'vnf'
descriptor_id: 75aaa9fa-9c79-dcf5-bda2-5b98a08c9f54
provider: Sample
product_name: Node
software_version: '10.1'
descriptor_version: '1.0'
vnfm_info:
- Tacker
requirements:
tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF definitions
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml
- df_ha.yaml
- df_scalable.yaml
topology_template:
inputs:
selected_flavour:
type: string
description: VNF deployment flavour selected by the consumer. Itis provided in the API.
node_templates:
VNF:
type: Sample.VNF.Node
properties:
flavour_id: { get_input: selected_flavour }
flavour_description: 'vnf'
descriptor_id: 75aaa9fa-9c79-dcf5-bda2-5b98a08c9f54
provider: Sample
product_name: Node
software_version: '10.1'
descriptor_version: '1.0'
vnfm_info:
- Tacker
requirements:

View File

@ -1,232 +1,232 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF ha DF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml
topology_template:
inputs:
descriptor_id:
type: string
descriptor_version:
type: string
provider:
type: string
product_name:
type: string
software_version:
type: string
vnfm_info:
type: list
entry_schema:
type: string
flavour_id:
type: string
flavour_description:
type: string
configurable_properties:
type: map
substitution_mappings:
node_type: Sample.VNF.Node
properties:
flavour_id: ha
requirements:
VDU0_extnet: [ VDU0_extCP0, external_virtual_link ]
VDU1_extnet: [ VDU1_extCP0, external_virtual_link ]
VDU_extnet: [ VDU_extvCP, external_virtual_link ]
RT_extnet: [ RT_extCP, external_virtual_link ]
node_templates:
VNF:
type: Sample.VNF.Node
properties:
flavour_description: 'ha'
configurable_properties:
is_autoscale_enabled: false
is_autoheal_enabled: false
vnfm_info:
- Tacker
interfaces:
Vnflcm:
instantiate: []
instantiate_start: []
instantiate_end: []
scale: []
scale_start: []
scale_end: []
heal: []
heal_start: []
heal_end: []
terminate: []
terminate_start: []
terminate_end: []
modify_information: []
modify_information_start: []
modify_information_end: []
VDU_0:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_0
description: VDU_0
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU_1:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_1
description: VDU_1
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU0_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_0
- virtual_link: VDU_intnet0
VDU1_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_1
- virtual_link: VDU_intnet0
VDU0_extCP0:
type: tosca.nodes.nfv.VnfExtCp
properties:
layer_protocols: [ ipv4 ]
requirements:
- internal_virtual_link: VDU_intnet0
VDU1_extCP0:
type: tosca.nodes.nfv.VnfExtCp
properties:
layer_protocols: [ ipv4 ]
requirements:
- internal_virtual_link: VDU_intnet0
VDU_intnet0:
type: tosca.nodes.nfv.VnfVirtualLink
properties:
connectivity_type:
layer_protocols: [ ipv4 ]
vl_profile:
max_bitrate_requirements:
root: 1000000
min_bitrate_requirements:
root: 100000
virtual_link_protocol_data:
- associated_layer_protocol: ipv4
l2_protocol_data:
network_type: vxlan
l3_protocol_data:
ip_version: ipv4
cidr: '192.168.0.0/24'
dhcp_enabled: true
RT_extCP:
type: tosca.nodes.nfv.VnfExtCp
properties:
layer_protocols: [ ipv4 ]
requirements:
- internal_virtual_link: VDU_intnet0
VDU_extvCP:
type: tosca.nodes.nfv.VnfExtCp
properties:
layer_protocols: [ ipv4 ]
requirements:
- internal_virtual_link: VDU_intnet0
groups:
VDU_AntiAffinityGroup:
type: tosca.groups.nfv.PlacementGroup
members: [ VDU_0, VDU_1 ]
policies:
- VDU_placement_policy:
type: tosca.policies.nfv.AntiAffinityRule
targets: [ VDU_AntiAffinityGroup ]
properties:
scope: nfvi_node
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF ha DF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml
topology_template:
inputs:
descriptor_id:
type: string
descriptor_version:
type: string
provider:
type: string
product_name:
type: string
software_version:
type: string
vnfm_info:
type: list
entry_schema:
type: string
flavour_id:
type: string
flavour_description:
type: string
configurable_properties:
type: map
substitution_mappings:
node_type: Sample.VNF.Node
properties:
flavour_id: ha
requirements:
VDU0_extnet: [ VDU0_extCP0, external_virtual_link ]
VDU1_extnet: [ VDU1_extCP0, external_virtual_link ]
VDU_extnet: [ VDU_extvCP, external_virtual_link ]
RT_extnet: [ RT_extCP, external_virtual_link ]
node_templates:
VNF:
type: Sample.VNF.Node
properties:
flavour_description: 'ha'
configurable_properties:
is_autoscale_enabled: false
is_autoheal_enabled: false
vnfm_info:
- Tacker
interfaces:
Vnflcm:
instantiate: []
instantiate_start: []
instantiate_end: []
scale: []
scale_start: []
scale_end: []
heal: []
heal_start: []
heal_end: []
terminate: []
terminate_start: []
terminate_end: []
modify_information: []
modify_information_start: []
modify_information_end: []
VDU_0:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_0
description: VDU_0
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU_1:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_1
description: VDU_1
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU0_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_0
- virtual_link: VDU_intnet0
VDU1_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_1
- virtual_link: VDU_intnet0
VDU0_extCP0:
type: tosca.nodes.nfv.VnfExtCp
properties:
layer_protocols: [ ipv4 ]
requirements:
- internal_virtual_link: VDU_intnet0
VDU1_extCP0:
type: tosca.nodes.nfv.VnfExtCp
properties:
layer_protocols: [ ipv4 ]
requirements:
- internal_virtual_link: VDU_intnet0
VDU_intnet0:
type: tosca.nodes.nfv.VnfVirtualLink
properties:
connectivity_type:
layer_protocols: [ ipv4 ]
vl_profile:
max_bitrate_requirements:
root: 1000000
min_bitrate_requirements:
root: 100000
virtual_link_protocol_data:
- associated_layer_protocol: ipv4
l2_protocol_data:
network_type: vxlan
l3_protocol_data:
ip_version: ipv4
cidr: '192.168.0.0/24'
dhcp_enabled: true
RT_extCP:
type: tosca.nodes.nfv.VnfExtCp
properties:
layer_protocols: [ ipv4 ]
requirements:
- internal_virtual_link: VDU_intnet0
VDU_extvCP:
type: tosca.nodes.nfv.VnfExtCp
properties:
layer_protocols: [ ipv4 ]
requirements:
- internal_virtual_link: VDU_intnet0
groups:
VDU_AntiAffinityGroup:
type: tosca.groups.nfv.PlacementGroup
members: [ VDU_0, VDU_1 ]
policies:
- VDU_placement_policy:
type: tosca.policies.nfv.AntiAffinityRule
targets: [ VDU_AntiAffinityGroup ]
properties:
scope: nfvi_node

View File

@ -1,391 +1,391 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF default DF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml
topology_template:
inputs:
descriptor_id:
type: string
descriptor_version:
type: string
provider:
type: string
product_name:
type: string
software_version:
type: string
vnfm_info:
type: list
entry_schema:
type: string
flavour_id:
type: string
flavour_description:
type: string
configurable_properties:
type: map
substitution_mappings:
node_type: Sample.VNF.Node
properties:
flavour_id: scalable
requirements:
VDU0_extnet: [ VDU0_CP1, external_virtual_link ]
VDU1_extnet: [ VDU1_CP1, external_virtual_link ]
VDU2_extnet: [ VDU2_CP1, external_virtual_link ]
node_templates:
VNF:
type: Sample.VNF.Node
properties:
flavour_description: 'scalable'
configurable_properties:
is_autoscale_enabled: false
is_autoheal_enabled: false
vnfm_info:
- Tacker
interfaces:
Vnflcm:
instantiate: []
instantiate_start: []
instantiate_end: []
scale: []
scale_start: []
scale_end: []
heal: []
heal_start: []
heal_end: []
terminate: []
terminate_start: []
terminate_end: []
modify_information: []
modify_information_start: []
modify_information_end: []
VDU_0:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_0
description: VDU_0
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU_1:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_1
description: VDU_1
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU_2:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_2
description: VDU_2
vdu_profile:
min_number_of_instances: 0
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU0_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_0
- virtual_link: int_net
VDU0_CP1:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_0
VDU1_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_1
- virtual_link: int_net
VDU1_CP1:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_1
VDU2_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_2
- virtual_link: int_net
VDU2_CP1:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_2
int_net:
type: tosca.nodes.nfv.VnfVirtualLink
properties:
connectivity_type:
layer_protocols: [ ipv4 ]
vl_profile:
max_bitrate_requirements:
root: 1000000
min_bitrate_requirements:
root: 100000
virtual_link_protocol_data:
- associated_layer_protocol: ipv4
l2_protocol_data:
network_type: vxlan
l3_protocol_data:
ip_version: ipv4
cidr: '192.168.1.0/24'
groups:
VDU_AntiAffinityGroup:
type: tosca.groups.nfv.PlacementGroup
members: [ VDU_0, VDU_1, VDU_2 ]
policies:
- VDU_placement_policy:
type: tosca.policies.nfv.AntiAffinityRule
targets: [ VDU_AntiAffinityGroup ]
properties:
scope: nfvi_node
- vdu_scale:
type: tosca.policies.nfv.ScalingAspects
properties:
aspects:
VDU_2:
name: VDU_2
description: VDU_2
max_scale_level: 1
step_deltas:
- delta_1
- vdu_0_initial_delta:
type: tosca.policies.nfv.VduInitialDelta
properties:
initial_delta:
number_of_instances: 1
targets: [ VDU_0 ]
- vdu_1_initial_delta:
type: tosca.policies.nfv.VduInitialDelta
properties:
initial_delta:
number_of_instances: 1
targets: [ VDU_1 ]
- vdu_2_initial_delta:
type: tosca.policies.nfv.VduInitialDelta
properties:
initial_delta:
number_of_instances: 0
targets: [ VDU_2 ]
- vdu_2_scaling_aspect_deltas:
type: tosca.policies.nfv.VduScalingAspectDeltas
properties:
aspect: VDU_2
deltas:
delta_1:
number_of_instances: 1
targets: [ VDU_2 ]
- instantiation_levels:
type: tosca.policies.nfv.InstantiationLevels
properties:
levels:
r-node-min:
description: vdu-min structure
scale_info:
VDU_2:
scale_level: 0
r-node-max:
description: vdu-max structure
scale_info:
VDU_2:
scale_level: 1
- vdu_0_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels
properties:
levels:
r-node-min:
number_of_instances: 1
r-node-max:
number_of_instances: 1
targets: [ VDU_0 ]
- vdu_1_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels
properties:
levels:
r-node-min:
number_of_instances: 1
r-node-max:
number_of_instances: 1
targets: [ VDU_1 ]
- vdu_2_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels
properties:
levels:
r-node-min:
number_of_instances: 0
r-node-max:
number_of_instances: 1
targets: [ VDU_2 ]
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF default DF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml
topology_template:
inputs:
descriptor_id:
type: string
descriptor_version:
type: string
provider:
type: string
product_name:
type: string
software_version:
type: string
vnfm_info:
type: list
entry_schema:
type: string
flavour_id:
type: string
flavour_description:
type: string
configurable_properties:
type: map
substitution_mappings:
node_type: Sample.VNF.Node
properties:
flavour_id: scalable
requirements:
VDU0_extnet: [ VDU0_CP1, external_virtual_link ]
VDU1_extnet: [ VDU1_CP1, external_virtual_link ]
VDU2_extnet: [ VDU2_CP1, external_virtual_link ]
node_templates:
VNF:
type: Sample.VNF.Node
properties:
flavour_description: 'scalable'
configurable_properties:
is_autoscale_enabled: false
is_autoheal_enabled: false
vnfm_info:
- Tacker
interfaces:
Vnflcm:
instantiate: []
instantiate_start: []
instantiate_end: []
scale: []
scale_start: []
scale_end: []
heal: []
heal_start: []
heal_end: []
terminate: []
terminate_start: []
terminate_end: []
modify_information: []
modify_information_start: []
modify_information_end: []
VDU_0:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_0
description: VDU_0
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU_1:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_1
description: VDU_1
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU_2:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU_2
description: VDU_2
vdu_profile:
min_number_of_instances: 0
max_number_of_instances: 1
sw_image_data:
name: sample_image
version: '1.0'
checksum:
algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare
disk_format: qcow2
min_disk: 0 GB
size: 1869 MB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: sample_flavor
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 1 GB
VDU0_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_0
- virtual_link: int_net
VDU0_CP1:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_0
VDU1_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_1
- virtual_link: int_net
VDU1_CP1:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_1
VDU2_CP0:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_2
- virtual_link: int_net
VDU2_CP1:
type: tosca.nodes.nfv.VduCp
properties:
order: 0
bitrate_requirement: 1
vnic_type: normal
layer_protocols: [ ipv4 ]
protocol:
- associated_layer_protocol: ipv4
address_data:
- address_type: ip_address
l3_address_data:
ip_address_assignment: true
floating_ip_activated: false
requirements:
- virtual_binding: VDU_2
int_net:
type: tosca.nodes.nfv.VnfVirtualLink
properties:
connectivity_type:
layer_protocols: [ ipv4 ]
vl_profile:
max_bitrate_requirements:
root: 1000000
min_bitrate_requirements:
root: 100000
virtual_link_protocol_data:
- associated_layer_protocol: ipv4
l2_protocol_data:
network_type: vxlan
l3_protocol_data:
ip_version: ipv4
cidr: '192.168.1.0/24'
groups:
VDU_AntiAffinityGroup:
type: tosca.groups.nfv.PlacementGroup
members: [ VDU_0, VDU_1, VDU_2 ]
policies:
- VDU_placement_policy:
type: tosca.policies.nfv.AntiAffinityRule
targets: [ VDU_AntiAffinityGroup ]
properties:
scope: nfvi_node
- vdu_scale:
type: tosca.policies.nfv.ScalingAspects
properties:
aspects:
VDU_2:
name: VDU_2
description: VDU_2
max_scale_level: 1
step_deltas:
- delta_1
- vdu_0_initial_delta:
type: tosca.policies.nfv.VduInitialDelta
properties:
initial_delta:
number_of_instances: 1
targets: [ VDU_0 ]
- vdu_1_initial_delta:
type: tosca.policies.nfv.VduInitialDelta
properties:
initial_delta:
number_of_instances: 1
targets: [ VDU_1 ]
- vdu_2_initial_delta:
type: tosca.policies.nfv.VduInitialDelta
properties:
initial_delta:
number_of_instances: 0
targets: [ VDU_2 ]
- vdu_2_scaling_aspect_deltas:
type: tosca.policies.nfv.VduScalingAspectDeltas
properties:
aspect: VDU_2
deltas:
delta_1:
number_of_instances: 1
targets: [ VDU_2 ]
- instantiation_levels:
type: tosca.policies.nfv.InstantiationLevels
properties:
levels:
r-node-min:
description: vdu-min structure
scale_info:
VDU_2:
scale_level: 0
r-node-max:
description: vdu-max structure
scale_info:
VDU_2:
scale_level: 1
- vdu_0_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels
properties:
levels:
r-node-min:
number_of_instances: 1
r-node-max:
number_of_instances: 1
targets: [ VDU_0 ]
- vdu_1_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels
properties:
levels:
r-node-min:
number_of_instances: 1
r-node-max:
number_of_instances: 1
targets: [ VDU_1 ]
- vdu_2_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels
properties:
levels:
r-node-min:
number_of_instances: 0
r-node-max:
number_of_instances: 1
targets: [ VDU_2 ]

View File

@ -1,35 +1,35 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tacker.vnfm.lcm_user_data.utils as UserDataUtil
from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData
class SampleUserData(AbstractUserData):
@staticmethod
def instantiate(base_hot_dict=None,
vnfd_dict=None,
inst_req_info=None,
grant_info=None):
api_param = UserDataUtil.get_diff_base_hot_param_from_api(
base_hot_dict, inst_req_info)
initial_param_dict = \
UserDataUtil.create_initial_param_server_port_dict(base_hot_dict)
vdu_flavor_dict = \
UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict)
vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict)
cpd_vl_dict = UserDataUtil.create_network_dict(inst_req_info,
initial_param_dict)
final_param_dict = UserDataUtil.create_final_param_dict(
initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict)
return {**final_param_dict, **api_param}
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tacker.vnfm.lcm_user_data.utils as UserDataUtil
from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData
class SampleUserData(AbstractUserData):
@staticmethod
def instantiate(base_hot_dict=None,
vnfd_dict=None,
inst_req_info=None,
grant_info=None):
api_param = UserDataUtil.get_diff_base_hot_param_from_api(
base_hot_dict, inst_req_info)
initial_param_dict = \
UserDataUtil.create_initial_param_server_port_dict(base_hot_dict)
vdu_flavor_dict = \
UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict)
vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict)
cpd_vl_dict = UserDataUtil.create_network_dict(inst_req_info,
initial_param_dict)
final_param_dict = UserDataUtil.create_final_param_dict(
initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict)
return {**final_param_dict, **api_param}

View File

@ -1,370 +1,370 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Auth for External Server OAuth2.0 authentication
"""
import time
import uuid
import jwt.utils
from oslo_config import cfg
from oslo_log import log as logging
import requests.auth
from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1.loading import session as session_loading
from tacker._i18n import _
from tacker.common.exceptions import TackerException
LOG = logging.getLogger(__name__)
_EXT_AUTH_CONFIG_GROUP_NAME = 'ext_oauth2_auth'
_EXTERNAL_AUTH2_OPTS = [
cfg.BoolOpt('use_ext_oauth2_auth', default=False,
help='Set True to use external Oauth2.0 auth server.'),
cfg.StrOpt('token_endpoint',
help='The endpoint for access token API.'),
cfg.StrOpt('scope',
help='The scope that the access token can access.'),
]
_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS = [
cfg.StrOpt('certfile',
help='Required if identity server requires client '
'certificate.'),
cfg.StrOpt('keyfile',
help='Required if identity server requires client '
'private key.'),
cfg.StrOpt('cafile',
help='A PEM encoded Certificate Authority to use when '
'verifying HTTPs connections. Defaults to system CAs.'),
cfg.BoolOpt('insecure', default=False, help='Verify HTTPS connections.'),
cfg.IntOpt('http_connect_timeout',
help='Request timeout value for communicating with Identity '
'API server.'),
cfg.StrOpt('audience',
help='The Audience should be the URL of the Authorization '
"Server's Token Endpoint. The Authorization Server will "
'verify that it is an intended audience for the token.'),
cfg.StrOpt('auth_method',
default='client_secret_basic',
choices=('client_secret_basic', 'client_secret_post',
'tls_client_auth', 'private_key_jwt',
'client_secret_jwt'),
help='The auth_method must use the authentication method '
'specified by the Authorization Server.'),
cfg.StrOpt('client_id',
help='The OAuth 2.0 Client Identifier valid at the '
'Authorization Server.'),
cfg.StrOpt('client_secret',
help='The OAuth 2.0 client secret. When the auth_method is '
'client_secret_basic, client_secret_post, or '
'client_secret_jwt, the value is used, and otherwise the '
'value is ignored.'),
cfg.StrOpt('jwt_key_file',
help='The jwt_key_file must use the certificate key file which '
'has been registered with the Authorization Server. '
'When the auth_method is private_key_jwt, the value is '
'used, and otherwise the value is ignored.'),
cfg.StrOpt('jwt_algorithm',
help='The jwt_algorithm must use the algorithm specified by '
'the Authorization Server. When the auth_method is '
'client_secret_jwt, this value is often set to HS256,'
'when the auth_method is private_key_jwt, the value is '
'often set to RS256, and otherwise the value is ignored.'),
cfg.IntOpt('jwt_bearer_time_out', default=3600,
help='This value is used to calculate the expiration time. If '
'after the expiration time, the access token cannot be '
'accepted. When the auth_method is client_secret_jwt or '
'private_key_jwt, the value is used, and otherwise the '
'value is ignored.'),
]
def config_opts():
return [(_EXT_AUTH_CONFIG_GROUP_NAME,
_EXTERNAL_AUTH2_OPTS + _EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS)]
cfg.CONF.register_opts(_EXTERNAL_AUTH2_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME)
class ExtOAuth2Auth(object):
"""Construct an Auth to fetch an access token for HTTP access."""
def __init__(self):
self._conf = cfg.CONF.ext_oauth2_auth
# Check whether the configuration parameter has been registered
if 'auth_method' not in self._conf:
LOG.debug('The relevant config parameters are not registered '
'and need to be registered before they can be used.')
cfg.CONF.register_opts(_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME)
self.token_endpoint = self._get_config_option(
'token_endpoint', is_required=True)
self.auth_method = self._get_config_option(
'auth_method', is_required=True)
self.client_id = self._get_config_option(
'client_id', is_required=True)
self.scope = self._get_config_option(
'scope', is_required=True)
self.access_token = None
def _get_config_option(self, key, is_required):
"""Read the value from config file by the config key."""
try:
value = getattr(self._conf, key)
except cfg.NoSuchOptError:
value = None
if not value:
if is_required:
LOG.error('The value is required for option %s '
'in group [%s]' % (key,
_EXT_AUTH_CONFIG_GROUP_NAME))
raise TackerException(
_('Configuration error. The parameter '
'is not set for "%s" in group [%s].') % (
key, _EXT_AUTH_CONFIG_GROUP_NAME))
else:
return None
else:
return value
def create_session(self, **kwargs):
"""Create session for HTTP access."""
kwargs.setdefault('cert', self._get_config_option(
'certfile', is_required=False))
kwargs.setdefault('key', self._get_config_option(
'keyfile', is_required=False))
kwargs.setdefault('cacert', self._get_config_option(
'cafile', is_required=False))
kwargs.setdefault('insecure', self._get_config_option(
'insecure', is_required=False))
kwargs.setdefault('timeout', self._get_config_option(
'http_connect_timeout', is_required=False))
kwargs.setdefault('user_agent', 'tacker service')
sess = session_loading.Session().load_from_options(**kwargs)
sess.auth = self
return sess
def get_connection_params(self, session, **kwargs):
"""Get connection params for HTTP access."""
return {}
def invalidate(self):
"""Invalidate the current authentication data."""
self.access_token = None
return True
def _get_token_by_client_secret_basic(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_basic'.
"""
para = {
'scope': self.scope,
'grant_type': 'client_credentials'
}
auth = requests.auth.HTTPBasicAuth(
self.client_id, self._get_config_option(
'client_secret', is_required=True))
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para,
requests_auth=auth)
return http_response
def _get_token_by_client_secret_post(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_post'.
"""
para = {
'client_id': self.client_id,
'client_secret': self._get_config_option(
'client_secret', is_required=True),
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_tls_client_auth(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'tls_client_auth'.
"""
para = {
'client_id': self.client_id,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_private_key_jwt(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'private_key_jwt'.
"""
jwt_key_file = self._get_config_option(
'jwt_key_file', is_required=True)
with open(jwt_key_file, 'r') as jwt_file:
jwt_key = jwt_file.read()
ita = round(time.time())
exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True)
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(exp),
'iss': self.client_id,
'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)},
headers={
'typ': 'JWT',
'alg': alg},
key=jwt_key,
algorithm=alg)
para = {
'client_id': self.client_id,
'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_client_secret_jwt(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_jwt'.
"""
ita = round(time.time())
exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True)
client_secret = self._get_config_option(
'client_secret', is_required=True)
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(exp),
'iss': self.client_id,
'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)},
headers={
'typ': 'JWT',
'alg': alg},
key=client_secret,
algorithm=alg)
para = {
'client_id': self.client_id,
'client_secret': client_secret,
'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def get_headers(self, session, **kwargs):
"""Get an access token and add to request header for HTTP access."""
if not self.access_token:
try:
if self.auth_method == 'tls_client_auth':
http_response = self._get_token_by_tls_client_auth(session)
elif self.auth_method == 'client_secret_post':
http_response = self._get_token_by_client_secret_post(
session)
elif self.auth_method == 'client_secret_basic':
http_response = self._get_token_by_client_secret_basic(
session)
elif self.auth_method == 'private_key_jwt':
http_response = self._get_token_by_private_key_jwt(
session)
elif self.auth_method == 'client_secret_jwt':
http_response = self._get_token_by_client_secret_jwt(
session)
else:
LOG.error('The value is incorrect for option '
'auth_method in group [%s]' %
_EXT_AUTH_CONFIG_GROUP_NAME)
raise TackerException(
_('The configuration parameter for '
'key "auth_method" in group [%s] is incorrect.') %
_EXT_AUTH_CONFIG_GROUP_NAME)
LOG.debug(http_response.text)
if http_response.status_code != 200:
LOG.error('The OAuth2.0 access token API returns an '
'incorrect response. '
'response_status: %s, response_text: %s' %
(http_response.status_code,
http_response.text))
raise TackerException(_('Failed to get an access token.'))
access_token = http_response.json().get('access_token')
if not access_token:
LOG.error('Failed to get an access token: %s',
http_response.text)
raise TackerException(_('Failed to get an access token.'))
self.access_token = access_token
except (ksa_exceptions.ConnectFailure,
ksa_exceptions.DiscoveryFailure,
ksa_exceptions.RequestTimeout) as error:
LOG.error('Unable to get an access token: %s', error)
raise TackerException(
_('The OAuth2.0 access token API service is '
'temporarily unavailable.'))
except TackerException:
raise
except Exception as error:
LOG.error('Unable to get an access token: %s', error)
raise TackerException(
_('An exception occurred during the processing '
'of getting an access token'))
header = {'Authorization': f'Bearer {self.access_token}'}
return header
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Auth for External Server OAuth2.0 authentication
"""
import time
import uuid
import jwt.utils
from oslo_config import cfg
from oslo_log import log as logging
import requests.auth
from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1.loading import session as session_loading
from tacker._i18n import _
from tacker.common.exceptions import TackerException
LOG = logging.getLogger(__name__)
_EXT_AUTH_CONFIG_GROUP_NAME = 'ext_oauth2_auth'
_EXTERNAL_AUTH2_OPTS = [
cfg.BoolOpt('use_ext_oauth2_auth', default=False,
help='Set True to use external Oauth2.0 auth server.'),
cfg.StrOpt('token_endpoint',
help='The endpoint for access token API.'),
cfg.StrOpt('scope',
help='The scope that the access token can access.'),
]
_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS = [
cfg.StrOpt('certfile',
help='Required if identity server requires client '
'certificate.'),
cfg.StrOpt('keyfile',
help='Required if identity server requires client '
'private key.'),
cfg.StrOpt('cafile',
help='A PEM encoded Certificate Authority to use when '
'verifying HTTPs connections. Defaults to system CAs.'),
cfg.BoolOpt('insecure', default=False, help='Verify HTTPS connections.'),
cfg.IntOpt('http_connect_timeout',
help='Request timeout value for communicating with Identity '
'API server.'),
cfg.StrOpt('audience',
help='The Audience should be the URL of the Authorization '
"Server's Token Endpoint. The Authorization Server will "
'verify that it is an intended audience for the token.'),
cfg.StrOpt('auth_method',
default='client_secret_basic',
choices=('client_secret_basic', 'client_secret_post',
'tls_client_auth', 'private_key_jwt',
'client_secret_jwt'),
help='The auth_method must use the authentication method '
'specified by the Authorization Server.'),
cfg.StrOpt('client_id',
help='The OAuth 2.0 Client Identifier valid at the '
'Authorization Server.'),
cfg.StrOpt('client_secret',
help='The OAuth 2.0 client secret. When the auth_method is '
'client_secret_basic, client_secret_post, or '
'client_secret_jwt, the value is used, and otherwise the '
'value is ignored.'),
cfg.StrOpt('jwt_key_file',
help='The jwt_key_file must use the certificate key file which '
'has been registered with the Authorization Server. '
'When the auth_method is private_key_jwt, the value is '
'used, and otherwise the value is ignored.'),
cfg.StrOpt('jwt_algorithm',
help='The jwt_algorithm must use the algorithm specified by '
'the Authorization Server. When the auth_method is '
'client_secret_jwt, this value is often set to HS256,'
'when the auth_method is private_key_jwt, the value is '
'often set to RS256, and otherwise the value is ignored.'),
cfg.IntOpt('jwt_bearer_time_out', default=3600,
help='This value is used to calculate the expiration time. If '
'after the expiration time, the access token cannot be '
'accepted. When the auth_method is client_secret_jwt or '
'private_key_jwt, the value is used, and otherwise the '
'value is ignored.'),
]
def config_opts():
return [(_EXT_AUTH_CONFIG_GROUP_NAME,
_EXTERNAL_AUTH2_OPTS + _EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS)]
cfg.CONF.register_opts(_EXTERNAL_AUTH2_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME)
class ExtOAuth2Auth(object):
"""Construct an Auth to fetch an access token for HTTP access."""
def __init__(self):
self._conf = cfg.CONF.ext_oauth2_auth
# Check whether the configuration parameter has been registered
if 'auth_method' not in self._conf:
LOG.debug('The relevant config parameters are not registered '
'and need to be registered before they can be used.')
cfg.CONF.register_opts(_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME)
self.token_endpoint = self._get_config_option(
'token_endpoint', is_required=True)
self.auth_method = self._get_config_option(
'auth_method', is_required=True)
self.client_id = self._get_config_option(
'client_id', is_required=True)
self.scope = self._get_config_option(
'scope', is_required=True)
self.access_token = None
def _get_config_option(self, key, is_required):
"""Read the value from config file by the config key."""
try:
value = getattr(self._conf, key)
except cfg.NoSuchOptError:
value = None
if not value:
if is_required:
LOG.error('The value is required for option %s '
'in group [%s]' % (key,
_EXT_AUTH_CONFIG_GROUP_NAME))
raise TackerException(
_('Configuration error. The parameter '
'is not set for "%s" in group [%s].') % (
key, _EXT_AUTH_CONFIG_GROUP_NAME))
else:
return None
else:
return value
def create_session(self, **kwargs):
"""Create session for HTTP access."""
kwargs.setdefault('cert', self._get_config_option(
'certfile', is_required=False))
kwargs.setdefault('key', self._get_config_option(
'keyfile', is_required=False))
kwargs.setdefault('cacert', self._get_config_option(
'cafile', is_required=False))
kwargs.setdefault('insecure', self._get_config_option(
'insecure', is_required=False))
kwargs.setdefault('timeout', self._get_config_option(
'http_connect_timeout', is_required=False))
kwargs.setdefault('user_agent', 'tacker service')
sess = session_loading.Session().load_from_options(**kwargs)
sess.auth = self
return sess
def get_connection_params(self, session, **kwargs):
"""Get connection params for HTTP access."""
return {}
def invalidate(self):
"""Invalidate the current authentication data."""
self.access_token = None
return True
def _get_token_by_client_secret_basic(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_basic'.
"""
para = {
'scope': self.scope,
'grant_type': 'client_credentials'
}
auth = requests.auth.HTTPBasicAuth(
self.client_id, self._get_config_option(
'client_secret', is_required=True))
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para,
requests_auth=auth)
return http_response
def _get_token_by_client_secret_post(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_post'.
"""
para = {
'client_id': self.client_id,
'client_secret': self._get_config_option(
'client_secret', is_required=True),
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_tls_client_auth(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'tls_client_auth'.
"""
para = {
'client_id': self.client_id,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_private_key_jwt(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'private_key_jwt'.
"""
jwt_key_file = self._get_config_option(
'jwt_key_file', is_required=True)
with open(jwt_key_file, 'r') as jwt_file:
jwt_key = jwt_file.read()
ita = round(time.time())
exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True)
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(exp),
'iss': self.client_id,
'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)},
headers={
'typ': 'JWT',
'alg': alg},
key=jwt_key,
algorithm=alg)
para = {
'client_id': self.client_id,
'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def _get_token_by_client_secret_jwt(self, session):
"""Access the access token API.
Access the access token API to get an access token by
the auth method 'client_secret_jwt'.
"""
ita = round(time.time())
exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True)
client_secret = self._get_config_option(
'client_secret', is_required=True)
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(exp),
'iss': self.client_id,
'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)},
headers={
'typ': 'JWT',
'alg': alg},
key=client_secret,
algorithm=alg)
para = {
'client_id': self.client_id,
'client_secret': client_secret,
'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': self.scope,
'grant_type': 'client_credentials'
}
http_response = session.request(
self.token_endpoint,
'POST',
authenticated=False,
data=para)
return http_response
def get_headers(self, session, **kwargs):
"""Get an access token and add to request header for HTTP access."""
if not self.access_token:
try:
if self.auth_method == 'tls_client_auth':
http_response = self._get_token_by_tls_client_auth(session)
elif self.auth_method == 'client_secret_post':
http_response = self._get_token_by_client_secret_post(
session)
elif self.auth_method == 'client_secret_basic':
http_response = self._get_token_by_client_secret_basic(
session)
elif self.auth_method == 'private_key_jwt':
http_response = self._get_token_by_private_key_jwt(
session)
elif self.auth_method == 'client_secret_jwt':
http_response = self._get_token_by_client_secret_jwt(
session)
else:
LOG.error('The value is incorrect for option '
'auth_method in group [%s]' %
_EXT_AUTH_CONFIG_GROUP_NAME)
raise TackerException(
_('The configuration parameter for '
'key "auth_method" in group [%s] is incorrect.') %
_EXT_AUTH_CONFIG_GROUP_NAME)
LOG.debug(http_response.text)
if http_response.status_code != 200:
LOG.error('The OAuth2.0 access token API returns an '
'incorrect response. '
'response_status: %s, response_text: %s' %
(http_response.status_code,
http_response.text))
raise TackerException(_('Failed to get an access token.'))
access_token = http_response.json().get('access_token')
if not access_token:
LOG.error('Failed to get an access token: %s',
http_response.text)
raise TackerException(_('Failed to get an access token.'))
self.access_token = access_token
except (ksa_exceptions.ConnectFailure,
ksa_exceptions.DiscoveryFailure,
ksa_exceptions.RequestTimeout) as error:
LOG.error('Unable to get an access token: %s', error)
raise TackerException(
_('The OAuth2.0 access token API service is '
'temporarily unavailable.'))
except TackerException:
raise
except Exception as error:
LOG.error('Unable to get an access token: %s', error)
raise TackerException(
_('An exception occurred during the processing '
'of getting an access token'))
header = {'Authorization': f'Bearer {self.access_token}'}
return header

View File

@ -1,62 +1,62 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
# NFV-SOL 003
# - v3.3.1 6.5.2.5 (API version: 2.1.0)
@base.TackerObjectRegistry.register
class PerformanceInformationAvailableNotificationV2(
base.TackerObject,
base.TackerObjectDictCompat
):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'id': fields.StringField(nullable=False),
'notificationType': fields.StringField(nullable=False),
'timeStamp': fields.DateTimeField(nullable=False),
'pmJobId': fields.StringField(nullable=False),
'objectType': fields.StringField(nullable=False),
'objectInstanceId': fields.StringField(nullable=False),
'subObjectInstanceIds': fields.ListOfStringsField(nullable=True),
'_links': fields.ObjectField(
'PerformanceInformationAvailableNotificationV2_Links',
nullable=False),
}
@base.TackerObjectRegistry.register
class PerformanceInformationAvailableNotificationV2_Links(
base.TackerObject,
base.TackerObjectDictCompat
):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'objectInstance': fields.ObjectField(
'NotificationLink', nullable=True),
'pmJob': fields.ObjectField(
'NotificationLink', nullable=False),
'performanceReport': fields.ObjectField(
'NotificationLink', nullable=False),
}
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
# NFV-SOL 003
# - v3.3.1 6.5.2.5 (API version: 2.1.0)
@base.TackerObjectRegistry.register
class PerformanceInformationAvailableNotificationV2(
base.TackerObject,
base.TackerObjectDictCompat
):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'id': fields.StringField(nullable=False),
'notificationType': fields.StringField(nullable=False),
'timeStamp': fields.DateTimeField(nullable=False),
'pmJobId': fields.StringField(nullable=False),
'objectType': fields.StringField(nullable=False),
'objectInstanceId': fields.StringField(nullable=False),
'subObjectInstanceIds': fields.ListOfStringsField(nullable=True),
'_links': fields.ObjectField(
'PerformanceInformationAvailableNotificationV2_Links',
nullable=False),
}
@base.TackerObjectRegistry.register
class PerformanceInformationAvailableNotificationV2_Links(
base.TackerObject,
base.TackerObjectDictCompat
):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'objectInstance': fields.ObjectField(
'NotificationLink', nullable=True),
'pmJob': fields.ObjectField(
'NotificationLink', nullable=False),
'performanceReport': fields.ObjectField(
'NotificationLink', nullable=False),
}

View File

@ -1,34 +1,34 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
# NFV-SOL 003
# - v3.3.1 6.5.3.3 (API version: 2.0.0)
@base.TackerObjectRegistry.register
class VnfPmJobCriteriaV2(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'performanceMetric': fields.ListOfStringsField(nullable=True),
'performanceMetricGroup': fields.ListOfStringsField(nullable=True),
'collectionPeriod': fields.IntegerField(nullable=False),
'reportingPeriod': fields.IntegerField(nullable=False),
'reportingBoundary': fields.DateTimeField(nullable=True),
}
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
# NFV-SOL 003
# - v3.3.1 6.5.3.3 (API version: 2.0.0)
@base.TackerObjectRegistry.register
class VnfPmJobCriteriaV2(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'performanceMetric': fields.ListOfStringsField(nullable=True),
'performanceMetricGroup': fields.ListOfStringsField(nullable=True),
'collectionPeriod': fields.IntegerField(nullable=False),
'reportingPeriod': fields.IntegerField(nullable=False),
'reportingBoundary': fields.DateTimeField(nullable=True),
}

View File

@ -1,32 +1,32 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
# NFV-SOL 003
# - v3.3.1 6.5.2.12 (API version: 2.1.0)
@base.TackerObjectRegistry.register
class PmJobModificationsV2(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'callbackUri': fields.StringField(nullable=True),
'authentication': fields.ObjectField(
'SubscriptionAuthentication', nullable=True),
}
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
# NFV-SOL 003
# - v3.3.1 6.5.2.12 (API version: 2.1.0)
@base.TackerObjectRegistry.register
class PmJobModificationsV2(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'callbackUri': fields.StringField(nullable=True),
'authentication': fields.ObjectField(
'SubscriptionAuthentication', nullable=True),
}

View File

@ -1,65 +1,65 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
# NFV-SOL 003
# - v3.3.1 6.5.2.10 (API version: 2.1.0)
@base.TackerObjectRegistry.register
class PerformanceReportV2(base.TackerPersistentObject,
base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
# PerformanceReportV2 need 'id' and 'jobId'
fields = {
'id': fields.StringField(nullable=False),
'jobId': fields.StringField(nullable=False),
'entries': fields.ListOfObjectsField(
'VnfPmReportV2_Entries', nullable=False),
}
@base.TackerObjectRegistry.register
class VnfPmReportV2_Entries(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'objectType': fields.StringField(nullable=False),
'objectInstanceId': fields.StringField(nullable=False),
'subObjectInstanceId': fields.StringField(nullable=True),
'performanceMetric': fields.StringField(nullable=False),
'performanceValues': fields.ListOfObjectsField(
'VnfPmReportV2_Entries_PerformanceValues', nullable=False),
}
@base.TackerObjectRegistry.register
class VnfPmReportV2_Entries_PerformanceValues(base.TackerObject,
base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'timeStamp': fields.DateTimeField(nullable=False),
'value': fields.StringField(nullable=False),
'context': fields.KeyValuePairsField(nullable=True),
}
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
# NFV-SOL 003
# - v3.3.1 6.5.2.10 (API version: 2.1.0)
@base.TackerObjectRegistry.register
class PerformanceReportV2(base.TackerPersistentObject,
base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
# PerformanceReportV2 need 'id' and 'jobId'
fields = {
'id': fields.StringField(nullable=False),
'jobId': fields.StringField(nullable=False),
'entries': fields.ListOfObjectsField(
'VnfPmReportV2_Entries', nullable=False),
}
@base.TackerObjectRegistry.register
class VnfPmReportV2_Entries(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'objectType': fields.StringField(nullable=False),
'objectInstanceId': fields.StringField(nullable=False),
'subObjectInstanceId': fields.StringField(nullable=True),
'performanceMetric': fields.StringField(nullable=False),
'performanceValues': fields.ListOfObjectsField(
'VnfPmReportV2_Entries_PerformanceValues', nullable=False),
}
@base.TackerObjectRegistry.register
class VnfPmReportV2_Entries_PerformanceValues(base.TackerObject,
base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'timeStamp': fields.DateTimeField(nullable=False),
'value': fields.StringField(nullable=False),
'context': fields.KeyValuePairsField(nullable=True),
}

View File

@ -1,457 +1,457 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import copy
import os
from unittest import mock
import uuid
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from keystoneauth1 import exceptions as ksa_exceptions
from tacker.common.exceptions import TackerException
from tacker import context
from tacker.tests.unit import base
JWT_KEY_FILE = 'jwt_private.key'
def _get_sample_key(name):
filename = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"./sample_keys/", name)
with open(filename, "r") as f:
content = f.read()
return content
def get_mock_conf_effect(audience=None, token_endpoint=None,
auth_method=None, client_id=None, client_secret=None,
scope=None, jwt_key_file=None, jwt_algorithm=None,
jwt_bearer_time_out=None, certfile=None, keyfile=None,
cafile=None, http_connect_timeout=None, insecure=None):
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
config = {'use_ext_oauth2_auth': True}
if audience:
config['audience'] = audience
if token_endpoint:
config['token_endpoint'] = token_endpoint
if auth_method:
config['auth_method'] = auth_method
if client_id:
config['client_id'] = client_id
if client_secret:
config['client_secret'] = client_secret
if scope:
config['scope'] = scope
if jwt_key_file:
config['jwt_key_file'] = jwt_key_file
if jwt_algorithm:
config['jwt_algorithm'] = jwt_algorithm
if jwt_bearer_time_out:
config['jwt_bearer_time_out'] = jwt_bearer_time_out
if certfile:
config['certfile'] = certfile
if keyfile:
config['keyfile'] = keyfile
if cafile:
config['cafile'] = cafile
if cafile:
config['http_connect_timeout'] = http_connect_timeout
if cafile:
config['insecure'] = insecure
return MockConfig(
conf=config)
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf or name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class MockSession(object):
def __init__(self, ):
self.auth = None
class TestExtOAuth2Auth(base.TestCase):
def setUp(self):
super(TestExtOAuth2Auth, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret'
self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall)
def _get_access_token_response(self, request, context,
auth_method=None,
client_id=None,
client_secret=None,
scope=None,
access_token=None,
status_code=200,
raise_error=None,
resp=None
):
if raise_error:
raise raise_error
if auth_method == 'tls_client_auth':
body = (f'client_id={client_id}&scope={scope}'
f'&grant_type=client_credentials')
self.assertEqual(request.text, body)
elif auth_method == 'client_secret_post':
body = (f'client_id={client_id}&client_secret={client_secret}'
f'&scope={scope}&grant_type=client_credentials')
self.assertEqual(request.text, body)
elif auth_method == 'client_secret_basic':
body = f'scope={scope}&grant_type=client_credentials'
self.assertEqual(request.text, body)
auth_basic = request._request.headers.get('Authorization')
self.assertIsNotNone(auth_basic)
auth = 'Basic ' + base64.standard_b64encode(
f'{client_id}:{client_secret}'.encode('ascii')).decode('ascii')
self.assertEqual(auth_basic, auth)
elif auth_method == 'private_key_jwt':
self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text)
self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text)
elif auth_method == 'client_secret_jwt':
self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text)
self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text)
if not access_token:
access_token = f'access_token{str(uuid.uuid4())}'
if not resp:
if status_code == 200:
response = {
'access_token': access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': scope
}
else:
response = {'error': 'error_title',
'error_description': 'error message'}
else:
response = copy.deepcopy(resp)
context.status_code = status_code
return response
def _get_default_mock_conf_effect(self):
return get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
def _check_authorization_header(self):
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
headers = auth_context.get_headers(session)
bearer = f'Bearer {self.access_token}'
self.assertIn('Authorization', headers)
self.assertEqual(bearer, headers.get('Authorization'))
return auth_context
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_token_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint='',
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope
)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_scope(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_keystone_middleware_opts(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('keystoneauth1.loading.session.Session.load_from_options')
def test_create_session(self, mock_load_from_options, mock_get_conf_key):
certfile = f'/demo/certfile{str(uuid.uuid4())}'
keyfile = f'/demo/keyfile{str(uuid.uuid4())}'
cafile = f'/demo/cafile{str(uuid.uuid4())}'
conf_insecure = True
http_connect_timeout = 1000
def load_side_effect(**kwargs):
self.assertEqual(conf_insecure, kwargs.get('insecure'))
self.assertEqual(cafile, kwargs.get('cacert'))
self.assertEqual(certfile, kwargs.get('cert'))
self.assertEqual(keyfile, kwargs.get('key'))
self.assertEqual(http_connect_timeout, kwargs.get('timeout'))
return MockSession()
mock_load_from_options.side_effect = load_side_effect
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope,
certfile=certfile,
keyfile=keyfile,
cafile=cafile,
insecure=conf_insecure,
http_connect_timeout=http_connect_timeout)
auth_context = context.generate_tacker_service_context()
auth_context.create_session()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_connection_params(self, mock_get_conf_key):
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
params = auth_context.get_connection_params(session)
self.assertDictEqual(params, {})
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_tls_client_auth(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope)
auth_context = self._check_authorization_header()
result = auth_context.invalidate()
self.assertEqual(True, result)
self.assertIsNone(auth_context.access_token)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_post(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_post',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token
)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_post',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_basic(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_basic',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_basic',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self._check_authorization_header()
@mock.patch('builtins.open', mock.mock_open(read_data=_get_sample_key(
JWT_KEY_FILE)))
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_private_key_jwt(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='private_key_jwt',
client_id=self.client_id,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='private_key_jwt',
client_id=self.client_id,
audience=self.audience,
jwt_key_file=f'/demo/jwt_key_file{str(uuid.uuid4())}',
jwt_algorithm='RS256',
jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_jwt(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_jwt',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_jwt',
client_id=self.client_id,
audience=self.audience,
client_secret=self.client_secret,
jwt_algorithm='HS256',
jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_invalid_auth_method(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_other',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope
)
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_connect_fail(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
raise_error=ksa_exceptions.RequestTimeout('connect time out.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_is_not_200(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
status_code=201)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_not_include_access_token(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
status_code=200,
resp={'error': 'invalid_client',
'error_description': 'The client is not found.'})
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_unknown_error(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
raise_error=Exception('unknown error occurred.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import copy
import os
from unittest import mock
import uuid
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from keystoneauth1 import exceptions as ksa_exceptions
from tacker.common.exceptions import TackerException
from tacker import context
from tacker.tests.unit import base
JWT_KEY_FILE = 'jwt_private.key'
def _get_sample_key(name):
filename = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"./sample_keys/", name)
with open(filename, "r") as f:
content = f.read()
return content
def get_mock_conf_effect(audience=None, token_endpoint=None,
auth_method=None, client_id=None, client_secret=None,
scope=None, jwt_key_file=None, jwt_algorithm=None,
jwt_bearer_time_out=None, certfile=None, keyfile=None,
cafile=None, http_connect_timeout=None, insecure=None):
def mock_conf_key_effect(name):
if name == 'keystone_authtoken':
return MockConfig(conf=None)
elif name == 'ext_oauth2_auth':
config = {'use_ext_oauth2_auth': True}
if audience:
config['audience'] = audience
if token_endpoint:
config['token_endpoint'] = token_endpoint
if auth_method:
config['auth_method'] = auth_method
if client_id:
config['client_id'] = client_id
if client_secret:
config['client_secret'] = client_secret
if scope:
config['scope'] = scope
if jwt_key_file:
config['jwt_key_file'] = jwt_key_file
if jwt_algorithm:
config['jwt_algorithm'] = jwt_algorithm
if jwt_bearer_time_out:
config['jwt_bearer_time_out'] = jwt_bearer_time_out
if certfile:
config['certfile'] = certfile
if keyfile:
config['keyfile'] = keyfile
if cafile:
config['cafile'] = cafile
if cafile:
config['http_connect_timeout'] = http_connect_timeout
if cafile:
config['insecure'] = insecure
return MockConfig(
conf=config)
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf or name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class MockSession(object):
def __init__(self, ):
self.auth = None
class TestExtOAuth2Auth(base.TestCase):
def setUp(self):
super(TestExtOAuth2Auth, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret'
self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall)
def _get_access_token_response(self, request, context,
auth_method=None,
client_id=None,
client_secret=None,
scope=None,
access_token=None,
status_code=200,
raise_error=None,
resp=None
):
if raise_error:
raise raise_error
if auth_method == 'tls_client_auth':
body = (f'client_id={client_id}&scope={scope}'
f'&grant_type=client_credentials')
self.assertEqual(request.text, body)
elif auth_method == 'client_secret_post':
body = (f'client_id={client_id}&client_secret={client_secret}'
f'&scope={scope}&grant_type=client_credentials')
self.assertEqual(request.text, body)
elif auth_method == 'client_secret_basic':
body = f'scope={scope}&grant_type=client_credentials'
self.assertEqual(request.text, body)
auth_basic = request._request.headers.get('Authorization')
self.assertIsNotNone(auth_basic)
auth = 'Basic ' + base64.standard_b64encode(
f'{client_id}:{client_secret}'.encode('ascii')).decode('ascii')
self.assertEqual(auth_basic, auth)
elif auth_method == 'private_key_jwt':
self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text)
self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text)
elif auth_method == 'client_secret_jwt':
self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text)
self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text)
if not access_token:
access_token = f'access_token{str(uuid.uuid4())}'
if not resp:
if status_code == 200:
response = {
'access_token': access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': scope
}
else:
response = {'error': 'error_title',
'error_description': 'error message'}
else:
response = copy.deepcopy(resp)
context.status_code = status_code
return response
def _get_default_mock_conf_effect(self):
return get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
def _check_authorization_header(self):
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
headers = auth_context.get_headers(session)
bearer = f'Bearer {self.access_token}'
self.assertIn('Authorization', headers)
self.assertEqual(bearer, headers.get('Authorization'))
return auth_context
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_token_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint='',
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope
)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_scope(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_keystone_middleware_opts(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self.assertRaises(TackerException,
context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('keystoneauth1.loading.session.Session.load_from_options')
def test_create_session(self, mock_load_from_options, mock_get_conf_key):
certfile = f'/demo/certfile{str(uuid.uuid4())}'
keyfile = f'/demo/keyfile{str(uuid.uuid4())}'
cafile = f'/demo/cafile{str(uuid.uuid4())}'
conf_insecure = True
http_connect_timeout = 1000
def load_side_effect(**kwargs):
self.assertEqual(conf_insecure, kwargs.get('insecure'))
self.assertEqual(cafile, kwargs.get('cacert'))
self.assertEqual(certfile, kwargs.get('cert'))
self.assertEqual(keyfile, kwargs.get('key'))
self.assertEqual(http_connect_timeout, kwargs.get('timeout'))
return MockSession()
mock_load_from_options.side_effect = load_side_effect
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope,
certfile=certfile,
keyfile=keyfile,
cafile=cafile,
insecure=conf_insecure,
http_connect_timeout=http_connect_timeout)
auth_context = context.generate_tacker_service_context()
auth_context.create_session()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_connection_params(self, mock_get_conf_key):
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
params = auth_context.get_connection_params(session)
self.assertDictEqual(params, {})
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_tls_client_auth(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='tls_client_auth',
client_id=self.client_id,
scope=self.scope)
auth_context = self._check_authorization_header()
result = auth_context.invalidate()
self.assertEqual(True, result)
self.assertIsNone(auth_context.access_token)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_post(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_post',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token
)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_post',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_basic(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_basic',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_basic',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope)
self._check_authorization_header()
@mock.patch('builtins.open', mock.mock_open(read_data=_get_sample_key(
JWT_KEY_FILE)))
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_private_key_jwt(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='private_key_jwt',
client_id=self.client_id,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='private_key_jwt',
client_id=self.client_id,
audience=self.audience,
jwt_key_file=f'/demo/jwt_key_file{str(uuid.uuid4())}',
jwt_algorithm='RS256',
jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_jwt(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method='client_secret_jwt',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_jwt',
client_id=self.client_id,
audience=self.audience,
client_secret=self.client_secret,
jwt_algorithm='HS256',
jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope)
self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_invalid_auth_method(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint,
auth_method='client_secret_other',
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope
)
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_connect_fail(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
raise_error=ksa_exceptions.RequestTimeout('connect time out.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_is_not_200(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
status_code=201)
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_not_include_access_token(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
status_code=200,
resp={'error': 'invalid_client',
'error_description': 'The client is not found.'})
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_unknown_error(self, mock_get_conf_key):
def mock_resp(request, context):
return self._get_access_token_response(
request, context,
auth_method=self.auth_method,
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scope,
access_token=self.access_token,
raise_error=Exception('unknown error occurred.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context()
session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session)

View File

@ -1,255 +1,255 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import uuid
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from tacker import context as t_context
from tacker.keymgr.barbican_key_manager import BarbicanKeyManager
from tacker.keymgr import exception
from tacker.tests.unit import base
def get_mock_conf_key_effect(barbican_endpoint=None):
def mock_conf_key_effect(name):
if name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_version': 'v1',
'barbican_endpoint': barbican_endpoint
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf and name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestBarbicanKeyManager(base.TestCase):
def setUp(self):
super(TestBarbicanKeyManager, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret'
self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall)
def _mock_external_token_api(self):
def mock_token_resp(request, context):
response = {
'access_token': self.access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': 'tacker_api'
}
context.status_code = 200
return response
self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp)
def _mock_barbican_get_version_resp(self):
def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
"versions": {
"values": [
{
"id": "v1",
"status": "stable",
"links": [
{
"rel": "self",
"href": "http://demo/barbican/v1/"
},
{
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.key-manager-v1+json"
}
]
}
]
}
}
return response
return mock_barbican_get_resp
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_delete_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api()
mock_validate.return_value = True
def mock_barbican_delete_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 204
return ''
def mock_barbican_get_for_check_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
return {}
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.delete(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_delete_resp)
self.requests_mock.get(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_for_check_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
keymgr.delete(auth, 'test')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('tacker.keymgr.barbican_key_manager.'
'BarbicanKeyManager._retrieve_secret_uuid')
def test_store_ext_oauth2_auth(self, mock_secret_uuid,
mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican')
secret_id = 'store_secret_uuid'
mock_secret_uuid.return_value = secret_id
self._mock_external_token_api()
def mock_barbican_post_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = {
'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes',
'bit_length': 256,
'mode': 'cbc',
'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'
}
context.status_code = 201
return response
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.post('http://demo/barbican/v1/secrets/',
json=mock_barbican_post_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.store(auth, 'test')
self.assertEqual(result, secret_id)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_get_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api()
mock_validate.return_value = True
def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
'id': 'test001'
}
return response
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.get(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.get(auth, 'test001')
self.assertEqual(result.secret_ref,
'http://demo/barbican/v1/secrets/test001')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_ext_oauth2_auth_no_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='')
self._mock_external_token_api()
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
self.assertRaises(exception.KeyManagerError, keymgr.get, auth, 'test')
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import uuid
from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture
from tacker import context as t_context
from tacker.keymgr.barbican_key_manager import BarbicanKeyManager
from tacker.keymgr import exception
from tacker.tests.unit import base
def get_mock_conf_key_effect(barbican_endpoint=None):
def mock_conf_key_effect(name):
if name == 'ext_oauth2_auth':
return MockConfig(
conf={
'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post',
'client_id': 'client_id',
'client_secret': 'client_secret',
'scope': 'client_secret'
})
elif name == 'key_manager':
conf = {
'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager',
'barbican_version': 'v1',
'barbican_endpoint': barbican_endpoint
}
return MockConfig(conf=conf)
elif name == 'k8s_vim':
return MockConfig(
conf={
'use_barbican': True
})
else:
return cfg.CONF._get(name)
return mock_conf_key_effect
class MockConfig(object):
def __init__(self, conf=None):
self.conf = conf
def __getattr__(self, name):
if not self.conf and name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name)
def __contains__(self, key):
return key in self.conf
class TestBarbicanKeyManager(base.TestCase):
def setUp(self):
super(TestBarbicanKeyManager, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret'
self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall)
def _mock_external_token_api(self):
def mock_token_resp(request, context):
response = {
'access_token': self.access_token,
'expires_in': 1800,
'refresh_expires_in': 0,
'token_type': 'Bearer',
'not-before-policy': 0,
'scope': 'tacker_api'
}
context.status_code = 200
return response
self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp)
def _mock_barbican_get_version_resp(self):
def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
"versions": {
"values": [
{
"id": "v1",
"status": "stable",
"links": [
{
"rel": "self",
"href": "http://demo/barbican/v1/"
},
{
"rel": "describedby",
"type": "text/html",
"href": "https://docs.openstack.org/"}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.key-manager-v1+json"
}
]
}
]
}
}
return response
return mock_barbican_get_resp
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_delete_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api()
mock_validate.return_value = True
def mock_barbican_delete_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 204
return ''
def mock_barbican_get_for_check_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
return {}
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.delete(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_delete_resp)
self.requests_mock.get(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_for_check_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
keymgr.delete(auth, 'test')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('tacker.keymgr.barbican_key_manager.'
'BarbicanKeyManager._retrieve_secret_uuid')
def test_store_ext_oauth2_auth(self, mock_secret_uuid,
mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican')
secret_id = 'store_secret_uuid'
mock_secret_uuid.return_value = secret_id
self._mock_external_token_api()
def mock_barbican_post_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
response = {
'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes',
'bit_length': 256,
'mode': 'cbc',
'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'
}
context.status_code = 201
return response
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.post('http://demo/barbican/v1/secrets/',
json=mock_barbican_post_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.store(auth, 'test')
self.assertEqual(result, secret_id)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_get_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api()
mock_validate.return_value = True
def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth)
context.status_code = 200
response = {
'id': 'test001'
}
return response
self.requests_mock.get(
'http://demo/barbican',
json=self._mock_barbican_get_version_resp())
self.requests_mock.get(
'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_resp)
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.get(auth, 'test001')
self.assertEqual(result.secret_ref,
'http://demo/barbican/v1/secrets/test001')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_ext_oauth2_auth_no_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='')
self._mock_external_token_api()
auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint)
self.assertRaises(exception.KeyManagerError, keymgr.get, auth, 'test')