openstacksdk/openstack/tests/unit/cloud/test_port.py

376 lines
14 KiB
Python

# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
test_port
----------------------------------
Test port resource (managed by neutron)
"""
from openstack.cloud.exc import OpenStackCloudException
from openstack.tests.unit import base
class TestPort(base.TestCase):
mock_neutron_port_create_rep = {
'port': {
'status': 'DOWN',
'binding:host_id': '',
'name': 'test-port-name',
'allowed_address_pairs': [],
'admin_state_up': True,
'network_id': 'test-net-id',
'tenant_id': 'test-tenant-id',
'binding:vif_details': {},
'binding:vnic_type': 'normal',
'binding:vif_type': 'unbound',
'device_owner': '',
'mac_address': '50:1c:0d:e4:f0:0d',
'binding:profile': {},
'fixed_ips': [
{
'subnet_id': 'test-subnet-id',
'ip_address': '29.29.29.29'
}
],
'id': 'test-port-id',
'security_groups': [],
'device_id': ''
}
}
mock_neutron_port_update_rep = {
'port': {
'status': 'DOWN',
'binding:host_id': '',
'name': 'test-port-name-updated',
'allowed_address_pairs': [],
'admin_state_up': True,
'network_id': 'test-net-id',
'tenant_id': 'test-tenant-id',
'binding:vif_details': {},
'binding:vnic_type': 'normal',
'binding:vif_type': 'unbound',
'device_owner': '',
'mac_address': '50:1c:0d:e4:f0:0d',
'binding:profile': {},
'fixed_ips': [
{
'subnet_id': 'test-subnet-id',
'ip_address': '29.29.29.29'
}
],
'id': 'test-port-id',
'security_groups': [],
'device_id': ''
}
}
mock_neutron_port_list_rep = {
'ports': [
{
'status': 'ACTIVE',
'binding:host_id': 'devstack',
'name': 'first-port',
'allowed_address_pairs': [],
'admin_state_up': True,
'network_id': '70c1db1f-b701-45bd-96e0-a313ee3430b3',
'tenant_id': '',
'extra_dhcp_opts': [],
'binding:vif_details': {
'port_filter': True,
'ovs_hybrid_plug': True
},
'binding:vif_type': 'ovs',
'device_owner': 'network:router_gateway',
'mac_address': 'fa:16:3e:58:42:ed',
'binding:profile': {},
'binding:vnic_type': 'normal',
'fixed_ips': [
{
'subnet_id': '008ba151-0b8c-4a67-98b5-0d2b87666062',
'ip_address': '172.24.4.2'
}
],
'id': 'd80b1a3b-4fc1-49f3-952e-1e2ab7081d8b',
'security_groups': [],
'device_id': '9ae135f4-b6e0-4dad-9e91-3c223e385824'
},
{
'status': 'ACTIVE',
'binding:host_id': 'devstack',
'name': '',
'allowed_address_pairs': [],
'admin_state_up': True,
'network_id': 'f27aa545-cbdd-4907-b0c6-c9e8b039dcc2',
'tenant_id': 'd397de8a63f341818f198abb0966f6f3',
'extra_dhcp_opts': [],
'binding:vif_details': {
'port_filter': True,
'ovs_hybrid_plug': True
},
'binding:vif_type': 'ovs',
'device_owner': 'network:router_interface',
'mac_address': 'fa:16:3e:bb:3c:e4',
'binding:profile': {},
'binding:vnic_type': 'normal',
'fixed_ips': [
{
'subnet_id': '288bf4a1-51ba-43b6-9d0a-520e9005db17',
'ip_address': '10.0.0.1'
}
],
'id': 'f71a6703-d6de-4be1-a91a-a570ede1d159',
'security_groups': [],
'device_id': '9ae135f4-b6e0-4dad-9e91-3c223e385824'
}
]
}
def test_create_port(self):
self.register_uris([
dict(method="POST",
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_create_rep,
validate=dict(
json={'port': {
'network_id': 'test-net-id',
'name': 'test-port-name',
'admin_state_up': True}}))
])
port = self.cloud.create_port(
network_id='test-net-id', name='test-port-name',
admin_state_up=True)
self.assertEqual(self.mock_neutron_port_create_rep['port'], port)
self.assert_calls()
def test_create_port_parameters(self):
"""Test that we detect invalid arguments passed to create_port"""
self.assertRaises(
TypeError, self.cloud.create_port,
network_id='test-net-id', nome='test-port-name',
stato_amministrativo_porta=True)
def test_create_port_exception(self):
self.register_uris([
dict(method="POST",
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
status_code=500,
validate=dict(
json={'port': {
'network_id': 'test-net-id',
'name': 'test-port-name',
'admin_state_up': True}}))
])
self.assertRaises(
OpenStackCloudException, self.cloud.create_port,
network_id='test-net-id', name='test-port-name',
admin_state_up=True)
self.assert_calls()
def test_update_port(self):
port_id = 'd80b1a3b-4fc1-49f3-952e-1e2ab7081d8b'
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_list_rep),
dict(method='PUT',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'ports', '%s.json' % port_id]),
json=self.mock_neutron_port_update_rep,
validate=dict(
json={'port': {'name': 'test-port-name-updated'}}))
])
port = self.cloud.update_port(
name_or_id=port_id, name='test-port-name-updated')
self.assertEqual(self.mock_neutron_port_update_rep['port'], port)
self.assert_calls()
def test_update_port_parameters(self):
"""Test that we detect invalid arguments passed to update_port"""
self.assertRaises(
TypeError, self.cloud.update_port,
name_or_id='test-port-id', nome='test-port-name-updated')
def test_update_port_exception(self):
port_id = 'd80b1a3b-4fc1-49f3-952e-1e2ab7081d8b'
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_list_rep),
dict(method='PUT',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'ports', '%s.json' % port_id]),
status_code=500,
validate=dict(
json={'port': {'name': 'test-port-name-updated'}}))
])
self.assertRaises(
OpenStackCloudException, self.cloud.update_port,
name_or_id='d80b1a3b-4fc1-49f3-952e-1e2ab7081d8b',
name='test-port-name-updated')
self.assert_calls()
def test_list_ports(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_list_rep)
])
ports = self.cloud.list_ports()
self.assertItemsEqual(self.mock_neutron_port_list_rep['ports'], ports)
self.assert_calls()
def test_list_ports_filtered(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json'],
qs_elements=['status=DOWN']),
json=self.mock_neutron_port_list_rep)
])
ports = self.cloud.list_ports(filters={'status': 'DOWN'})
self.assertItemsEqual(self.mock_neutron_port_list_rep['ports'], ports)
self.assert_calls()
def test_list_ports_exception(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
status_code=500)
])
self.assertRaises(OpenStackCloudException, self.cloud.list_ports)
def test_search_ports_by_id(self):
port_id = 'f71a6703-d6de-4be1-a91a-a570ede1d159'
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_list_rep)
])
ports = self.cloud.search_ports(name_or_id=port_id)
self.assertEqual(1, len(ports))
self.assertEqual('fa:16:3e:bb:3c:e4', ports[0]['mac_address'])
self.assert_calls()
def test_search_ports_by_name(self):
port_name = "first-port"
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_list_rep)
])
ports = self.cloud.search_ports(name_or_id=port_name)
self.assertEqual(1, len(ports))
self.assertEqual('fa:16:3e:58:42:ed', ports[0]['mac_address'])
self.assert_calls()
def test_search_ports_not_found(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_list_rep)
])
ports = self.cloud.search_ports(name_or_id='non-existent')
self.assertEqual(0, len(ports))
self.assert_calls()
def test_delete_port(self):
port_id = 'd80b1a3b-4fc1-49f3-952e-1e2ab7081d8b'
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_list_rep),
dict(method='DELETE',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'ports', '%s.json' % port_id]),
json={})
])
self.assertTrue(self.cloud.delete_port(name_or_id='first-port'))
def test_delete_port_not_found(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json=self.mock_neutron_port_list_rep)
])
self.assertFalse(self.cloud.delete_port(name_or_id='non-existent'))
self.assert_calls()
def test_delete_subnet_multiple_found(self):
port_name = "port-name"
port1 = dict(id='123', name=port_name)
port2 = dict(id='456', name=port_name)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json={'ports': [port1, port2]})
])
self.assertRaises(OpenStackCloudException,
self.cloud.delete_port, port_name)
self.assert_calls()
def test_delete_subnet_multiple_using_id(self):
port_name = "port-name"
port1 = dict(id='123', name=port_name)
port2 = dict(id='456', name=port_name)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports.json']),
json={'ports': [port1, port2]}),
dict(method='DELETE',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'ports', '%s.json' % port1['id']]),
json={})
])
self.assertTrue(self.cloud.delete_port(name_or_id=port1['id']))
self.assert_calls()
def test_get_port_by_id(self):
fake_port = dict(id='123', name='456')
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0',
'ports',
fake_port['id']]),
json={'port': fake_port})
])
r = self.cloud.get_port_by_id(fake_port['id'])
self.assertIsNotNone(r)
self.assertDictEqual(fake_port, r)
self.assert_calls()