diff --git a/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py b/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py new file mode 100644 index 00000000..506dd5b7 --- /dev/null +++ b/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py @@ -0,0 +1,237 @@ +# Copyright 2017 AT&T Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from tempest.lib.common.utils import test_utils +from tempest.lib import decorators +from tempest import test + +from patrole_tempest_plugin import rbac_rule_validation +from patrole_tempest_plugin.tests.api.network import rbac_base as base + + +class AgentsRbacTest(base.BaseNetworkRbacTest): + + @classmethod + def skip_checks(cls): + super(AgentsRbacTest, cls).skip_checks() + if not test.is_extension_enabled('agent', 'network'): + msg = "agent extension not enabled." + raise cls.skipException(msg) + + @classmethod + def resource_setup(cls): + super(AgentsRbacTest, cls).resource_setup() + agents = cls.agents_client.list_agents()['agents'] + cls.agent = agents[0] + + @decorators.idempotent_id('f88e38e0-ab52-4b97-8ffa-48a27f9d199b') + @rbac_rule_validation.action(service="neutron", + rule="get_agent", + expected_error_code=404) + def test_show_agent(self): + """Show agent test. + + RBAC test for the neutron get_agent policy + """ + self.rbac_utils.switch_role(self, toggle_rbac_role=True) + self.agents_client.show_agent(self.agent['id']) + + @decorators.idempotent_id('8ca68fdb-eaf6-4880-af82-ba0982949dec') + @rbac_rule_validation.action(service="neutron", + rule="update_agent", + expected_error_code=404) + def test_update_agent(self): + """Update agent test. + + RBAC test for the neutron update_agent policy + """ + original_status = self.agent['admin_state_up'] + agent_status = {'admin_state_up': original_status} + + self.rbac_utils.switch_role(self, toggle_rbac_role=True) + self.agents_client.update_agent(agent_id=self.agent['id'], + agent=agent_status) + + +class L3AgentSchedulerRbacTest(base.BaseNetworkRbacTest): + + @classmethod + def skip_checks(cls): + super(L3AgentSchedulerRbacTest, cls).skip_checks() + if not test.is_extension_enabled('l3_agent_scheduler', 'network'): + msg = "l3_agent_scheduler extension not enabled." + raise cls.skipException(msg) + + @classmethod + def resource_setup(cls): + super(L3AgentSchedulerRbacTest, cls).resource_setup() + cls.router = cls.create_router() + cls.agent = None + + def setUp(self): + super(L3AgentSchedulerRbacTest, self).setUp() + if self.agent is not None: + return + + # Find an agent and validate that it is correct. + agents = self.agents_client.list_agents()['agents'] + agent = {'agent_type': None} + for a in agents: + if a['agent_type'] == 'L3 agent': + agent = a + break + self.assertEqual(agent['agent_type'], 'L3 agent', 'Could not find ' + 'L3 agent in agent list though l3_agent_scheduler ' + 'is enabled.') + self.agent = agent + + @decorators.idempotent_id('5d2bbdbc-40a5-43d2-828a-84dc93fcc453') + @rbac_rule_validation.action(service="neutron", + rule="get_l3-routers") + def test_list_routers_on_l3_agent(self): + """List routers on L3 agent test. + + RBAC test for the neutron get_l3-routers policy + """ + self.rbac_utils.switch_role(self, toggle_rbac_role=True) + self.agents_client.list_routers_on_l3_agent(self.agent['id']) + + @decorators.idempotent_id('466b2a10-8747-4c09-855a-bd90a1c86ce7') + @rbac_rule_validation.action(service="neutron", + rule="create_l3-router") + def test_create_router_on_l3_agent(self): + """Create router on L3 agent test. + + RBAC test for the neutron create_l3-router policy + """ + self.rbac_utils.switch_role(self, toggle_rbac_role=True) + self.agents_client.create_router_on_l3_agent( + self.agent['id'], router_id=self.router['id']) + self.addCleanup( + test_utils.call_and_ignore_notfound_exc, + self.agents_client.delete_router_from_l3_agent, + self.agent['id'], router_id=self.router['id']) + + @decorators.idempotent_id('8138cfc9-3e48-4a34-adf6-894077aa1be4') + @rbac_rule_validation.action(service="neutron", + rule="delete_l3-router") + def test_delete_router_from_l3_agent(self): + """Delete router from L3 agent test. + + RBAC test for the neutron delete_l3-router policy + """ + self.agents_client.create_router_on_l3_agent( + self.agent['id'], router_id=self.router['id']) + self.addCleanup( + test_utils.call_and_ignore_notfound_exc, + self.agents_client.delete_router_from_l3_agent, + self.agent['id'], router_id=self.router['id']) + + self.rbac_utils.switch_role(self, toggle_rbac_role=True) + self.agents_client.delete_router_from_l3_agent( + self.agent['id'], router_id=self.router['id']) + + +class DHCPAgentSchedulersRbacTest(base.BaseNetworkRbacTest): + + @classmethod + def skip_checks(cls): + super(DHCPAgentSchedulersRbacTest, cls).skip_checks() + if not test.is_extension_enabled('dhcp_agent_scheduler', 'network'): + msg = "dhcp_agent_scheduler extension not enabled." + raise cls.skipException(msg) + + @classmethod + def resource_setup(cls): + super(DHCPAgentSchedulersRbacTest, cls).resource_setup() + cls.agent = None + + def setUp(self): + super(DHCPAgentSchedulersRbacTest, self).setUp() + if self.agent is not None: + return + + # Find a DHCP agent and validate that it is correct. + agents = self.agents_client.list_agents()['agents'] + agent = {'agent_type': None} + for a in agents: + if a['agent_type'] == 'DHCP agent': + agent = a + break + self.assertEqual(agent['agent_type'], 'DHCP agent', 'Could not find ' + 'DHCP agent in agent list though dhcp_agent_scheduler' + ' is enabled.') + self.agent = agent + + def _create_and_prepare_network_for_agent(self, agent_id): + """Create network and ensure it is not hosted by agent_id.""" + network_id = self.create_network()['id'] + + if self._check_network_in_dhcp_agent(network_id, agent_id): + self.agents_client.delete_network_from_dhcp_agent( + agent_id=agent_id, network_id=network_id) + + return network_id + + def _check_network_in_dhcp_agent(self, network_id, agent_id): + networks = self.agents_client.list_networks_hosted_by_one_dhcp_agent( + agent_id)['networks'] or [] + return network_id in [network['id'] for network in networks] + + @decorators.idempotent_id('dc84087b-4c2a-4878-8ed0-40370e19da17') + @rbac_rule_validation.action(service="neutron", + rule="get_dhcp-networks") + def test_list_networks_hosted_by_one_dhcp_agent(self): + """List networks hosted by one DHCP agent test. + + RBAC test for the neutron get_dhcp-networks policy + """ + self.rbac_utils.switch_role(self, toggle_rbac_role=True) + self.agents_client.list_networks_hosted_by_one_dhcp_agent( + self.agent['id']) + + @decorators.idempotent_id('14e014ac-f355-46d3-b6d8-98f2c9ec1610') + @rbac_rule_validation.action(service="neutron", + rule="create_dhcp-network") + def test_add_dhcp_agent_to_network(self): + """Add DHCP agent to network test. + + RBAC test for the neutron create_dhcp-network policy + """ + network_id = self._create_and_prepare_network_for_agent( + self.agent['id']) + + self.rbac_utils.switch_role(self, toggle_rbac_role=True) + self.agents_client.add_dhcp_agent_to_network( + self.agent['id'], network_id=network_id) + # Clean up is not necessary and might result in 409 being raised. + + @decorators.idempotent_id('937a4302-4b49-407d-9980-5843d7badc38') + @rbac_rule_validation.action(service="neutron", + rule="delete_dhcp-network") + def test_delete_network_from_dhcp_agent(self): + """Delete DHCP agent from network test. + + RBAC test for the neutron delete_dhcp-network policy + """ + network_id = self._create_and_prepare_network_for_agent( + self.agent['id']) + self.agents_client.add_dhcp_agent_to_network( + self.agent['id'], network_id=network_id) + # Clean up is not necessary and might result in 409 being raised. + + self.rbac_utils.switch_role(self, toggle_rbac_role=True) + self.agents_client.delete_network_from_dhcp_agent( + self.agent['id'], network_id=network_id) diff --git a/releasenotes/notes/rbac-tests-for-network-agents-fbc899925b5948b1.yaml b/releasenotes/notes/rbac-tests-for-network-agents-fbc899925b5948b1.yaml new file mode 100644 index 00000000..64deadc7 --- /dev/null +++ b/releasenotes/notes/rbac-tests-for-network-agents-fbc899925b5948b1.yaml @@ -0,0 +1,14 @@ +--- +features: + - | + Implements RBAC tests for Tempest network agents_client, providing + coverage for the following policies: + + * update_agent + * get_agent + * create_dhcp-network + * delete_dhcp-network + * get_dhcp-networks + * create_l3-router + * delete_l3-router + * get_l3-routers