Merge "NSX Policy resources"

This commit is contained in:
Jenkins 2017-04-19 10:44:25 +00:00 committed by Gerrit Code Review
commit ec1eb444a3
9 changed files with 2552 additions and 4 deletions

View File

@ -17,6 +17,7 @@ import copy
import mock
import unittest
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from requests import exceptions as requests_exceptions
@ -366,3 +367,15 @@ class NsxClientTestCase(NsxLibTestCase):
nsxlib_config.nsx_api_managers = conf_managers
return nsx_cluster.NSXClusteredAPI(nsxlib_config)
def assert_json_call(self, method, client, url,
headers=nsx_client.JSONRESTClient._DEFAULT_HEADERS,
timeout=(NSX_HTTP_TIMEOUT, NSX_HTTP_READ_TIMEOUT),
data=None):
cluster = client._conn
if data:
data = jsonutils.dumps(data, sort_keys=True)
cluster.assert_called_once(
method,
**{'url': url, 'verify': NSX_CERT, 'body': data,
'headers': headers, 'cert': None, 'timeout': timeout})

View File

@ -0,0 +1,331 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
from vmware_nsxlib.v3 import client
from vmware_nsxlib.v3 import policy_constants
from vmware_nsxlib.v3 import policy_defs as policy
BASE_POLICY_URI = "https://1.2.3.4/api/v1/"
class TestPolicyApi(nsxlib_testcase.NsxClientTestCase):
def setUp(self):
self.client = self.new_mocked_client(client.NSX3Client,
url_prefix='api/v1/')
self.policy_api = policy.NsxPolicyApi(self.client)
super(TestPolicyApi, self).setUp()
def assert_json_call(self, method, client, url, data=None):
url = BASE_POLICY_URI + url
return super(TestPolicyApi, self).assert_json_call(
method, client, url, data=data)
class TestPolicyDomain(TestPolicyApi):
def test_create(self):
domain_def = policy.DomainDef(
'archaea',
'prokaryotic cells',
'typically characterized by membrane lipids')
self.policy_api.create(domain_def)
self.assert_json_call('PUT', self.client,
'infra/domains/archaea',
data=domain_def.get_obj_dict())
def test_delete(self):
domain_def = policy.DomainDef('bacteria')
self.policy_api.delete(domain_def)
self.assert_json_call('DELETE', self.client,
'infra/domains/bacteria')
def test_get(self):
domain_def = policy.DomainDef('eukarya')
self.policy_api.get(domain_def)
self.assert_json_call('GET', self.client,
'infra/domains/eukarya')
def test_list(self):
domain_def = policy.DomainDef()
self.policy_api.list(domain_def)
self.assert_json_call('GET', self.client, 'infra/domains')
class TestPolicyGroup(TestPolicyApi):
def test_create(self):
group_def = policy.GroupDef(
'eukarya',
'cats',
'felis catus')
self.policy_api.create(group_def)
self.assert_json_call('PUT', self.client,
'infra/domains/eukarya/groups/cats',
data=group_def.get_obj_dict())
def test_create_with_domain(self):
domain_def = policy.DomainDef('eukarya',
'eukarya',
'dude with cell membranes')
group_def = policy.GroupDef('eukarya',
'cats',
'Ailuropoda melanoleuca')
self.policy_api.create_with_parent(domain_def, group_def)
data = domain_def.get_obj_dict()
data['groups'] = [group_def.get_obj_dict()]
self.assert_json_call('PUT', self.client,
'infra/domains/eukarya',
data=data)
def test_create_with_single_tag(self):
domain_def = policy.DomainDef('eukarya')
group_def = policy.GroupDef('eukarya', 'dogs',
conditions=policy.Condition('spaniel'))
self.policy_api.create_with_parent(domain_def, group_def)
data = domain_def.get_obj_dict()
data['groups'] = [group_def.get_obj_dict()]
# validate body structure and defaults
expected_condition = {'value': 'spaniel',
'operator': 'EQUALS',
'member_type': 'LogicalPort',
'resource_type': 'Condition',
'key': 'Tag'}
expected_group = {'id': 'dogs',
'display_name': None,
'description': None,
'expression': [expected_condition],
'_revision': 0}
expected_data = {'id': 'eukarya',
'display_name': None,
'description': None,
'groups': [expected_group],
'_revision': 0}
self.assert_json_call('PUT', self.client,
'infra/domains/eukarya',
data=expected_data)
def test_create_with_multi_tag(self):
domain_def = policy.DomainDef('eukarya')
pines = policy.Condition(
'pine',
operator=policy_constants.CONDITION_OP_CONTAINS)
maples = policy.Condition(
'maple',
operator=policy_constants.CONDITION_OP_STARTS_WITH)
group_def = policy.GroupDef('eukarya', 'trees',
conditions=[pines, maples])
self.policy_api.create_with_parent(domain_def, group_def)
data = domain_def.get_obj_dict()
data['groups'] = [group_def.get_obj_dict()]
self.assert_json_call('PUT', self.client,
'infra/domains/eukarya',
data=data)
def test_delete(self):
group_def = policy.GroupDef(domain_id='eukarya', group_id='giraffe')
self.policy_api.delete(group_def)
self.assert_json_call('DELETE', self.client,
'infra/domains/eukarya/groups/giraffe')
class TestPolicyService(TestPolicyApi):
def test_create(self):
service_def = policy.ServiceDef('roomservice')
self.policy_api.create(service_def)
self.assert_json_call('PUT', self.client,
'infra/services/roomservice',
data=service_def.get_obj_dict())
def test_create_with_parent(self):
service_def = policy.ServiceDef('roomservice')
entry_def = policy.L4ServiceEntryDef('roomservice',
'http',
name='room http',
dest_ports=[80, 8080])
self.policy_api.create_with_parent(service_def, entry_def)
expected_entry = {'id': 'http',
'resource_type': 'L4PortSetServiceEntry',
'display_name': 'room http',
'description': None,
'l4_protocol': 'TCP',
'destination_ports': [80, 8080],
'_revision': 0}
expected_data = {'id': 'roomservice',
'display_name': None,
'description': None,
'service_entries': [expected_entry],
'_revision': 0}
self.assert_json_call('PUT', self.client,
'infra/services/roomservice',
data=expected_data)
class TestPolicyCommunicationProfile(TestPolicyApi):
def test_create(self):
profile_def = policy.CommunicationProfileDef('rental')
self.policy_api.create(profile_def)
self.assert_json_call('PUT', self.client,
'infra/communication-profiles/rental',
data=profile_def.get_obj_dict())
def test_create_with_parent(self):
profile_def = policy.CommunicationProfileDef('rental')
entry_def = policy.CommunicationProfileEntryDef(
'rental',
'room1',
description='includes roomservice',
services=["roomservice"])
self.policy_api.create_with_parent(profile_def, entry_def)
expected_entry = {'id': 'room1',
'display_name': None,
'description': 'includes roomservice',
'services': ["roomservice"],
'action': 'ALLOW',
'_revision': 0}
expected_data = {'id': 'rental',
'display_name': None,
'description': None,
'communication_profile_entries': [expected_entry],
'_revision': 0}
self.assert_json_call('PUT', self.client,
'infra/communication-profiles/rental',
data=expected_data)
class TestPolicyCommunicationMap(TestPolicyApi):
def setUp(self):
super(TestPolicyCommunicationMap, self).setUp()
self.entry1 = policy.CommunicationMapEntryDef(
'd1', 'cm1',
sequence_number=12,
source_groups=["group1",
"group2"],
dest_groups=["group1"],
profile_id="profile1")
self.entry2 = policy.CommunicationMapEntryDef(
'd1', 'cm2',
sequence_number=13,
source_groups=["group1",
"group2"],
dest_groups=["group3"],
profile_id="profile2")
self.expected_data1 = {'_revision': 0,
'id': 'cm1',
'display_name': None,
'description': None,
'sequence_number': 12,
'source_groups':
['/infra/domains/d1/groups/group1',
'/infra/domains/d1/groups/group2'],
'destination_groups':
['/infra/domains/d1/groups/group1'],
'communication_profile_path':
'/infra/communication-profiles/profile1'}
self.expected_data2 = {'_revision': 0,
'id': 'cm2',
'display_name': None,
'description': None,
'sequence_number': 13,
'source_groups':
['/infra/domains/d1/groups/group1',
'/infra/domains/d1/groups/group2'],
'destination_groups':
['/infra/domains/d1/groups/group3'],
'communication_profile_path':
'/infra/communication-profiles/profile2'}
def test_create_with_one_entry(self):
map_def = policy.CommunicationMapDef('d1')
self.policy_api.create_with_parent(map_def, self.entry1)
expected_data = map_def.get_obj_dict()
expected_data['communication_entries'] = [self.expected_data1]
self.assert_json_call('PUT', self.client,
'infra/domains/d1/communication-map',
data=expected_data)
def test_create_with_two_entries(self):
map_def = policy.CommunicationMapDef('d1')
self.policy_api.create_with_parent(map_def,
[self.entry1, self.entry2])
expected_data = map_def.get_obj_dict()
expected_data['communication_entries'] = [self.expected_data1,
self.expected_data2]
self.assert_json_call('PUT', self.client,
'infra/domains/d1/communication-map',
data=expected_data)
def test_update_entry(self):
self.policy_api.create(self.entry1)
self.assert_json_call('PUT', self.client,
'infra/domains/d1/communication-map/'
'communication-entries/cm1',
data=self.expected_data1)
def test_delete_entry(self):
self.policy_api.delete(self.entry2)
self.assert_json_call('DELETE', self.client,
'infra/domains/d1/communication-map/'
'communication-entries/cm2')
class TestPolicyEnforcementPoint(TestPolicyApi):
def test_create(self):
ep_def = policy.EnforcementPointDef('ep1', name='The Point',
ip_address='1.1.1.1',
username='admin',
password='a')
self.policy_api.create(ep_def)
ep_path = policy.EnforcementPointDef('ep1').get_resource_path()
self.assert_json_call('PUT', self.client,
ep_path,
data=ep_def.get_obj_dict())
class TestPolicyDeploymentMap(TestPolicyApi):
def test_create(self):
map_def = policy.DeploymentMapDef('dm1', domain_id='d1', ep_id='ep1')
self.policy_api.create(map_def)
ep_path = policy.EnforcementPointDef('ep1').get_resource_full_path()
expected_data = {'_revision': 0,
'id': 'dm1',
'display_name': None,
'description': None,
'domain_path': '/infra/domains/d1',
'enforcement_point_paths': [ep_path]}
self.assert_json_call('PUT', self.client,
'infra/domaindeploymentmap/dm1',
data=expected_data)

View File

@ -0,0 +1,969 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
import unittest
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
from vmware_nsxlib import v3
from vmware_nsxlib.v3 import policy_constants
from vmware_nsxlib.v3 import policy_defs
TEST_TENANT = 'test'
class NsxPolicyLibTestCase(unittest.TestCase):
def setUp(self, *args, **kwargs):
super(NsxPolicyLibTestCase, self).setUp()
nsxlib_config = nsxlib_testcase.get_default_nsxlib_config()
self.policy_lib = v3.NsxPolicyLib(nsxlib_config)
self.policy_api = self.policy_lib.policy_api
self.maxDiff = None
def _compare_def(self, expected_def, actual_def):
# verify the resource definition class
self.assertEqual(expected_def.__class__, actual_def.__class__)
# verify the resource definition tenant
self.assertEqual(expected_def.tenant, actual_def.tenant)
# verify the resource definition values
self.assertEqual(expected_def.get_obj_dict(),
actual_def.get_obj_dict())
def assert_called_with_def(self, mock_api, expected_def, call_num=0):
# verify the api was called
mock_api.assert_called()
actual_def = mock_api.call_args_list[call_num][0][0]
self._compare_def(expected_def, actual_def)
def assert_called_with_defs(self, mock_api, expected_defs, call_num=0):
# verify the api & first resource definition
self.assert_called_with_def(mock_api, expected_defs[0],
call_num=call_num)
# compare the 2nd resource definition class & values
actual_def = mock_api.call_args_list[call_num][0][1]
expected_def = expected_defs[1]
self._compare_def(expected_def, actual_def)
def assert_called_with_def_and_dict(self, mock_api,
expected_def, expected_dict,
call_num=0):
# verify the api & resource definition
self.assert_called_with_def(mock_api, expected_def,
call_num=call_num)
# compare the 2nd api parameter which is a dictionary
actual_dict = mock_api.call_args_list[call_num][0][0].body
self.assertEqual(expected_dict, actual_dict)
class TestPolicyDomain(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyDomain, self).setUp()
self.resourceApi = self.policy_lib.domain
def test_create_with_id(self):
name = 'd1'
description = 'desc'
id = '111'
with mock.patch.object(self.policy_api, "create") as api_call:
self.resourceApi.create(name,
domain_id=id,
description=description,
tenant=TEST_TENANT)
expected_def = policy_defs.DomainDef(domain_id=id,
name=name,
description=description,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_create_without_id(self):
name = 'd1'
description = 'desc'
with mock.patch.object(self.policy_api, "create") as api_call:
self.resourceApi.create(name, description=description,
tenant=TEST_TENANT)
expected_def = policy_defs.DomainDef(domain_id=mock.ANY,
name=name,
description=description,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_delete(self):
id = '111'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(id, tenant=TEST_TENANT)
expected_def = policy_defs.DomainDef(domain_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
id = '111'
with mock.patch.object(self.policy_api, "get") as api_call:
self.resourceApi.get(id, tenant=TEST_TENANT)
expected_def = policy_defs.DomainDef(domain_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get_by_name(self):
name = 'd1'
with mock.patch.object(
self.policy_api, "list",
return_value={'results': [{'display_name': name}]}) as api_call:
obj = self.resourceApi.get_by_name(name, tenant=TEST_TENANT)
self.assertIsNotNone(obj)
expected_def = policy_defs.DomainDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_list(self):
with mock.patch.object(self.policy_api, "list") as api_call:
self.resourceApi.list(tenant=TEST_TENANT)
expected_def = policy_defs.DomainDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_update(self):
id = '111'
name = 'new name'
description = 'new desc'
with mock.patch.object(self.policy_api, "get",
return_value={}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(id,
name=name,
description=description,
tenant=TEST_TENANT)
expected_def = policy_defs.DomainDef(domain_id=id,
tenant=TEST_TENANT)
expected_dict = {'display_name': name,
'description': description}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
class TestPolicyGroup(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyGroup, self).setUp()
self.resourceApi = self.policy_lib.group
def test_create_with_id(self):
domain_id = '111'
name = 'g1'
description = 'desc'
id = '222'
with mock.patch.object(self.policy_api, "create") as api_call:
self.resourceApi.create(name,
domain_id,
group_id=id,
description=description,
tenant=TEST_TENANT)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=id,
name=name,
description=description,
conditions=[],
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_create_without_id(self):
domain_id = '111'
name = 'g1'
description = 'desc'
with mock.patch.object(self.policy_api, "create") as api_call:
self.resourceApi.create(name, domain_id, description=description,
tenant=TEST_TENANT)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=mock.ANY,
name=name,
description=description,
conditions=[],
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_create_with_condition(self):
domain_id = '111'
name = 'g1'
description = 'desc'
cond_val = '123'
cond_op = policy_constants.CONDITION_OP_EQUALS
cond_member_type = policy_constants.CONDITION_MEMBER_NET
cond_key = policy_constants.CONDITION_KEY_TAG
with mock.patch.object(self.policy_api, "create") as api_call:
self.resourceApi.create(
name, domain_id, description=description,
cond_val=cond_val,
cond_op=cond_op,
cond_member_type=cond_member_type,
cond_key=cond_key,
tenant=TEST_TENANT)
exp_cond = policy_defs.Condition(value=cond_val,
key=cond_key,
operator=cond_op,
member_type=cond_member_type)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=mock.ANY,
name=name,
description=description,
conditions=[exp_cond],
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_delete(self):
domain_id = '111'
id = '222'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(domain_id, id, tenant=TEST_TENANT)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
domain_id = '111'
id = '222'
with mock.patch.object(self.policy_api, "get") as api_call:
self.resourceApi.get(domain_id, id, tenant=TEST_TENANT)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get_by_name(self):
domain_id = '111'
name = 'g1'
with mock.patch.object(
self.policy_api, "list",
return_value={'results': [{'display_name': name}]}) as api_call:
obj = self.resourceApi.get_by_name(domain_id, name,
tenant=TEST_TENANT)
self.assertIsNotNone(obj)
expected_def = policy_defs.GroupDef(domain_id, tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_list(self):
domain_id = '111'
with mock.patch.object(self.policy_api, "list") as api_call:
self.resourceApi.list(domain_id, tenant=TEST_TENANT)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_update(self):
domain_id = '111'
id = '222'
name = 'new name'
description = 'new desc'
with mock.patch.object(self.policy_api, "get",
return_value={}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(domain_id, id,
name=name,
description=description,
tenant=TEST_TENANT)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=id,
tenant=TEST_TENANT)
expected_dict = {'display_name': name,
'description': description}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
def test_update_condition(self):
domain_id = '111'
id = '222'
cond_val = '123'
with mock.patch.object(self.policy_api, "get",
return_value={}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update_condition(domain_id, id,
cond_val=cond_val,
tenant=TEST_TENANT)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=id,
tenant=TEST_TENANT)
exp_cond = {'resource_type': 'Condition',
'member_type': policy_constants.CONDITION_MEMBER_PORT,
'key': policy_constants.CONDITION_KEY_TAG,
'value': cond_val,
'operator': policy_constants.CONDITION_OP_EQUALS}
expected_dict = {'expression': [exp_cond]}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
def test_remove_condition(self):
domain_id = '111'
id = '222'
old_cond = {'resource_type': 'Condition',
'member_type': policy_constants.CONDITION_MEMBER_PORT,
'key': policy_constants.CONDITION_KEY_TAG,
'value': 'abc',
'operator': policy_constants.CONDITION_OP_EQUALS}
with mock.patch.object(self.policy_api, "get",
return_value={'expression': [old_cond]}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update_condition(domain_id, id,
cond_val=None,
tenant=TEST_TENANT)
expected_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=id,
tenant=TEST_TENANT)
expected_dict = {'expression': []}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
class TestPolicyService(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyService, self).setUp()
self.resourceApi = self.policy_lib.service
def test_create(self):
name = 's1'
description = 'desc'
protocol = policy_constants.TCP
dest_ports = [81, 82]
with mock.patch.object(self.policy_api,
"create_with_parent") as api_call:
self.resourceApi.create(name, description=description,
protocol=protocol, dest_ports=dest_ports,
tenant=TEST_TENANT)
exp_srv_def = policy_defs.ServiceDef(service_id=mock.ANY,
name=name,
description=description,
tenant=TEST_TENANT)
exp_entry_def = policy_defs.L4ServiceEntryDef(
service_id=mock.ANY,
name=name,
description=description,
protocol=protocol,
dest_ports=dest_ports,
tenant=TEST_TENANT)
self.assert_called_with_defs(
api_call, [exp_srv_def, exp_entry_def])
def test_delete(self):
id = '111'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(id, tenant=TEST_TENANT)
expected_def = policy_defs.ServiceDef(service_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
id = '111'
with mock.patch.object(self.policy_api, "get") as api_call:
self.resourceApi.get(id, tenant=TEST_TENANT)
expected_def = policy_defs.ServiceDef(service_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get_by_name(self):
name = 's1'
with mock.patch.object(
self.policy_api, "list",
return_value={'results': [{'display_name': name}]}) as api_call:
obj = self.resourceApi.get_by_name(name, tenant=TEST_TENANT)
self.assertIsNotNone(obj)
expected_def = policy_defs.ServiceDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_list(self):
with mock.patch.object(self.policy_api, "list") as api_call:
self.resourceApi.list(tenant=TEST_TENANT)
expected_def = policy_defs.ServiceDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_update(self):
id = '111'
name = 'new name'
description = 'new desc'
with mock.patch.object(self.policy_api, "get",
return_value={}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(id,
name=name,
description=description,
tenant=TEST_TENANT)
expected_def = policy_defs.ServiceDef(service_id=id,
tenant=TEST_TENANT)
expected_dict = {'display_name': name,
'description': description,
'service_entries': []}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
def test_update_entry(self):
id = '111'
protocol = 'udp'
dest_ports = [555]
service_entry_id = '222'
service_entry = {'id': service_entry_id}
with mock.patch.object(
self.policy_api, "get",
return_value={'service_entries': [service_entry]}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(id,
protocol=protocol,
dest_ports=dest_ports,
tenant=TEST_TENANT)
# get will be called for the entire service
expected_def = policy_defs.ServiceDef(service_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(get_call, expected_def)
# update will be called for the service entry only
expected_entry_def = policy_defs.L4ServiceEntryDef(
service_id=id,
service_entry_id=service_entry_id,
tenant=TEST_TENANT)
expected_entry_dict = {'id': service_entry_id,
'l4_protocol': protocol.upper(),
'destination_ports': dest_ports}
self.assert_called_with_def_and_dict(
update_call, expected_entry_def, expected_entry_dict)
def test_update_all(self):
id = '111'
name = 'new name'
description = 'new desc'
protocol = 'udp'
dest_ports = [555]
service_entry_id = '222'
service_entry = {'id': service_entry_id}
with mock.patch.object(
self.policy_api, "get",
return_value={'service_entries': [service_entry]}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call,\
mock.patch.object(self.policy_api, "list",
return_value={'results': []}):
self.resourceApi.update(id,
name=name,
description=description,
protocol=protocol,
dest_ports=dest_ports,
tenant=TEST_TENANT)
# get will be called for the entire service
expected_def = policy_defs.ServiceDef(service_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(get_call, expected_def)
# update will be called for the service and entry (2 calls)
expected_dict = {'display_name': name,
'description': description,
'service_entries': [service_entry]}
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
expected_entry_def = policy_defs.L4ServiceEntryDef(
service_id=id,
service_entry_id=service_entry_id,
tenant=TEST_TENANT)
expected_entry_dict = {'id': service_entry_id,
'display_name': name,
'description': description,
'l4_protocol': protocol.upper(),
'destination_ports': dest_ports}
self.assert_called_with_def_and_dict(
update_call, expected_entry_def, expected_entry_dict,
call_num=1)
class TestPolicyCommunicationProfile(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyCommunicationProfile, self).setUp()
self.resourceApi = self.policy_lib.comm_profile
def test_create(self):
name = 'c1'
description = 'desc'
service_id = '333'
action = 'DENY'
with mock.patch.object(self.policy_api,
"create_with_parent") as api_call:
self.resourceApi.create(name, description=description,
services=[service_id], action=action,
tenant=TEST_TENANT)
exp_srv_def = policy_defs.CommunicationProfileDef(
profile_id=mock.ANY,
name=name,
description=description,
tenant=TEST_TENANT)
exp_entry_def = policy_defs.CommunicationProfileEntryDef(
profile_id=mock.ANY,
name=name,
description=description,
services=[service_id],
action=action,
tenant=TEST_TENANT)
self.assert_called_with_defs(
api_call, [exp_srv_def, exp_entry_def])
def test_delete(self):
id = '111'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(id, tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationProfileDef(
profile_id=id, tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
id = '111'
with mock.patch.object(self.policy_api, "get") as api_call:
self.resourceApi.get(id, tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationProfileDef(
profile_id=id, tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get_by_name(self):
name = 'c1'
with mock.patch.object(
self.policy_api, "list",
return_value={'results': [{'display_name': name}]}) as api_call:
obj = self.resourceApi.get_by_name(name, tenant=TEST_TENANT)
self.assertIsNotNone(obj)
expected_def = policy_defs.CommunicationProfileDef(
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_list(self):
with mock.patch.object(self.policy_api, "list") as api_call:
self.resourceApi.list(tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationProfileDef(
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_update(self):
id = '111'
name = 'new name'
description = 'new desc'
with mock.patch.object(self.policy_api, "get",
return_value={}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(id,
name=name,
description=description,
tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationProfileDef(
profile_id=id, tenant=TEST_TENANT)
expected_dict = {'display_name': name,
'description': description,
'communication_profile_entries': []}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
def test_update_entry(self):
id = '111'
service_id = '333'
action = 'deny'
entry_id = '222'
profile_entry = {'id': entry_id}
entries_dict = {'communication_profile_entries': [profile_entry]}
with mock.patch.object(
self.policy_api, "get", return_value=entries_dict) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(id,
services=[service_id],
action=action,
tenant=TEST_TENANT)
# get will be called for the entire service
expected_def = policy_defs.CommunicationProfileDef(
profile_id=id, tenant=TEST_TENANT)
self.assert_called_with_def(get_call, expected_def)
# update will be called for the service entry only
expected_entry_def = policy_defs.CommunicationProfileEntryDef(
profile_id=id,
profile_entry_id=entry_id,
tenant=TEST_TENANT)
expected_entry_dict = {'id': entry_id,
'action': action.upper(),
'services': [service_id]}
self.assert_called_with_def_and_dict(
update_call, expected_entry_def, expected_entry_dict)
def test_update_all(self):
id = '111'
name = 'new name'
description = 'new desc'
service_id = '333'
action = 'deny'
entry_id = '222'
profile_entry = {'id': entry_id}
entries_dict = {'communication_profile_entries': [profile_entry]}
with mock.patch.object(
self.policy_api, "get", return_value=entries_dict) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(id,
name=name,
description=description,
services=[service_id],
action=action,
tenant=TEST_TENANT)
# get will be called for the entire service
expected_def = policy_defs.CommunicationProfileDef(
profile_id=id, tenant=TEST_TENANT)
self.assert_called_with_def(get_call, expected_def)
# update will be called for the service and entry (2 calls)
expected_dict = {'display_name': name,
'description': description,
'communication_profile_entries': [profile_entry]}
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
expected_entry_def = policy_defs.CommunicationProfileEntryDef(
profile_id=id,
profile_entry_id=entry_id,
tenant=TEST_TENANT)
expected_entry_dict = {'id': entry_id,
'display_name': name,
'description': description,
'action': action.upper(),
'services': [service_id]}
self.assert_called_with_def_and_dict(
update_call, expected_entry_def, expected_entry_dict,
call_num=1)
class TestPolicyCommunicationMap(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyCommunicationMap, self).setUp()
self.resourceApi = self.policy_lib.comm_map
def test_create(self):
domain_id = '111'
name = 'cm1'
description = 'desc'
source_group = 'g1'
dest_group = 'g2'
seq_num = 7
profile_id = 'c1'
list_return_value = {'results': [{'sequence_number': 1}]}
with mock.patch.object(self.policy_api, "create") as api_call,\
mock.patch.object(self.policy_api, "list",
return_value=list_return_value):
self.resourceApi.create(name, domain_id, description=description,
sequence_number=seq_num,
profile_id=profile_id,
source_groups=[source_group],
dest_groups=[dest_group],
tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=mock.ANY,
name=name,
description=description,
sequence_number=seq_num,
profile_id=profile_id,
source_groups=[source_group],
dest_groups=[dest_group],
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_create_without_seqnum(self):
domain_id = '111'
name = 'cm1'
description = 'desc'
source_group = 'g1'
dest_group = 'g2'
profile_id = 'c1'
with mock.patch.object(self.policy_api,
"create_with_parent") as api_call, \
mock.patch.object(self.resourceApi, "list", return_value=[]):
self.resourceApi.create(name, domain_id, description=description,
profile_id=profile_id,
source_groups=[source_group],
dest_groups=[dest_group],
tenant=TEST_TENANT)
expected_map_def = policy_defs.CommunicationMapDef(
domain_id=domain_id,
tenant=TEST_TENANT)
expected_entry_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=mock.ANY,
name=name,
description=description,
sequence_number=1,
profile_id=profile_id,
source_groups=[source_group],
dest_groups=[dest_group],
tenant=TEST_TENANT)
self.assert_called_with_defs(
api_call,
[expected_map_def, expected_entry_def])
def test_delete(self):
domain_id = '111'
id = '222'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(domain_id, id, tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
domain_id = '111'
id = '222'
with mock.patch.object(self.policy_api, "get") as api_call:
self.resourceApi.get(domain_id, id, tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get_by_name(self):
domain_id = '111'
name = 'cm1'
with mock.patch.object(
self.policy_api, "list",
return_value={'results': [{'display_name': name}]}) as api_call:
obj = self.resourceApi.get_by_name(domain_id, name,
tenant=TEST_TENANT)
self.assertIsNotNone(obj)
expected_def = policy_defs.CommunicationMapDef(domain_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_list(self):
domain_id = '111'
with mock.patch.object(self.policy_api, "list") as api_call:
self.resourceApi.list(domain_id, tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationMapDef(domain_id=domain_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_update(self):
domain_id = '111'
id = '222'
name = 'new name'
description = 'new desc'
source_group = 'ng1'
dest_group = 'ng2'
profile_id = 'nc1'
with mock.patch.object(self.policy_api, "get",
return_value={}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(domain_id, id,
name=name,
description=description,
profile_id=profile_id,
source_groups=[source_group],
dest_groups=[dest_group],
tenant=TEST_TENANT)
expected_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=id,
tenant=TEST_TENANT)
sgroup_path = "/%s/domains/%s/groups/%s" % (
TEST_TENANT, domain_id, source_group)
dgroup_path = "/%s/domains/%s/groups/%s" % (
TEST_TENANT, domain_id, dest_group)
profile_path = "/%s/communication-profiles/%s" % (
TEST_TENANT, profile_id)
expected_dict = {'display_name': name,
'description': description,
'communication_profile_path': profile_path,
'source_groups': [sgroup_path],
'destination_groups': [dgroup_path]}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
class TestPolicyEnforcementPoint(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyEnforcementPoint, self).setUp()
self.resourceApi = self.policy_lib.enforcement_point
def test_create(self):
name = 'ep'
description = 'desc'
ip_address = '1.1.1.1'
username = 'admin'
password = 'zzz'
with mock.patch.object(self.policy_api, "create") as api_call:
self.resourceApi.create(name, description=description,
ip_address=ip_address,
username=username,
password=password,
tenant=TEST_TENANT)
expected_def = policy_defs.EnforcementPointDef(
ep_id=mock.ANY,
name=name,
description=description,
ip_address=ip_address,
username=username,
password=password,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_delete(self):
id = '111'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(id, tenant=TEST_TENANT)
expected_def = policy_defs.EnforcementPointDef(ep_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
id = '111'
with mock.patch.object(self.policy_api, "get") as api_call:
self.resourceApi.get(id, tenant=TEST_TENANT)
expected_def = policy_defs.EnforcementPointDef(ep_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get_by_name(self):
name = 'ep1'
with mock.patch.object(
self.policy_api, "list",
return_value={'results': [{'display_name': name}]}) as api_call:
obj = self.resourceApi.get_by_name(name, tenant=TEST_TENANT)
self.assertIsNotNone(obj)
expected_def = policy_defs.EnforcementPointDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_list(self):
with mock.patch.object(self.policy_api, "list") as api_call:
self.resourceApi.list(tenant=TEST_TENANT)
expected_def = policy_defs.EnforcementPointDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_update(self):
id = '111'
name = 'new name'
username = 'admin'
password = 'zzz'
with mock.patch.object(self.policy_api, "get",
return_value={}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(id,
name=name,
username=username,
password=password,
tenant=TEST_TENANT)
expected_def = policy_defs.EnforcementPointDef(ep_id=id,
tenant=TEST_TENANT)
expected_dict = {'display_name': name,
'username': username,
'password': password}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)
class TestPolicyDeploymentMap(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyDeploymentMap, self).setUp()
self.resourceApi = self.policy_lib.deployment_map
def test_create(self):
name = 'map1'
description = 'desc'
domain_id = 'domain1'
ep_id = 'ep1'
with mock.patch.object(self.policy_api, "create") as api_call:
self.resourceApi.create(name, description=description,
ep_id=ep_id,
domain_id=domain_id,
tenant=TEST_TENANT)
expected_def = policy_defs.DeploymentMapDef(
map_id=mock.ANY,
name=name,
description=description,
ep_id=ep_id,
domain_id=domain_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_delete(self):
id = '111'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(id, tenant=TEST_TENANT)
expected_def = policy_defs.DeploymentMapDef(map_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
id = '111'
with mock.patch.object(self.policy_api, "get") as api_call:
self.resourceApi.get(id, tenant=TEST_TENANT)
expected_def = policy_defs.DeploymentMapDef(map_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get_by_name(self):
name = 'ep1'
with mock.patch.object(
self.policy_api, "list",
return_value={'results': [{'display_name': name}]}) as api_call:
obj = self.resourceApi.get_by_name(name, tenant=TEST_TENANT)
self.assertIsNotNone(obj)
expected_def = policy_defs.DeploymentMapDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_list(self):
with mock.patch.object(self.policy_api, "list") as api_call:
self.resourceApi.list(tenant=TEST_TENANT)
expected_def = policy_defs.DeploymentMapDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_update(self):
id = '111'
name = 'new name'
domain_id = 'domain2'
ep_id = 'ep2'
with mock.patch.object(self.policy_api, "get",
return_value={}) as get_call,\
mock.patch.object(self.policy_api, "update") as update_call:
self.resourceApi.update(id,
name=name,
ep_id=ep_id,
domain_id=domain_id,
tenant=TEST_TENANT)
expected_def = policy_defs.DeploymentMapDef(map_id=id,
tenant=TEST_TENANT)
domain_path = "/%s/domains/%s" % (TEST_TENANT, domain_id)
ep_path = ("/%s/deploymentzones/default-deployment-zone/"
"enforcementpoints/%s" % (TEST_TENANT, ep_id))
expected_dict = {'display_name': name,
'enforcement_point_paths': [ep_path],
'domain_path': domain_path}
self.assert_called_with_def(get_call, expected_def)
self.assert_called_with_def_and_dict(
update_call, expected_def, expected_dict)

View File

@ -24,6 +24,8 @@ from vmware_nsxlib.v3 import cluster
from vmware_nsxlib.v3 import core_resources
from vmware_nsxlib.v3 import exceptions
from vmware_nsxlib.v3 import native_dhcp
from vmware_nsxlib.v3 import policy_defs
from vmware_nsxlib.v3 import policy_resources
from vmware_nsxlib.v3 import security
from vmware_nsxlib.v3 import utils
@ -164,8 +166,19 @@ class NsxLib(NsxLibBase):
class NsxPolicyLib(NsxLibBase):
def init_api(self):
pass
self.policy_api = policy_defs.NsxPolicyApi(self.client)
self.domain = policy_resources.NsxPolicyDomainApi(self.policy_api)
self.group = policy_resources.NsxPolicyGroupApi(self.policy_api)
self.service = policy_resources.NsxPolicyL4ServiceApi(self.policy_api)
self.comm_profile = policy_resources.NsxPolicyCommunicationProfileApi(
self.policy_api)
self.comm_map = policy_resources.NsxPolicyCommunicationMapApi(
self.policy_api)
self.enforcement_point = policy_resources.NsxPolicyEnforcementPointApi(
self.policy_api)
self.deployment_map = policy_resources.NsxPolicyDeploymentMapApi(
self.policy_api)
@property
def keepalive_section(self):
return 'tenants'
return 'infra'

View File

@ -50,7 +50,7 @@ class RESTClient(object):
_VERB_RESP_CODES = {
'get': [requests.codes.ok],
'post': [requests.codes.created, requests.codes.ok],
'put': [requests.codes.ok],
'put': [requests.codes.created, requests.codes.ok],
'delete': [requests.codes.ok]
}

View File

@ -153,7 +153,8 @@ class NSXRequestsHTTPProvider(AbstractHTTPProvider):
client = nsx_client.NSX3Client(conn, url_prefix=endpoint.provider.url)
keepalive_section = cluster_api.nsxlib_config.keepalive_section
result = client.get(keepalive_section, silent=True)
if not result or result['result_count'] <= 0:
# If keeplive section returns a list, it is assumed to be non-empty
if not result or result.get('result_count', 1) <= 0:
msg = _("No %(section)s found "
"for '%(url)s'") % {'section': keepalive_section,
'url': endpoint.provider.url}

View File

@ -0,0 +1,34 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
TCP = 'TCP'
UDP = 'UDP'
POLICY_INFRA_TENANT = 'infra'
ACTION_ALLOW = 'ALLOW'
ACTION_DENY = 'DENY'
ANY_GROUP = "ANY"
CONDITION_KEY_TAG = 'Tag'
CONDITION_MEMBER_VM = 'VirtualMachine'
CONDITION_MEMBER_PORT = 'LogicalPort'
CONDITION_MEMBER_NET = 'LogicalSwitch'
CONDITION_OP_EQUALS = 'EQUALS'
CONDITION_OP_CONTAINS = 'CONTAINS'
CONDITION_OP_STARTS_WITH = 'STARTSWITH'
DEFAULT_THUMBPRINT = 'abc'

View File

@ -0,0 +1,555 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import abc
import six
from vmware_nsxlib.v3 import policy_constants
TENANTS_PATH_PATTERN = "%s/"
DOMAINS_PATH_PATTERN = TENANTS_PATH_PATTERN + "domains/"
COMM_PROF_PATH_PATTERN = TENANTS_PATH_PATTERN + "communication-profiles/"
SERVICES_PATH_PATTERN = TENANTS_PATH_PATTERN + "services/"
@six.add_metaclass(abc.ABCMeta)
class ResourceDef(object):
def __init__(self):
self.tenant = None
self.id = None
self.name = None
self.description = None
self.parent_ids = None
self.body = {}
def get_obj_dict(self):
body = {'_revision': 0,
'display_name': self.name,
'description': self.description}
if self.id:
body['id'] = self.id
return body
@abc.abstractproperty
def path_pattern(self):
pass
def get_section_path(self):
return self.path_pattern % self.parent_ids
def get_resource_path(self):
if self.id:
return self.get_section_path() + self.id
return self.get_section_path()
def get_resource_full_path(self):
return '/' + self.get_resource_path()
@property
def get_last_section_dict_key(self):
last_section = self.path_pattern.split("/")[-2]
return last_section.replace('-', '_')
@staticmethod
def sub_entries_path():
pass
def update_attributes_in_body(self, body, **kwargs):
self.body = body
for key, value in six.iteritems(kwargs):
if value is not None:
if key == 'name':
self.body['display_name'] = value
else:
self.body[key] = value
entries_path = self.sub_entries_path()
# make sure service entries are there
if entries_path and entries_path not in self.body:
self.body[entries_path] = []
@classmethod
def get_single_entry(cls, obj_body):
"""Return the single sub-entry from the object body.
If there are no entries, or more than 1 - return None.
"""
entries_path = cls.sub_entries_path()
if not entries_path:
# This sub class doesn't support this
return None
if (entries_path not in obj_body or
len(obj_body[entries_path]) != 1):
return None
return obj_body[entries_path][0]
class DomainDef(ResourceDef):
def __init__(self,
domain_id=None,
name=None,
description=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(DomainDef, self).__init__()
self.tenant = tenant
self.id = domain_id
self.name = name
self.description = description
self.parent_ids = (tenant)
@property
def path_pattern(self):
return DOMAINS_PATH_PATTERN
class Condition(object):
def __init__(self, value, key=policy_constants.CONDITION_KEY_TAG,
member_type=policy_constants.CONDITION_MEMBER_PORT,
operator=policy_constants.CONDITION_OP_EQUALS):
self.value = value
self.key = key
self.member_type = member_type
self.operator = operator
def get_obj_dict(self):
return {'resource_type': 'Condition',
'member_type': self.member_type,
'key': self.key,
'value': self.value,
'operator': self.operator}
class GroupDef(ResourceDef):
def __init__(self,
domain_id=None,
group_id=None,
name=None,
description=None,
conditions=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(GroupDef, self).__init__()
self.tenant = tenant
self.id = group_id
self.name = name
self.description = description
self.parent_ids = (tenant, domain_id)
if conditions and isinstance(conditions, Condition):
self.conditions = [conditions]
else:
self.conditions = conditions
@property
def path_pattern(self):
return DOMAINS_PATH_PATTERN + "%s/groups/"
def get_obj_dict(self):
body = super(GroupDef, self).get_obj_dict()
if self.conditions:
body['expression'] = [condition.get_obj_dict()
for condition in self.conditions]
return body
def update_attributes_in_body(self, body, **kwargs):
# Fix params that need special conversions
if kwargs.get('conditions') is not None:
body['expression'] = [cond.get_obj_dict()
for cond in kwargs['conditions']]
del kwargs['conditions']
super(GroupDef, self).update_attributes_in_body(body, **kwargs)
class ServiceDef(ResourceDef):
def __init__(self,
service_id=None,
name=None,
description=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(ServiceDef, self).__init__()
self.tenant = tenant
self.id = service_id
self.name = name
self.description = description
self.parent_ids = (tenant)
self.service_entries = []
@property
def path_pattern(self):
return SERVICES_PATH_PATTERN
def get_obj_dict(self):
body = super(ServiceDef, self).get_obj_dict()
body['service_entries'] = [entry.get_obj_dict()
for entry in self.service_entries]
return body
@staticmethod
def sub_entries_path():
return L4ServiceEntryDef().get_last_section_dict_key
class L4ServiceEntryDef(ResourceDef):
def __init__(self,
service_id=None,
service_entry_id=None,
name=None,
description=None,
protocol=policy_constants.TCP,
dest_ports=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(L4ServiceEntryDef, self).__init__()
self.tenant = tenant
self.id = service_entry_id
self.name = name
self.description = description
self.protocol = protocol.upper()
self.dest_ports = dest_ports
self.parent_ids = (tenant, service_id)
@property
def path_pattern(self):
return SERVICES_PATH_PATTERN + "%s/service-entries/"
def get_obj_dict(self):
body = super(L4ServiceEntryDef, self).get_obj_dict()
body['resource_type'] = 'L4PortSetServiceEntry'
body['l4_protocol'] = self.protocol
body['destination_ports'] = self.dest_ports
return body
def update_attributes_in_body(self, body, **kwargs):
# Fix params that need special conversions
if kwargs.get('protocol') is not None:
body['l4_protocol'] = kwargs['protocol'].upper()
del kwargs['protocol']
if kwargs.get('dest_ports') is not None:
body['destination_ports'] = kwargs['dest_ports']
del kwargs['dest_ports']
super(L4ServiceEntryDef, self).update_attributes_in_body(
body, **kwargs)
class CommunicationProfileDef(ResourceDef):
def __init__(self,
profile_id=None,
name=None,
description=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(CommunicationProfileDef, self).__init__()
self.tenant = tenant
self.id = profile_id
self.name = name
self.description = description
self.parent_ids = (tenant)
@property
def path_pattern(self):
return COMM_PROF_PATH_PATTERN
def get_obj_dict(self):
body = super(CommunicationProfileDef, self).get_obj_dict()
body['communication_profile_entries'] = []
return body
@staticmethod
def sub_entries_path():
entryDef = CommunicationProfileEntryDef()
return entryDef.get_last_section_dict_key
def update_attributes_in_body(self, body, **kwargs):
super(CommunicationProfileDef, self).update_attributes_in_body(
body, **kwargs)
# make sure entries are there
entries_path = self.sub_entries_path()
if entries_path not in self.body:
self.body[entries_path] = []
class CommunicationProfileEntryDef(ResourceDef):
def __init__(self,
profile_id=None,
profile_entry_id=None,
name=None,
description=None,
services=None,
action=policy_constants.ACTION_ALLOW,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(CommunicationProfileEntryDef, self).__init__()
self.tenant = tenant
self.id = profile_entry_id
self.name = name
self.description = description
self.services = services
self.action = action.upper()
self.parent_ids = (tenant, profile_id)
@property
def path_pattern(self):
return COMM_PROF_PATH_PATTERN + "%s/communication-profile-entries/"
def get_obj_dict(self):
body = super(CommunicationProfileEntryDef, self).get_obj_dict()
body['services'] = self.services
body['action'] = self.action
return body
def update_attributes_in_body(self, body, **kwargs):
if kwargs.get('action') is not None:
body['action'] = kwargs['action'].upper()
del kwargs['action']
super(CommunicationProfileEntryDef, self).update_attributes_in_body(
body, **kwargs)
class CommunicationMapDef(ResourceDef):
def __init__(self,
domain_id=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(CommunicationMapDef, self).__init__()
self.tenant = tenant
self.parent_ids = (tenant, domain_id)
@property
def path_pattern(self):
return (DOMAINS_PATH_PATTERN + "%s/communication-map/")
class CommunicationMapEntryDef(ResourceDef):
def __init__(self,
domain_id=None,
map_id=None,
sequence_number=None,
source_groups=None,
dest_groups=None,
profile_id=None,
name=None,
description=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(CommunicationMapEntryDef, self).__init__()
self.tenant = tenant
self.domain_id = domain_id
self.id = map_id
self.name = name
self.description = description
self.sequence_number = sequence_number
self.source_groups = self.get_groups_path(domain_id, source_groups)
self.dest_groups = self.get_groups_path(domain_id, dest_groups)
self.profile_path = self.get_profile_path(
profile_id) if profile_id else None
self.parent_ids = (tenant, domain_id)
# convert groups and communication profile to full path
def get_groups_path(self, domain_id, group_ids):
if not group_ids:
return [policy_constants.ANY_GROUP]
return [GroupDef(domain_id,
group_id,
tenant=self.tenant).get_resource_full_path()
for group_id in group_ids]
def get_profile_path(self, profile_id):
return CommunicationProfileDef(
profile_id,
tenant=self.tenant).get_resource_full_path()
@property
def path_pattern(self):
return (DOMAINS_PATH_PATTERN +
"%s/communication-map/communication-entries/")
def get_obj_dict(self):
body = super(CommunicationMapEntryDef, self).get_obj_dict()
body['source_groups'] = self.source_groups
body['destination_groups'] = self.dest_groups
body['sequence_number'] = self.sequence_number
body['communication_profile_path'] = self.profile_path
return body
def update_attributes_in_body(self, body, **kwargs):
# Fix params that need special conversions
if kwargs.get('profile_id') is not None:
profile_path = self.get_profile_path(kwargs['profile_id'])
body['communication_profile_path'] = profile_path
del kwargs['profile_id']
if kwargs.get('dest_groups') is not None:
groups = self.get_groups_path(
self.domain_id, kwargs['dest_groups'])
body['destination_groups'] = groups
del kwargs['dest_groups']
if kwargs.get('source_groups') is not None:
groups = self.get_groups_path(
self.domain_id, kwargs['source_groups'])
body['source_groups'] = groups
del kwargs['source_groups']
super(CommunicationMapEntryDef, self).update_attributes_in_body(
body, **kwargs)
class EnforcementPointDef(ResourceDef):
def __init__(self, ep_id=None,
name=None,
description=None,
ip_address=None,
username=None,
password=None,
ep_type='NSXT',
tenant=policy_constants.POLICY_INFRA_TENANT):
super(EnforcementPointDef, self).__init__()
self.id = ep_id
self.name = name
self.description = description
self.tenant = tenant
self.type = ep_type
self.username = username
self.password = password
self.ip_address = ip_address
self.parent_ids = (tenant)
@property
def path_pattern(self):
return (TENANTS_PATH_PATTERN +
'deploymentzones/default-deployment-zone/enforcementpoints/')
def get_obj_dict(self):
body = super(EnforcementPointDef, self).get_obj_dict()
body['id'] = self.id
body['connection_info'] = [{'fqdn': 'abc',
'thumbprint':
policy_constants.DEFAULT_THUMBPRINT,
'username': self.username,
'password': self.password,
'ip_address': self.ip_address,
'resource_type': 'NSXTConnectionInfo'}]
body['enforcement_type'] = self.type
body['resource_type'] = 'EnforcementPoint'
return body
def update_attributes_in_body(self, body, **kwargs):
# Fix params that need special conversions
if body.get('connection_info'):
body['connection_info'][0]['resource_type'] = 'NSXTConnectionInfo'
if kwargs.get('username') is not None:
body['connection_info'][0]['username'] = kwargs['username']
del kwargs['username']
if kwargs.get('password') is not None:
body['connection_info'][0]['password'] = kwargs['password']
del kwargs['password']
if kwargs.get('ip_address') is not None:
body['connection_info'][0]['ip_address'] = kwargs['ip_address']
del kwargs['ip_address']
super(EnforcementPointDef, self).update_attributes_in_body(
body, **kwargs)
# Currently assumes one deployment point per id
class DeploymentMapDef(ResourceDef):
def __init__(self, map_id=None,
name=None,
description=None,
domain_id=None,
ep_id=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
super(DeploymentMapDef, self).__init__()
self.id = map_id
self.name = name
self.description = description
# convert enforcement point id to path
self.ep_path = EnforcementPointDef(
ep_id,
tenant=tenant).get_resource_full_path() if ep_id else None
self.domain_path = DomainDef(
domain_id,
tenant=tenant).get_resource_full_path() if domain_id else None
self.tenant = tenant
self.parent_ids = (tenant)
@property
def path_pattern(self):
return (TENANTS_PATH_PATTERN + 'domaindeploymentmap/')
def get_obj_dict(self):
body = super(DeploymentMapDef, self).get_obj_dict()
body['id'] = self.id
body['domain_path'] = self.domain_path
body['enforcement_point_paths'] = [self.ep_path]
return body
def update_attributes_in_body(self, body, **kwargs):
# Fix params that need special conversions
if kwargs.get('domain_id') is not None:
domain_id = kwargs.get('domain_id')
domain_path = DomainDef(
domain_id, tenant=self.tenant).get_resource_full_path()
body['domain_path'] = domain_path
del kwargs['domain_id']
if kwargs.get('ep_id') is not None:
ep_id = kwargs.get('ep_id')
ep_path = EnforcementPointDef(
ep_id, tenant=self.tenant).get_resource_full_path()
body['enforcement_point_paths'] = [ep_path]
del kwargs['ep_id']
super(DeploymentMapDef, self).update_attributes_in_body(
body, **kwargs)
class NsxPolicyApi(object):
def __init__(self, client):
self.client = client
def create(self, resource_def):
path = resource_def.get_resource_path()
return self.client.update(path, resource_def.get_obj_dict())
def create_with_parent(self, parent_def, resource_def):
path = parent_def.get_resource_path()
body = parent_def.get_obj_dict()
if isinstance(resource_def, list):
child_dict_key = resource_def[0].get_last_section_dict_key
body[child_dict_key] = [r.get_obj_dict() for r in resource_def]
else:
child_dict_key = resource_def.get_last_section_dict_key
body[child_dict_key] = [resource_def.get_obj_dict()]
return self.client.update(path, body)
def delete(self, resource_def):
path = resource_def.get_resource_path()
self.client.delete(path)
def get(self, resource_def):
path = resource_def.get_resource_path()
return self.client.get(path)
def list(self, resource_def):
path = resource_def.get_section_path()
return self.client.list(path)
def update(self, resource_def):
path = resource_def.get_resource_path()
body = resource_def.body
return self.client.update(path, body)

View File

@ -0,0 +1,632 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import abc
import six
import uuid
from oslo_log import log as logging
from vmware_nsxlib._i18n import _
from vmware_nsxlib.v3 import exceptions
from vmware_nsxlib.v3 import policy_constants
from vmware_nsxlib.v3 import policy_defs
LOG = logging.getLogger(__name__)
# TODO(asarfaty): support retries?
# TODO(asarfaty): In future versions PATCH may be supported for update
@six.add_metaclass(abc.ABCMeta)
class NsxPolicyResourceBase(object):
"""Abstract class for NSX policy resources
declaring the basic apis each policy resource should support,
and implement some common apis and utilities
"""
def __init__(self, policy_api):
self.policy_api = policy_api
@abc.abstractmethod
def list(self, *args, **kwargs):
pass
@abc.abstractmethod
def get(self, uuid, *args, **kwargs):
pass
@abc.abstractmethod
def delete(self, uuid, *args, **kwargs):
pass
@abc.abstractmethod
def create(self, *args, **kwargs):
pass
@abc.abstractmethod
def update(self, uuid, *args, **kwargs):
pass
@staticmethod
def _init_obj_uuid(obj_uuid):
if not obj_uuid:
# generate a random id
obj_uuid = str(uuid.uuid4())
return obj_uuid
def get_by_name(self, name, *args, **kwargs):
# Return first match by name
resources_list = self.list(*args, **kwargs)
for obj in resources_list:
if obj.get('display_name') == name:
return obj
class NsxPolicyDomainApi(NsxPolicyResourceBase):
"""NSX Policy Domain."""
def create(self, name, domain_id=None, description=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
domain_id = self._init_obj_uuid(domain_id)
domain_def = policy_defs.DomainDef(domain_id=domain_id,
name=name,
description=description,
tenant=tenant)
return self.policy_api.create(domain_def)
def delete(self, domain_id, tenant=policy_constants.POLICY_INFRA_TENANT):
domain_def = policy_defs.DomainDef(domain_id, tenant=tenant)
self.policy_api.delete(domain_def)
def get(self, domain_id, tenant=policy_constants.POLICY_INFRA_TENANT):
domain_def = policy_defs.DomainDef(domain_id, tenant=tenant)
return self.policy_api.get(domain_def)
def list(self, tenant=policy_constants.POLICY_INFRA_TENANT):
domain_def = policy_defs.DomainDef(tenant=tenant)
return self.policy_api.list(domain_def)['results']
def update(self, domain_id, name=None, description=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
domain_def = policy_defs.DomainDef(domain_id=domain_id,
tenant=tenant)
# Get the current data, and update it with the new values
domain = self.get(domain_id, tenant=tenant)
domain_def.update_attributes_in_body(domain,
name=name,
description=description)
# update the backend
return self.policy_api.update(domain_def)
class NsxPolicyGroupApi(NsxPolicyResourceBase):
"""NSX Policy Group (under a Domain) with a single condition."""
def create(self, name, domain_id, group_id=None,
description=None,
cond_val=None,
cond_key=policy_constants.CONDITION_KEY_TAG,
cond_op=policy_constants.CONDITION_OP_EQUALS,
cond_member_type=policy_constants.CONDITION_MEMBER_PORT,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""Create a group with/without a condition.
Empty condition value will result a group with no condition.
"""
group_id = self._init_obj_uuid(group_id)
# Prepare the condition
if cond_val is not None:
condition = policy_defs.Condition(value=cond_val,
key=cond_key,
operator=cond_op,
member_type=cond_member_type)
conditions = [condition]
else:
conditions = []
group_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=group_id,
name=name,
description=description,
conditions=conditions,
tenant=tenant)
return self.policy_api.create(group_def)
def delete(self, domain_id, group_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
group_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=group_id,
tenant=tenant)
self.policy_api.delete(group_def)
def get(self, domain_id, group_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
group_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=group_id,
tenant=tenant)
return self.policy_api.get(group_def)
def list(self, domain_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""List all the groups of a specific domain."""
group_def = policy_defs.GroupDef(domain_id=domain_id,
tenant=tenant)
return self.policy_api.list(group_def)['results']
def get_by_name(self, domain_id, name,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""Return first group matched by name of this domain"""
return super(NsxPolicyGroupApi, self).get_by_name(name, domain_id,
tenant=tenant)
def update(self, domain_id, group_id, name=None, description=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""Update the general data of the group.
Without changing the conditions
"""
group_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=group_id,
tenant=tenant)
# Get the current data, and update it with the new values
group = self.get(domain_id, group_id, tenant=tenant)
group_def.update_attributes_in_body(group, name=name,
description=description)
# update the backend
return self.policy_api.update(group_def)
def update_condition(
self, domain_id, group_id,
cond_val=None,
cond_key=policy_constants.CONDITION_KEY_TAG,
cond_op=policy_constants.CONDITION_OP_EQUALS,
cond_member_type=policy_constants.CONDITION_MEMBER_PORT,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""Update/Remove the condition of a group.
Empty condition value will result a group with no condition.
"""
group_def = policy_defs.GroupDef(domain_id=domain_id,
group_id=group_id,
tenant=tenant)
# Prepare the condition
if cond_val is not None:
condition = policy_defs.Condition(value=cond_val,
key=cond_key,
operator=cond_op,
member_type=cond_member_type)
conditions = [condition]
else:
conditions = []
# Get the current data, and update it with the new values
group = self.get(domain_id, group_id, tenant=tenant)
group_def.update_attributes_in_body(group, conditions=conditions)
# update the backend
return self.policy_api.update(group_def)
class NsxPolicyL4ServiceApi(NsxPolicyResourceBase):
"""NSX Policy Service (with a single L4 service entry).
Note the nsx-policy backend supports different types of service entries,
and multiple service entries per service.
At this point this is not supported here.
"""
def create(self, name, service_id=None, description=None,
protocol=policy_constants.TCP, dest_ports=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
service_id = self._init_obj_uuid(service_id)
service_def = policy_defs.ServiceDef(service_id=service_id,
name=name,
description=description,
tenant=tenant)
# NOTE(asarfaty) We set the service entry display name (which is also
# used as the id) to be the same as the service name. In case we
# support multiple service entries, we need the name to be unique.
entry_def = policy_defs.L4ServiceEntryDef(
service_id=service_id,
name=name,
description=description,
protocol=protocol,
dest_ports=dest_ports,
tenant=tenant)
return self.policy_api.create_with_parent(service_def, entry_def)
def delete(self, service_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
service_def = policy_defs.ServiceDef(service_id=service_id,
tenant=tenant)
self.policy_api.delete(service_def)
def get(self, service_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
service_def = policy_defs.ServiceDef(service_id=service_id,
tenant=tenant)
return self.policy_api.get(service_def)
def list(self, tenant=policy_constants.POLICY_INFRA_TENANT):
service_def = policy_defs.ServiceDef(tenant=tenant)
return self.policy_api.list(service_def)['results']
def _update_service_entry(self, service_id, srv_entry,
name=None, description=None,
protocol=None, dest_ports=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
entry_id = srv_entry['id']
entry_def = policy_defs.L4ServiceEntryDef(service_id=service_id,
service_entry_id=entry_id,
tenant=tenant)
entry_def.update_attributes_in_body(srv_entry, name=name,
description=description,
protocol=protocol,
dest_ports=dest_ports)
self.policy_api.update(entry_def)
def update(self, service_id, name=None, description=None,
protocol=None, dest_ports=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
# Get the current data of service & its' service entry
service = self.get(service_id, tenant=tenant)
# update the service itself:
if name is not None or description is not None:
# update the service itself
service_def = policy_defs.ServiceDef(service_id=service_id,
tenant=tenant)
service_def.update_attributes_in_body(service, name=name,
description=description)
# update the backend
updated_service = self.policy_api.update(service_def)
else:
updated_service = service
# update the service entry if it exists
service_entry = policy_defs.ServiceDef.get_single_entry(service)
if not service_entry:
LOG.error("Cannot update service %s - expected 1 service "
"entry", service_id)
return updated_service
self._update_service_entry(
service_id, service_entry,
name=name, description=description,
protocol=protocol, dest_ports=dest_ports,
tenant=tenant)
# re-read the service from the backend to return the current data
return self.get(service_id, tenant=tenant)
class NsxPolicyCommunicationProfileApi(NsxPolicyResourceBase):
"""NSX Policy Communication profile (with a single entry).
Note the nsx-policy backend supports multiple entries per communication
profile.
At this point this is not supported here.
"""
def create(self, name, profile_id=None, description=None,
services=None, action=policy_constants.ACTION_ALLOW,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""Create a Communication proflie with a single entry.
Services should be a list of service ids
"""
profile_id = self._init_obj_uuid(profile_id)
profile_def = policy_defs.CommunicationProfileDef(
profile_id=profile_id,
name=name,
description=description,
tenant=tenant)
# NOTE(asarfaty) We set the profile entry display name (which is also
# used as the id) to be the same as the profile name. In case we
# support multiple entries, we need the name to be unique.
entry_def = policy_defs.CommunicationProfileEntryDef(
profile_id=profile_id,
name=name,
description=description,
services=services,
action=action,
tenant=tenant)
return self.policy_api.create_with_parent(profile_def, entry_def)
def delete(self, profile_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
profile_def = policy_defs.CommunicationProfileDef(
profile_id=profile_id, tenant=tenant)
self.policy_api.delete(profile_def)
def get(self, profile_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
profile_def = policy_defs.CommunicationProfileDef(
profile_id=profile_id, tenant=tenant)
return self.policy_api.get(profile_def)
def list(self, tenant=policy_constants.POLICY_INFRA_TENANT):
profile_def = policy_defs.CommunicationProfileDef(tenant=tenant)
return self.policy_api.list(profile_def)['results']
def _update_profile_entry(self, profile_id, profile_entry,
name=None, description=None,
services=None, action=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
entry_id = profile_entry['id']
entry_def = policy_defs.CommunicationProfileEntryDef(
profile_id=profile_id,
profile_entry_id=entry_id,
tenant=tenant)
entry_def.update_attributes_in_body(profile_entry, name=name,
description=description,
services=services,
action=action)
self.policy_api.update(entry_def)
def update(self, profile_id, name=None, description=None,
services=None, action=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
# Get the current data of the profile & its' entry
profile = self.get(profile_id, tenant=tenant)
if name is not None or description is not None:
# update the profile itself
profile_def = policy_defs.CommunicationProfileDef(
profile_id=profile_id, tenant=tenant)
profile_def.update_attributes_in_body(profile, name=name,
description=description)
# update the backend
updated_profile = self.policy_api.update(profile_def)
else:
updated_profile = profile
# update the profile entry if it exists
profile_entry = policy_defs.CommunicationProfileDef.get_single_entry(
profile)
if not profile_entry:
LOG.error("Cannot update communication profile %s - expected 1 "
"profile entry", profile_id)
return updated_profile
self._update_profile_entry(
profile_id, profile_entry,
name=name, description=description,
services=services, action=action,
tenant=tenant)
# re-read the profile from the backend to return the current data
return self.get(profile_id, tenant=tenant)
class NsxPolicyCommunicationMapApi(NsxPolicyResourceBase):
"""NSX Policy CommunicationMap (Under a Domain)."""
def _get_last_seq_num(self, domain_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
# get the current entries, and choose the next unused sequence number
communication_maps = self.list(domain_id, tenant=tenant)
if not len(communication_maps):
return 0
seq_nums = [int(cm['sequence_number']) for cm in communication_maps]
seq_nums.sort()
return seq_nums[-1]
def create(self, name, domain_id, map_id=None,
description=None, sequence_number=None, profile_id=None,
source_groups=None, dest_groups=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""Create a CommunicationtMap.
source_groups/dest_groups should be a list of group ids belonging
to the domain.
"""
# Validate and convert inputs
map_id = self._init_obj_uuid(map_id)
if not profile_id:
# profile-id must be provided
err_msg = (_("Cannot create a communication map %(name)s without "
"communication profile id") % {'name': name})
raise exceptions.ManagerError(details=err_msg)
# get the next available sequence number
last_sequence = self._get_last_seq_num(domain_id, tenant=tenant)
if not sequence_number:
sequence_number = last_sequence + 1
entry_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=map_id,
name=name,
description=description,
sequence_number=sequence_number,
source_groups=source_groups,
dest_groups=dest_groups,
profile_id=profile_id,
tenant=tenant)
if last_sequence == 0:
# if communication map is absent, we need to create it
map_def = policy_defs.CommunicationMapDef(domain_id, tenant)
return self.policy_api.create_with_parent(map_def, entry_def)
return self.policy_api.create(entry_def)
def delete(self, domain_id, map_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
map_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=map_id,
tenant=tenant)
self.policy_api.delete(map_def)
def get(self, domain_id, map_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
map_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=map_id,
tenant=tenant)
return self.policy_api.get(map_def)
def get_by_name(self, domain_id, name,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""Return first communication map matched by name of this domain"""
return super(NsxPolicyCommunicationMapApi, self).get_by_name(
name, domain_id, tenant=tenant)
def list(self, domain_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""List all the map entries of a specific domain."""
map_def = policy_defs.CommunicationMapDef(
domain_id=domain_id,
tenant=tenant)
return self.policy_api.list(map_def)['results']
def update(self, domain_id, map_id, name=None, description=None,
sequence_number=None, profile_id=None,
source_groups=None, dest_groups=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
map_def = policy_defs.CommunicationMapEntryDef(
domain_id=domain_id,
map_id=map_id,
tenant=tenant)
# Get the current data, and update it with the new values
comm_map = self.get(domain_id, map_id, tenant=tenant)
map_def.update_attributes_in_body(
comm_map,
name=name,
description=description,
sequence_number=sequence_number,
profile_id=profile_id,
source_groups=source_groups,
dest_groups=dest_groups)
# update the backend
return self.policy_api.update(map_def)
class NsxPolicyEnforcementPointApi(NsxPolicyResourceBase):
"""NSX Policy Enforcement Point."""
def create(self, name, ep_id=None, description=None,
ip_address=None, username=None,
password=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
if not ip_address or not username or password is None:
err_msg = (_("Cannot create an enforcement point without "
"ip_address, username and password"))
raise exceptions.ManagerError(details=err_msg)
ep_id = self._init_obj_uuid(ep_id)
ep_def = policy_defs.EnforcementPointDef(
ep_id=ep_id,
name=name,
description=description,
ip_address=ip_address,
username=username,
password=password,
tenant=tenant)
return self.policy_api.create(ep_def)
def delete(self, ep_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
ep_def = policy_defs.EnforcementPointDef(
ep_id=ep_id, tenant=tenant)
self.policy_api.delete(ep_def)
def get(self, ep_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
ep_def = policy_defs.EnforcementPointDef(
ep_id=ep_id, tenant=tenant)
return self.policy_api.get(ep_def)
def list(self, tenant=policy_constants.POLICY_INFRA_TENANT):
ep_def = policy_defs.EnforcementPointDef(tenant=tenant)
return self.policy_api.list(ep_def)['results']
def update(self, ep_id, name=None, description=None,
ip_address=None, username=None, password=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
"""Update the enforcment point.
username & password must be defined
"""
if not username or password is None:
# profile-id must be provided
err_msg = (_("Cannot update an enforcement point without "
"username and password"))
raise exceptions.ManagerError(details=err_msg)
ep_def = policy_defs.EnforcementPointDef(ep_id=ep_id, tenant=tenant)
# Get the current data, and update it with the new values
ep = self.get(ep_id, tenant=tenant)
ep_def.update_attributes_in_body(ep,
name=name,
description=description,
ip_address=ip_address,
username=username,
password=password)
# update the backend
return self.policy_api.update(ep_def)
class NsxPolicyDeploymentMapApi(NsxPolicyResourceBase):
"""NSX Policy Deployment Map."""
def create(self, name, map_id=None, description=None,
ep_id=None, domain_id=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
map_id = self._init_obj_uuid(map_id)
map_def = policy_defs.DeploymentMapDef(
map_id=map_id,
name=name,
description=description,
ep_id=ep_id,
domain_id=domain_id,
tenant=tenant)
return self.policy_api.create(map_def)
def delete(self, map_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
map_def = policy_defs.DeploymentMapDef(
map_id=map_id, tenant=tenant)
self.policy_api.delete(map_def)
def get(self, map_id,
tenant=policy_constants.POLICY_INFRA_TENANT):
map_def = policy_defs.DeploymentMapDef(
map_id=map_id, tenant=tenant)
return self.policy_api.get(map_def)
def list(self, tenant=policy_constants.POLICY_INFRA_TENANT):
map_def = policy_defs.DeploymentMapDef(tenant=tenant)
return self.policy_api.list(map_def)['results']
def update(self, map_id, name=None, description=None,
ep_id=None, domain_id=None,
tenant=policy_constants.POLICY_INFRA_TENANT):
map_def = policy_defs.DeploymentMapDef(
map_id=map_id, tenant=tenant)
# Get the current data, and update it with the new values
map_obj = self.get(map_id, tenant=tenant)
map_def.update_attributes_in_body(map_obj,
name=name,
description=description,
ep_id=ep_id,
domain_id=domain_id)
# update the backend
return self.policy_api.update(map_def)