Change all tests to use the standard addCleanup
Currently a lot of tests are using a custom self.store() method to perform automatic cleanup on tearDown. We should change to the standard addCleanup method that is being called on every object (also if setUp fails and tearDown is not called). See [1] for details. [1] https://docs.python.org/2/library/unittest.html#unittest.TestCase.addCleanup Change-Id: Ic62dd9a5e07435d1179793ec2c5cdc6e4eddef60
This commit is contained in:
parent
7d9ebea601
commit
53db6e21df
|
@ -63,41 +63,35 @@ class TestAllowedAddressPairsDetectActive(test_base.DFTestBase):
|
|||
self.policy = None
|
||||
self.allowed_address_pair_ip_address = None
|
||||
self.allowed_address_pair_mac_address = None
|
||||
try:
|
||||
self.topology = self.store(app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api))
|
||||
subnet = self.topology.create_subnet(cidr='192.168.98.0/24')
|
||||
self.subnet = subnet
|
||||
port = subnet.create_port()
|
||||
self.port = port
|
||||
self.topology = app_testing_objects.Topology(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
subnet = self.topology.create_subnet(cidr='192.168.98.0/24')
|
||||
self.subnet = subnet
|
||||
port = subnet.create_port()
|
||||
self.port = port
|
||||
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
port.update({'allowed_address_pairs': [{
|
||||
'ip_address': '192.168.98.100'}]})
|
||||
port.update({'allowed_address_pairs': [{
|
||||
'ip_address': '192.168.98.100'}]})
|
||||
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
port_lport = port.port.get_logical_port()
|
||||
self.assertIsNotNone(port_lport)
|
||||
port_lport = port.port.get_logical_port()
|
||||
self.assertIsNotNone(port_lport)
|
||||
|
||||
self.allowed_address_pair_mac_address, \
|
||||
self.allowed_address_pair_ip_address = \
|
||||
apps.get_port_mac_and_ip(port, True)
|
||||
self.allowed_address_pair_mac_address, \
|
||||
self.allowed_address_pair_ip_address = \
|
||||
apps.get_port_mac_and_ip(port, True)
|
||||
|
||||
# Create policy to reply arp request sent from controller
|
||||
self.policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[],
|
||||
port_policies=self._create_policy_to_reply_arp_request(),
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
raise
|
||||
# Create policy to reply arp request sent from controller
|
||||
self.policy = app_testing_objects.Policy(
|
||||
initial_actions=[],
|
||||
port_policies=self._create_policy_to_reply_arp_request(),
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(self.policy.close)
|
||||
|
||||
def _create_arp_response(self, buf):
|
||||
pkt = ryu.lib.packet.packet.Packet(buf)
|
||||
|
|
|
@ -30,22 +30,13 @@ LOG = log.getLogger(__name__)
|
|||
class TestDHCPApp(test_base.DFTestBase):
|
||||
|
||||
def _create_topology(self, enable_dhcp=True, cidr='192.168.11.0/24'):
|
||||
try:
|
||||
self.topology = self.store(
|
||||
app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api
|
||||
)
|
||||
)
|
||||
self.subnet1 = self.topology.create_subnet(
|
||||
cidr=cidr, enable_dhcp=enable_dhcp)
|
||||
self.port1 = self.subnet1.create_port()
|
||||
self.port2 = self.subnet1.create_port()
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
raise
|
||||
self.topology = app_testing_objects.Topology(self.neutron, self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
self.subnet1 = self.topology.create_subnet(
|
||||
cidr=cidr, enable_dhcp=enable_dhcp)
|
||||
self.port1 = self.subnet1.create_port()
|
||||
self.port2 = self.subnet1.create_port()
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
def _create_udp_packet_for_dhcp(self,
|
||||
dst_mac=constants.BROADCAST_MAC,
|
||||
|
@ -275,16 +266,15 @@ class TestDHCPApp(test_base.DFTestBase):
|
|||
)
|
||||
|
||||
port_policies = self._create_port_policies(disable_rule=False)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[send_dhcp_offer,
|
||||
send_dhcp_offer,
|
||||
send_dhcp_offer,
|
||||
send_dhcp_offer],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[send_dhcp_offer,
|
||||
send_dhcp_offer,
|
||||
send_dhcp_offer,
|
||||
send_dhcp_offer],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
|
||||
policy.start(self.topology)
|
||||
test_utils.wait_until_true(internal_predicate,
|
||||
|
@ -322,13 +312,12 @@ class TestDHCPApp(test_base.DFTestBase):
|
|||
rules=rules,
|
||||
default_action=raise_action
|
||||
)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[send_dhcp_offer],
|
||||
port_policies={key: port_policy},
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[send_dhcp_offer],
|
||||
port_policies={key: port_policy},
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
# Since there is no dhcp response, we are expecting timeout
|
||||
# exception here.
|
||||
|
@ -349,13 +338,12 @@ class TestDHCPApp(test_base.DFTestBase):
|
|||
dhcp_packet,
|
||||
)
|
||||
port_policies = self._create_port_policies()
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[send_dhcp_offer],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[send_dhcp_offer],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
apps.start_policy(policy, self.topology,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
|
|
|
@ -28,29 +28,21 @@ class TestDNATApp(test_base.DFTestBase):
|
|||
super(TestDNATApp, self).setUp()
|
||||
|
||||
self.topology = None
|
||||
try:
|
||||
self.topology = self.store(
|
||||
app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api
|
||||
)
|
||||
)
|
||||
self.subnet = self.topology.create_subnet()
|
||||
self.port = self.subnet.create_port()
|
||||
self.router = self.topology.create_router([
|
||||
self.subnet.subnet_id
|
||||
])
|
||||
ext_net_id = self.topology.create_external_network([
|
||||
self.router.router_id
|
||||
])
|
||||
self.fip = self.store(
|
||||
objects.FloatingipTestObj(self.neutron, self.nb_api))
|
||||
self.fip.create({'floating_network_id': ext_net_id,
|
||||
'port_id': self.port.port.port_id})
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
raise
|
||||
self.topology = app_testing_objects.Topology(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
self.subnet = self.topology.create_subnet()
|
||||
self.port = self.subnet.create_port()
|
||||
self.router = self.topology.create_router([
|
||||
self.subnet.subnet_id
|
||||
])
|
||||
ext_net_id = self.topology.create_external_network([
|
||||
self.router.router_id
|
||||
])
|
||||
self.fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(self.fip.close)
|
||||
self.fip.create({'floating_network_id': ext_net_id,
|
||||
'port_id': self.port.port.port_id})
|
||||
|
||||
def _create_icmp_test_port_policies(self, icmp_filter):
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
|
@ -129,20 +121,19 @@ class TestDNATApp(test_base.DFTestBase):
|
|||
self.topology.external_network.get_gw_ip(),
|
||||
ryu.lib.packet.ipv4.inet.IPPROTO_ICMP,
|
||||
ttl=1)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=self._create_icmp_test_port_policies(
|
||||
app_testing_objects.RyuICMPTimeExceedFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=self._create_icmp_test_port_policies(
|
||||
app_testing_objects.RyuICMPTimeExceedFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
apps.start_policy(policy, self.topology,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
|
@ -188,20 +179,19 @@ class TestDNATApp(test_base.DFTestBase):
|
|||
self.port.port_id,
|
||||
initial_packet)
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
],
|
||||
port_policies=self._create_rate_limit_port_policies(
|
||||
cfg.CONF.df_dnat_app.dnat_ttl_invalid_max_rate,
|
||||
app_testing_objects.RyuICMPTimeExceedFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
],
|
||||
port_policies=self._create_rate_limit_port_policies(
|
||||
cfg.CONF.df_dnat_app.dnat_ttl_invalid_max_rate,
|
||||
app_testing_objects.RyuICMPTimeExceedFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
# Since the rate limit, we expect timeout to wait for 4th packet hit
|
||||
# the policy.
|
||||
|
@ -219,20 +209,19 @@ class TestDNATApp(test_base.DFTestBase):
|
|||
initial_packet = self._create_packet(
|
||||
self.topology.external_network.get_gw_ip(),
|
||||
ryu.lib.packet.ipv4.inet.IPPROTO_UDP)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=self._create_icmp_test_port_policies(
|
||||
app_testing_objects.RyuICMPUnreachFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=self._create_icmp_test_port_policies(
|
||||
app_testing_objects.RyuICMPUnreachFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
if len(policy.exceptions) > 0:
|
||||
|
@ -248,20 +237,19 @@ class TestDNATApp(test_base.DFTestBase):
|
|||
self.port.port_id,
|
||||
initial_packet)
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
],
|
||||
port_policies=self._create_rate_limit_port_policies(
|
||||
cfg.CONF.df_dnat_app.dnat_icmp_error_max_rate,
|
||||
app_testing_objects.RyuICMPUnreachFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
],
|
||||
port_policies=self._create_rate_limit_port_policies(
|
||||
cfg.CONF.df_dnat_app.dnat_icmp_error_max_rate,
|
||||
app_testing_objects.RyuICMPUnreachFilter),
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
# Since the rate limit, we expect timeout to wait for 4th packet hit
|
||||
# the policy.
|
||||
|
|
|
@ -34,62 +34,57 @@ class TestArpResponder(test_base.DFTestBase):
|
|||
super(TestArpResponder, self).setUp()
|
||||
self.topology = None
|
||||
self.policy = None
|
||||
try:
|
||||
self.topology = app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api)
|
||||
subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
|
||||
port1 = subnet1.create_port()
|
||||
port2 = subnet1.create_port()
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
# Create policy
|
||||
arp_packet = self._create_arp_request(
|
||||
src_port=port1.port.get_logical_port(),
|
||||
dst_port=port2.port.get_logical_port(),
|
||||
)
|
||||
send_arp_request = app_testing_objects.SendAction(
|
||||
subnet1.subnet_id,
|
||||
port1.port_id,
|
||||
arp_packet,
|
||||
)
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
log_action = app_testing_objects.LogAction()
|
||||
key1 = (subnet1.subnet_id, port1.port_id)
|
||||
port_policies = {
|
||||
key1: app_testing_objects.PortPolicy(
|
||||
rules=[
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Detect arp replies
|
||||
app_testing_objects.RyuARPReplyFilter(),
|
||||
actions=[
|
||||
log_action,
|
||||
app_testing_objects.StopSimulationAction()
|
||||
]
|
||||
),
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Ignore IPv6 packets
|
||||
app_testing_objects.RyuIPv6Filter(),
|
||||
actions=[
|
||||
ignore_action
|
||||
]
|
||||
),
|
||||
],
|
||||
default_action=app_testing_objects.RaiseAction(
|
||||
"Unexpected packet"
|
||||
)
|
||||
),
|
||||
}
|
||||
self.policy = app_testing_objects.Policy(
|
||||
initial_actions=[send_arp_request],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
raise
|
||||
self.store(self.topology)
|
||||
self.store(self.policy)
|
||||
self.topology = app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
|
||||
port1 = subnet1.create_port()
|
||||
port2 = subnet1.create_port()
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
# Create policy
|
||||
arp_packet = self._create_arp_request(
|
||||
src_port=port1.port.get_logical_port(),
|
||||
dst_port=port2.port.get_logical_port(),
|
||||
)
|
||||
send_arp_request = app_testing_objects.SendAction(
|
||||
subnet1.subnet_id,
|
||||
port1.port_id,
|
||||
arp_packet,
|
||||
)
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
log_action = app_testing_objects.LogAction()
|
||||
key1 = (subnet1.subnet_id, port1.port_id)
|
||||
port_policies = {
|
||||
key1: app_testing_objects.PortPolicy(
|
||||
rules=[
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Detect arp replies
|
||||
app_testing_objects.RyuARPReplyFilter(),
|
||||
actions=[
|
||||
log_action,
|
||||
app_testing_objects.StopSimulationAction()
|
||||
]
|
||||
),
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Ignore IPv6 packets
|
||||
app_testing_objects.RyuIPv6Filter(),
|
||||
actions=[
|
||||
ignore_action
|
||||
]
|
||||
),
|
||||
],
|
||||
default_action=app_testing_objects.RaiseAction(
|
||||
"Unexpected packet"
|
||||
)
|
||||
),
|
||||
}
|
||||
self.policy = app_testing_objects.Policy(
|
||||
initial_actions=[send_arp_request],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(self.policy.close)
|
||||
|
||||
def _create_arp_request(self, src_port, dst_port):
|
||||
ethernet = ryu.lib.packet.ethernet.ethernet(
|
||||
|
@ -133,73 +128,68 @@ class TestNeighborAdvertiser(test_base.DFTestBase):
|
|||
super(TestNeighborAdvertiser, self).setUp()
|
||||
self.topology = None
|
||||
self.policy = None
|
||||
try:
|
||||
# Disable Duplicate Address Detection requests from the interface
|
||||
self.dad_conf = utils.execute(['sysctl', '-n',
|
||||
'net.ipv6.conf.default.accept_dad'])
|
||||
utils.execute(['sysctl', '-w',
|
||||
'net.ipv6.conf.default.accept_dad=0'],
|
||||
run_as_root=True)
|
||||
# Disable Router Solicitation requests from the interface
|
||||
self.router_solicit_conf = utils.execute(
|
||||
['sysctl', '-n', 'net.ipv6.conf.default.router_solicitations'])
|
||||
utils.execute(['sysctl', '-w',
|
||||
'net.ipv6.conf.default.router_solicitations=0'],
|
||||
run_as_root=True)
|
||||
self.topology = app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api)
|
||||
subnet1 = self.topology.create_subnet(cidr='1111:1111:1111::/64')
|
||||
port1 = subnet1.create_port()
|
||||
port2 = subnet1.create_port()
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
# Create Neighbor Solicitation packet
|
||||
ns_packet = self._create_ns_request(
|
||||
src_port=port1.port.get_logical_port(),
|
||||
dst_port=port2.port.get_logical_port(),
|
||||
)
|
||||
send_ns_request = app_testing_objects.SendAction(
|
||||
subnet1.subnet_id,
|
||||
port1.port_id,
|
||||
ns_packet,
|
||||
)
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
log_action = app_testing_objects.LogAction()
|
||||
key1 = (subnet1.subnet_id, port1.port_id)
|
||||
adv_filter = app_testing_objects.RyuNeighborAdvertisementFilter()
|
||||
port_policies = {
|
||||
key1: app_testing_objects.PortPolicy(
|
||||
rules=[
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Detect advertisements
|
||||
adv_filter,
|
||||
actions=[
|
||||
log_action,
|
||||
app_testing_objects.StopSimulationAction()
|
||||
]
|
||||
),
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Filter local VM's Multicast requests
|
||||
app_testing_objects.RyuIpv6MulticastFilter(),
|
||||
actions=[ignore_action]
|
||||
)
|
||||
],
|
||||
default_action=app_testing_objects.RaiseAction(
|
||||
"Unexpected packet"
|
||||
# Disable Duplicate Address Detection requests from the interface
|
||||
self.dad_conf = utils.execute(['sysctl', '-n',
|
||||
'net.ipv6.conf.default.accept_dad'])
|
||||
utils.execute(['sysctl', '-w',
|
||||
'net.ipv6.conf.default.accept_dad=0'],
|
||||
run_as_root=True)
|
||||
# Disable Router Solicitation requests from the interface
|
||||
self.router_solicit_conf = utils.execute(
|
||||
['sysctl', '-n', 'net.ipv6.conf.default.router_solicitations'])
|
||||
utils.execute(['sysctl', '-w',
|
||||
'net.ipv6.conf.default.router_solicitations=0'],
|
||||
run_as_root=True)
|
||||
self.topology = app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
subnet1 = self.topology.create_subnet(cidr='1111:1111:1111::/64')
|
||||
port1 = subnet1.create_port()
|
||||
port2 = subnet1.create_port()
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
# Create Neighbor Solicitation packet
|
||||
ns_packet = self._create_ns_request(
|
||||
src_port=port1.port.get_logical_port(),
|
||||
dst_port=port2.port.get_logical_port(),
|
||||
)
|
||||
send_ns_request = app_testing_objects.SendAction(
|
||||
subnet1.subnet_id,
|
||||
port1.port_id,
|
||||
ns_packet,
|
||||
)
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
log_action = app_testing_objects.LogAction()
|
||||
key1 = (subnet1.subnet_id, port1.port_id)
|
||||
adv_filter = app_testing_objects.RyuNeighborAdvertisementFilter()
|
||||
port_policies = {
|
||||
key1: app_testing_objects.PortPolicy(
|
||||
rules=[
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Detect advertisements
|
||||
adv_filter,
|
||||
actions=[
|
||||
log_action,
|
||||
app_testing_objects.StopSimulationAction()
|
||||
]
|
||||
),
|
||||
app_testing_objects.PortPolicyRule(
|
||||
# Filter local VM's Multicast requests
|
||||
app_testing_objects.RyuIpv6MulticastFilter(),
|
||||
actions=[ignore_action]
|
||||
)
|
||||
),
|
||||
}
|
||||
self.policy = app_testing_objects.Policy(
|
||||
initial_actions=[send_ns_request],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
raise
|
||||
self.store(self.topology)
|
||||
self.store(self.policy)
|
||||
],
|
||||
default_action=app_testing_objects.RaiseAction(
|
||||
"Unexpected packet"
|
||||
)
|
||||
),
|
||||
}
|
||||
self.policy = app_testing_objects.Policy(
|
||||
initial_actions=[send_ns_request],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(self.policy.close)
|
||||
|
||||
def _create_ns_request(self, src_port, dst_port):
|
||||
ethernet = ryu.lib.packet.ethernet.ethernet(
|
||||
|
|
|
@ -37,26 +37,18 @@ class TestL3App(test_base.DFTestBase):
|
|||
self.topology = None
|
||||
self.policy = None
|
||||
self._ping = None
|
||||
try:
|
||||
self.topology = self.store(
|
||||
app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api
|
||||
)
|
||||
)
|
||||
self.subnet1 = self.topology.create_subnet(cidr='192.168.12.0/24')
|
||||
self.subnet2 = self.topology.create_subnet(cidr='192.168.13.0/24')
|
||||
self.port1 = self.subnet1.create_port()
|
||||
self.port2 = self.subnet2.create_port()
|
||||
self.router = self.topology.create_router([
|
||||
self.subnet1.subnet_id,
|
||||
self.subnet2.subnet_id,
|
||||
])
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
raise
|
||||
self.topology = app_testing_objects.Topology(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
self.subnet1 = self.topology.create_subnet(cidr='192.168.12.0/24')
|
||||
self.subnet2 = self.topology.create_subnet(cidr='192.168.13.0/24')
|
||||
self.port1 = self.subnet1.create_port()
|
||||
self.port2 = self.subnet2.create_port()
|
||||
self.router = self.topology.create_router([
|
||||
self.subnet1.subnet_id,
|
||||
self.subnet2.subnet_id,
|
||||
])
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
def _create_port_policies(self, connected=True):
|
||||
ignore_action = app_testing_objects.IgnoreAction()
|
||||
|
@ -236,19 +228,18 @@ class TestL3App(test_base.DFTestBase):
|
|||
port_policies = self._create_port_policies()
|
||||
initial_packet = self._create_packet(
|
||||
dst_ip, ryu.lib.packet.ipv4.inet.IPPROTO_ICMP)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
apps.start_policy(policy, self.topology,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
|
@ -298,19 +289,18 @@ class TestL3App(test_base.DFTestBase):
|
|||
port_policies = self._create_port_policies(connected=False)
|
||||
initial_packet = self._create_packet(
|
||||
dst_ip, ryu.lib.packet.ipv4.inet.IPPROTO_ICMP)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
# Since there is no OpenFlow in vswitch, we are expecting timeout
|
||||
# exception here.
|
||||
|
@ -405,18 +395,17 @@ class TestL3App(test_base.DFTestBase):
|
|||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action
|
||||
],
|
||||
port_policies=port_policy,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action
|
||||
],
|
||||
port_policies=port_policy,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
# Since the rate limit, we expect timeout to wait for 4th packet hit
|
||||
# the policy.
|
||||
|
@ -437,19 +426,18 @@ class TestL3App(test_base.DFTestBase):
|
|||
app_testing_objects.RyuICMPUnreachFilter)
|
||||
initial_packet = self._create_packet(
|
||||
"192.168.12.1", ryu.lib.packet.ipv4.inet.IPPROTO_UDP)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policy,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policy,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
apps.start_policy(policy, self.topology,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
|
@ -491,18 +479,17 @@ class TestL3App(test_base.DFTestBase):
|
|||
self.port1.port_id,
|
||||
initial_packet)
|
||||
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action
|
||||
],
|
||||
port_policies=port_policy,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action,
|
||||
send_action,
|
||||
send_action,
|
||||
send_action
|
||||
],
|
||||
port_policies=port_policy,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
# Since the rate limit, we expect timeout to wait for 4th packet hit
|
||||
# the policy.
|
||||
|
@ -561,14 +548,13 @@ class TestL3App(test_base.DFTestBase):
|
|||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action
|
||||
],
|
||||
port_policies=port_policy,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
send_action
|
||||
],
|
||||
port_policies=port_policy,
|
||||
unknown_port_action=ignore_action
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
apps.start_policy(policy, self.topology,
|
||||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
|
|
@ -39,34 +39,33 @@ class TestPortSecApp(test_base.DFTestBase):
|
|||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
port_policies = self._create_port_policies()
|
||||
self.policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
self._create_ping_using_fake_ip
|
||||
),
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
self._create_ping_using_fake_mac
|
||||
),
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
self._create_ping_using_vm_ip_mac
|
||||
),
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
self._create_ping_using_allowed_address_pair
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
self._create_ping_using_fake_ip
|
||||
),
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
self._create_ping_using_fake_mac
|
||||
),
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
self._create_ping_using_vm_ip_mac
|
||||
),
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
self._create_ping_using_allowed_address_pair
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(self.policy.close)
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
|
@ -74,9 +73,8 @@ class TestPortSecApp(test_base.DFTestBase):
|
|||
|
||||
def _init_topology(self):
|
||||
network = netaddr.IPNetwork(self.cidr)
|
||||
security_group = self.store(objects.SecGroupTestObj(
|
||||
self.neutron,
|
||||
self.nb_api))
|
||||
security_group = objects.SecGroupTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(security_group.close)
|
||||
security_group_id = security_group.create()
|
||||
self.assertTrue(security_group.exists())
|
||||
|
||||
|
@ -94,12 +92,9 @@ class TestPortSecApp(test_base.DFTestBase):
|
|||
ingress_rule_id = security_group.rule_create(
|
||||
secrule=ingress_rule_info)
|
||||
self.assertTrue(security_group.rule_exists(ingress_rule_id))
|
||||
self.topology = self.store(
|
||||
app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api
|
||||
)
|
||||
)
|
||||
self.topology = app_testing_objects.Topology(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
|
||||
self.subnet = self.topology.create_subnet(
|
||||
cidr=self.cidr
|
||||
|
|
|
@ -32,43 +32,33 @@ class TestSGApp(test_base.DFTestBase):
|
|||
super(TestSGApp, self).setUp()
|
||||
self.topology = None
|
||||
self.policy = None
|
||||
try:
|
||||
self.security_group = self.store(objects.SecGroupTestObj(
|
||||
self.neutron,
|
||||
self.nb_api))
|
||||
security_group_id = self.security_group.create()
|
||||
self.assertTrue(self.security_group.exists())
|
||||
self.security_group = objects.SecGroupTestObj(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.security_group.close)
|
||||
security_group_id = self.security_group.create()
|
||||
self.assertTrue(self.security_group.exists())
|
||||
|
||||
self.security_group2 = self.store(objects.SecGroupTestObj(
|
||||
self.neutron,
|
||||
self.nb_api))
|
||||
security_group_id2 = self.security_group2.create()
|
||||
self.assertTrue(self.security_group2.exists())
|
||||
self.security_group2 = objects.SecGroupTestObj(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.security_group2.close)
|
||||
security_group_id2 = self.security_group2.create()
|
||||
self.assertTrue(self.security_group2.exists())
|
||||
|
||||
self.security_group3 = self.store(objects.SecGroupTestObj(
|
||||
self.neutron,
|
||||
self.nb_api))
|
||||
security_group_id3 = self.security_group3.create()
|
||||
self.assertTrue(self.security_group3.exists())
|
||||
self.security_group3 = objects.SecGroupTestObj(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.security_group3.close)
|
||||
security_group_id3 = self.security_group3.create()
|
||||
self.assertTrue(self.security_group3.exists())
|
||||
|
||||
self.topology = self.store(
|
||||
app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api
|
||||
)
|
||||
)
|
||||
self.topology = app_testing_objects.Topology(self.neutron, self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
|
||||
self.active_security_group_id = security_group_id
|
||||
self.inactive_security_group_id = security_group_id2
|
||||
self.allowed_address_pairs_security_group_id = security_group_id3
|
||||
self.active_security_group_id = security_group_id
|
||||
self.inactive_security_group_id = security_group_id2
|
||||
self.allowed_address_pairs_security_group_id = security_group_id3
|
||||
|
||||
self.permit_icmp_request = self._get_icmp_request1
|
||||
self.no_permit_icmp_request = self._get_icmp_request2
|
||||
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
raise
|
||||
self.permit_icmp_request = self._get_icmp_request1
|
||||
self.no_permit_icmp_request = self._get_icmp_request2
|
||||
|
||||
def _setup_subnet(self, cidr):
|
||||
network = netaddr.IPNetwork(cidr)
|
||||
|
@ -147,24 +137,19 @@ class TestSGApp(test_base.DFTestBase):
|
|||
|
||||
port_policies = self._create_port_policies()
|
||||
|
||||
self.policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
packet1.data,
|
||||
),
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port2.port_id,
|
||||
packet2.data,
|
||||
)
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(self.subnet.subnet_id,
|
||||
self.port1.port_id,
|
||||
packet1.data),
|
||||
app_testing_objects.SendAction(self.subnet.subnet_id,
|
||||
self.port2.port_id,
|
||||
packet2.data)
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(self.policy.close)
|
||||
|
||||
def _create_allowed_address_pairs_policy(self):
|
||||
packet1, self.allowed_address_pairs_icmp_request = \
|
||||
|
@ -172,19 +157,16 @@ class TestSGApp(test_base.DFTestBase):
|
|||
|
||||
port_policies = self._create_allowed_address_pairs_port_policies()
|
||||
|
||||
self.allowed_address_pairs_policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.port4.port_id,
|
||||
packet1.data,
|
||||
)
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.allowed_address_pairs_policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(self.subnet.subnet_id,
|
||||
self.port4.port_id,
|
||||
packet1.data)
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(self.allowed_address_pairs_policy.close)
|
||||
|
||||
def _get_icmp_request1(self):
|
||||
return self.icmp_request1
|
||||
|
|
|
@ -61,12 +61,8 @@ class TestSNat(test_base.DFTestBase):
|
|||
const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
def _create_topology(self):
|
||||
self.topology = self.store(
|
||||
app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api
|
||||
)
|
||||
)
|
||||
self.topology = app_testing_objects.Topology(self.neutron, self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
self.subnet1 = self.topology.create_subnet(cidr='192.168.15.0/24')
|
||||
self.port1 = self.subnet1.create_port()
|
||||
self.router = self.topology.create_router([self.subnet1.subnet_id])
|
||||
|
@ -77,19 +73,16 @@ class TestSNat(test_base.DFTestBase):
|
|||
port_policies = self._create_port_policies()
|
||||
initial_packet = self._create_packet(
|
||||
'10.0.1.2', ryu.lib.packet.ipv4.inet.IPPROTO_ICMP)
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(self.subnet1.subnet_id,
|
||||
self.port1.port_id,
|
||||
initial_packet),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.IgnoreAction()
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
return policy
|
||||
|
||||
def _create_port_policies(self):
|
||||
|
|
|
@ -81,13 +81,12 @@ class TestOVSFlowsForActivePortDectionApp(test_base.DFTestBase):
|
|||
if not self.check_app_loaded("active_port_detection"):
|
||||
self.skipTest("ActivePortDetectionApp is not enabled")
|
||||
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create(network={'name': 'aap_test'})
|
||||
subnet_obj = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet_obj = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id)
|
||||
self.addCleanup(subnet_obj.close)
|
||||
|
||||
subnet = {'network_id': network_id,
|
||||
'cidr': '192.168.97.0/24',
|
||||
|
@ -97,7 +96,8 @@ class TestOVSFlowsForActivePortDectionApp(test_base.DFTestBase):
|
|||
'enable_dhcp': True}
|
||||
subnet_obj.create(subnet)
|
||||
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm_id = vm.create(network=network)
|
||||
|
||||
vm_port_id = self.vswitch_api.get_port_id_by_vm_id(vm_id)
|
||||
|
|
|
@ -68,8 +68,6 @@ class DFTestBase(base.BaseTestCase):
|
|||
self.nb_api.set_db_change_callback(self._db_change_callback)
|
||||
|
||||
self.mgt_ip = self.conf.management_ip
|
||||
self.__objects_to_close = []
|
||||
self.addCleanup(self._close_stored_objects)
|
||||
|
||||
self.vswitch_api = utils.OvsTestApi(self.mgt_ip)
|
||||
self.vswitch_api.initialize(self._db_change_callback)
|
||||
|
@ -123,11 +121,6 @@ class DFTestBase(base.BaseTestCase):
|
|||
return True
|
||||
return False
|
||||
|
||||
def _close_stored_objects(self):
|
||||
while self.__objects_to_close:
|
||||
close_func = self.__objects_to_close.pop()
|
||||
close_func()
|
||||
|
||||
def get_default_subnetpool(self):
|
||||
default_subnetpool = None
|
||||
subnetpool_filter = {'is_default': True,
|
||||
|
@ -147,14 +140,9 @@ class DFTestBase(base.BaseTestCase):
|
|||
self.neutron.create_subnetpool(
|
||||
body={'subnetpool': default_subnetpool})
|
||||
|
||||
def store(self, obj, close_func=None):
|
||||
close_func = close_func if close_func else obj.close
|
||||
self.__objects_to_close.append(close_func)
|
||||
return obj
|
||||
|
||||
def start_subscribing(self):
|
||||
self._topology = self.store(
|
||||
test_objects.Topology(self.neutron, self.nb_api))
|
||||
self._topology = test_objects.Topology(self.neutron, self.nb_api)
|
||||
self.addCleanup(self._topology.close)
|
||||
subnet = self._topology.create_subnet(cidr="192.168.200.0/24")
|
||||
port = subnet.create_port()
|
||||
utils.wait_until_true(
|
||||
|
|
|
@ -40,12 +40,11 @@ class TestDbConsistent(test_base.DFTestBase):
|
|||
|
||||
def test_db_consistent(self):
|
||||
self.db_sync_time = self.conf.db_sync_time
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
network_id = network.create()
|
||||
self.addCleanup(network.close)
|
||||
topic = network.get_topic()
|
||||
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
subnet_body = {'network_id': network_id,
|
||||
'cidr': '10.50.0.0/24',
|
||||
'gateway_ip': '10.50.0.1',
|
||||
|
@ -58,7 +57,7 @@ class TestDbConsistent(test_base.DFTestBase):
|
|||
self.assertTrue(network.exists())
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
vm.create(network=network)
|
||||
self.addCleanup(vm.close)
|
||||
self.assertIsNotNone(vm.server.addresses['mynetwork'])
|
||||
|
|
|
@ -50,7 +50,8 @@ class TestOVSFlowsForDHCP(test_base.DFTestBase):
|
|||
self.assertTrue(found_dhcp_cast_flow)
|
||||
|
||||
def _create_network(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
lean_lswitch = l2.LogicalSwitch(id=network_id)
|
||||
|
@ -139,7 +140,8 @@ class TestOVSFlowsForDHCP(test_base.DFTestBase):
|
|||
def test_create_router_interface(self):
|
||||
ovs = utils.OvsFlowsParser()
|
||||
flows_before_change = ovs.dump(self.integration_bridge)
|
||||
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
network, network_id, network_key = self._create_network()
|
||||
subnet = {'network_id': network_id,
|
||||
'cidr': '10.30.0.0/24',
|
||||
|
|
|
@ -44,7 +44,8 @@ class TestL2FLows(test_base.DFTestBase):
|
|||
if self._check_tunneling_app_enable() is False:
|
||||
return
|
||||
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
network_params = network.get_network()
|
||||
segmentation_id = network_params['network']['provider:segmentation_id']
|
||||
|
@ -58,7 +59,8 @@ class TestL2FLows(test_base.DFTestBase):
|
|||
self.assertIsNotNone(subnet)
|
||||
|
||||
ovs = utils.OvsFlowsParser()
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
ip = vm.get_first_ipv4()
|
||||
self.assertIsNotNone(ip)
|
||||
|
@ -98,7 +100,8 @@ class TestL2FLows(test_base.DFTestBase):
|
|||
return
|
||||
|
||||
# Create network
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_params = {"name": "vlan_1",
|
||||
"provider:network_type": "vlan",
|
||||
"provider:physical_network": physical_network,
|
||||
|
@ -117,7 +120,8 @@ class TestL2FLows(test_base.DFTestBase):
|
|||
|
||||
# Create VM
|
||||
ovs = utils.OvsFlowsParser()
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
ip = vm.get_first_ipv4()
|
||||
self.assertIsNotNone(ip)
|
||||
|
@ -271,7 +275,8 @@ class TestL2FLows(test_base.DFTestBase):
|
|||
return
|
||||
|
||||
# Create network
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_params = {"name": "flat_1",
|
||||
"provider:network_type": "flat",
|
||||
"provider:physical_network": physical_network}
|
||||
|
@ -290,7 +295,8 @@ class TestL2FLows(test_base.DFTestBase):
|
|||
|
||||
# Create VM
|
||||
ovs = utils.OvsFlowsParser()
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
ip = vm.get_first_ipv4()
|
||||
self.assertIsNotNone(ip)
|
||||
|
@ -474,7 +480,8 @@ class TestL2FLows(test_base.DFTestBase):
|
|||
return None
|
||||
|
||||
def test_vm_multicast(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
subnet = {'network_id': network_id,
|
||||
'cidr': '10.200.0.0/24',
|
||||
|
@ -485,7 +492,8 @@ class TestL2FLows(test_base.DFTestBase):
|
|||
subnet = self.neutron.create_subnet({'subnet': subnet})
|
||||
|
||||
ovs = utils.OvsFlowsParser()
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
ip = vm.get_first_ipv4()
|
||||
self.assertIsNotNone(ip)
|
||||
|
|
|
@ -23,20 +23,14 @@ class TestL3Flows(test_base.DFTestBase):
|
|||
def setUp(self):
|
||||
super(TestL3Flows, self).setUp()
|
||||
self.topology = None
|
||||
try:
|
||||
self.topology = app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api)
|
||||
self.subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
|
||||
self.port1 = self.subnet1.create_port()
|
||||
self.router = self.topology.create_router([
|
||||
self.subnet1.subnet_id])
|
||||
|
||||
except Exception:
|
||||
if self.topology:
|
||||
self.topology.close()
|
||||
raise
|
||||
self.store(self.topology)
|
||||
self.topology = app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
self.subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
|
||||
self.port1 = self.subnet1.create_port()
|
||||
self.router = self.topology.create_router([
|
||||
self.subnet1.subnet_id])
|
||||
|
||||
def test_router_add_extra_route(self):
|
||||
lport = self.port1.port.get_logical_port()
|
||||
|
|
|
@ -31,14 +31,16 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
super(TestNeutronAPIandDB, self).setUp()
|
||||
|
||||
def test_create_network(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network.create()
|
||||
self.assertTrue(network.exists())
|
||||
network.close()
|
||||
self.assertFalse(network.exists())
|
||||
|
||||
def test_create_network_with_mtu(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network.create()
|
||||
self.assertTrue(network.exists())
|
||||
netobj = network.get_network()
|
||||
|
@ -50,7 +52,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(network.exists())
|
||||
|
||||
def test_dhcp_port_created(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
subnet = {'network_id': network_id,
|
||||
|
@ -80,14 +83,12 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertEqual(0, dhcp_ports_found)
|
||||
|
||||
def test_create_delete_subnet(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet_id = subnet.create()
|
||||
self.assertTrue(subnet.exists())
|
||||
self.assertEqual(subnet_id, subnet.get_subnet().id)
|
||||
|
@ -96,14 +97,12 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
network.close()
|
||||
|
||||
def test_create_subnet_with_host_routes(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet_data = {
|
||||
'cidr': '192.168.199.0/24',
|
||||
'ip_version': 4,
|
||||
|
@ -126,7 +125,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
for host_route in db_subnet.host_routes])
|
||||
|
||||
def test_create_delete_router(self):
|
||||
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
router_id = router.create()
|
||||
self.assertTrue(router.exists())
|
||||
version1 = self.nb_api.get(l3.LogicalRouter(id=router_id)).version
|
||||
|
@ -138,15 +138,14 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(router.exists())
|
||||
|
||||
def test_create_router_interface(self):
|
||||
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet_id = subnet.create()
|
||||
router_id = router.create()
|
||||
self.assertTrue(router.exists())
|
||||
|
@ -165,11 +164,12 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(network.exists())
|
||||
|
||||
def test_create_port(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
port = self.store(objects.PortTestObj(self.neutron,
|
||||
self.nb_api, network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
port.create()
|
||||
self.assertTrue(port.exists())
|
||||
self.assertEqual(network_id, port.get_logical_port().lswitch.id)
|
||||
|
@ -179,18 +179,18 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(network.exists())
|
||||
|
||||
def test_create_port_with_qospolicy(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
|
||||
qospolicy = self.store(objects.QosPolicyTestObj(self.neutron,
|
||||
self.nb_api))
|
||||
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(qospolicy.close)
|
||||
qos_policy_id = qospolicy.create()
|
||||
self.assertTrue(qospolicy.exists())
|
||||
|
||||
port = self.store(objects.PortTestObj(self.neutron,
|
||||
self.nb_api,
|
||||
network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
port_param = {
|
||||
'admin_state_up': True,
|
||||
'name': 'port1',
|
||||
|
@ -210,18 +210,18 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(qospolicy.exists())
|
||||
|
||||
def test_update_port_with_qospolicy(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
|
||||
qospolicy = self.store(objects.QosPolicyTestObj(self.neutron,
|
||||
self.nb_api))
|
||||
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(qospolicy.close)
|
||||
qos_policy_id = qospolicy.create()
|
||||
self.assertTrue(qospolicy.exists())
|
||||
|
||||
port = self.store(objects.PortTestObj(self.neutron,
|
||||
self.nb_api,
|
||||
network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
port.create()
|
||||
self.assertTrue(port.exists())
|
||||
|
||||
|
@ -242,15 +242,14 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(qospolicy.exists())
|
||||
|
||||
def test_delete_router_interface_port(self):
|
||||
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet_id = subnet.create({
|
||||
'cidr': '91.126.188.0/24',
|
||||
'ip_version': 4,
|
||||
|
@ -286,8 +285,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(network.exists())
|
||||
|
||||
def test_create_delete_security_group(self):
|
||||
secgroup = self.store(
|
||||
objects.SecGroupTestObj(self.neutron, self.nb_api))
|
||||
secgroup = objects.SecGroupTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(secgroup.close)
|
||||
sg_id = secgroup.create()
|
||||
self.assertTrue(secgroup.exists())
|
||||
secgroup_obj = secgroups.SecurityGroup(id=sg_id)
|
||||
|
@ -301,8 +300,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(secgroup.exists())
|
||||
|
||||
def test_create_delete_security_group_rule(self):
|
||||
secgroup = self.store(
|
||||
objects.SecGroupTestObj(self.neutron, self.nb_api))
|
||||
secgroup = objects.SecGroupTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(secgroup.close)
|
||||
secgroup.create()
|
||||
self.assertTrue(secgroup.exists())
|
||||
secrule_id = secgroup.rule_create()
|
||||
|
@ -313,8 +312,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(secgroup.exists())
|
||||
|
||||
def test_create_delete_qos_policy(self):
|
||||
qospolicy = self.store(
|
||||
objects.QosPolicyTestObj(self.neutron, self.nb_api))
|
||||
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(qospolicy.close)
|
||||
policy_id = qospolicy.create()
|
||||
self.assertTrue(qospolicy.exists())
|
||||
rule = {'max_kbps': '1000', 'max_burst_kbps': '100'}
|
||||
|
@ -328,16 +327,14 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
external_net = objects.find_first_network(self.neutron,
|
||||
{'router:external': True})
|
||||
if not external_net:
|
||||
network = self.store(
|
||||
objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
external_net_para = {'name': 'public', 'router:external': True}
|
||||
external_network_id = network.create(network=external_net_para)
|
||||
self.assertTrue(network.exists())
|
||||
ext_subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
external_network_id,
|
||||
))
|
||||
ext_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
external_network_id)
|
||||
self.addCleanup(ext_subnet.close)
|
||||
external_subnet_para = {'cidr': '192.168.199.0/24',
|
||||
'ip_version': 4,
|
||||
'network_id': external_network_id}
|
||||
|
@ -359,10 +356,10 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
@lockutils.synchronized('need-external-net')
|
||||
def test_associate_floatingip(self):
|
||||
with self._prepare_ext_net() as external_network_id:
|
||||
router = self.store(
|
||||
objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
fip = self.store(
|
||||
objects.FloatingipTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(fip.close)
|
||||
|
||||
router_para = {
|
||||
'name': 'myrouter1', 'admin_state_up': True,
|
||||
|
@ -371,17 +368,15 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertTrue(router.exists())
|
||||
|
||||
# private network
|
||||
private_network = self.store(
|
||||
objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
private_network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(private_network.close)
|
||||
private_network_id = private_network.create()
|
||||
self.assertTrue(private_network.exists())
|
||||
|
||||
# private subnet
|
||||
priv_subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
private_network_id,
|
||||
))
|
||||
priv_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
private_network_id)
|
||||
self.addCleanup(priv_subnet.close)
|
||||
private_subnet_para = {'cidr': '10.0.0.0/24',
|
||||
'ip_version': 4,
|
||||
'network_id': private_network_id}
|
||||
|
@ -392,9 +387,9 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
l2.LogicalPort(id=router_interface['port_id']))
|
||||
self.assertIsNotNone(router_lport)
|
||||
|
||||
port = self.store(
|
||||
objects.PortTestObj(self.neutron,
|
||||
self.nb_api, private_network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api,
|
||||
private_network_id)
|
||||
self.addCleanup(port.close)
|
||||
port_id = port.create()
|
||||
self.assertIsNotNone(port.get_logical_port())
|
||||
|
||||
|
@ -420,10 +415,10 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
@lockutils.synchronized('need-external-net')
|
||||
def test_disassociate_floatingip(self):
|
||||
with self._prepare_ext_net() as external_network_id:
|
||||
router = self.store(
|
||||
objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
fip = self.store(
|
||||
objects.FloatingipTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(fip.close)
|
||||
|
||||
router_para = {
|
||||
'name': 'myrouter1', 'admin_state_up': True,
|
||||
|
@ -432,16 +427,14 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertTrue(router.exists())
|
||||
|
||||
# private network
|
||||
private_network = self.store(
|
||||
objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
private_network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(private_network.close)
|
||||
private_network_id = private_network.create()
|
||||
self.assertTrue(private_network.exists())
|
||||
# private subnet
|
||||
priv_subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
private_network_id,
|
||||
))
|
||||
priv_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
private_network_id)
|
||||
self.addCleanup(priv_subnet.close)
|
||||
private_subnet_para = {'cidr': '10.0.0.0/24',
|
||||
'ip_version': 4,
|
||||
'network_id': private_network_id}
|
||||
|
@ -452,9 +445,9 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
l2.LogicalPort(id=router_interface['port_id']))
|
||||
self.assertIsNotNone(router_lport)
|
||||
|
||||
port = self.store(
|
||||
objects.PortTestObj(self.neutron,
|
||||
self.nb_api, private_network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api,
|
||||
private_network_id)
|
||||
self.addCleanup(port.close)
|
||||
port_id = port.create()
|
||||
self.assertIsNotNone(port.get_logical_port())
|
||||
|
||||
|
@ -479,22 +472,20 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertFalse(priv_subnet.exists())
|
||||
|
||||
def test_enable_disable_portsec(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
|
||||
network2 = self.store(objects.NetworkTestObj(self.neutron,
|
||||
self.nb_api))
|
||||
network2 = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network2.close)
|
||||
network_id2 = network2.create({'name': 'mynetwork1',
|
||||
'admin_state_up': True,
|
||||
'port_security_enabled': False})
|
||||
self.assertTrue(network2.exists())
|
||||
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet.create({
|
||||
'cidr': '192.168.125.0/24',
|
||||
'ip_version': 4,
|
||||
|
@ -502,11 +493,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
})
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
subnet2 = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id2,
|
||||
))
|
||||
subnet2 = objects.SubnetTestObj(self.neutron, self.nb_api, network_id2)
|
||||
self.addCleanup(subnet2.close)
|
||||
subnet2.create({
|
||||
'cidr': '192.168.126.0/24',
|
||||
'ip_version': 4,
|
||||
|
@ -515,8 +503,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertTrue(subnet2.exists())
|
||||
|
||||
network_portsec_switch = True
|
||||
port = self.store(
|
||||
objects.PortTestObj(self.neutron, self.nb_api, network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
port.create()
|
||||
lport = port.get_logical_port()
|
||||
self.assertIsNotNone(lport)
|
||||
|
@ -524,16 +512,16 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertEqual(network_portsec_switch, real_switch)
|
||||
|
||||
network_portsec_switch = False
|
||||
port = self.store(
|
||||
objects.PortTestObj(self.neutron, self.nb_api, network_id2))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id2)
|
||||
self.addCleanup(port.close)
|
||||
port.create()
|
||||
lport = port.get_logical_port()
|
||||
self.assertIsNotNone(lport)
|
||||
real_switch = lport.port_security_enabled
|
||||
self.assertEqual(network_portsec_switch, real_switch)
|
||||
|
||||
port = self.store(
|
||||
objects.PortTestObj(self.neutron, self.nb_api, network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
expected_switch = False
|
||||
port.create({
|
||||
'admin_state_up': True,
|
||||
|
@ -554,15 +542,13 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertEqual(expected_switch, real_switch)
|
||||
|
||||
def test_add_remove_allowed_address_pairs(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet.create({
|
||||
'cidr': '192.168.127.0/24',
|
||||
'ip_version': 4,
|
||||
|
@ -570,8 +556,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
})
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
port = self.store(
|
||||
objects.PortTestObj(self.neutron, self.nb_api, network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
expected_pairs = [
|
||||
{"ip_address": "192.168.127.201",
|
||||
"mac_address": "00:22:33:44:55:66"},
|
||||
|
@ -602,26 +588,26 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertItemsEqual(expected_pairs, real_pairs)
|
||||
|
||||
def test_create_delete_bgp_peer(self):
|
||||
bgp_peer = self.store(
|
||||
objects.BGPPeerTestObj(self.neutron, self.nb_api))
|
||||
bgp_peer = objects.BGPPeerTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(bgp_peer.close)
|
||||
bgp_peer.create()
|
||||
self.assertTrue(bgp_peer.exists())
|
||||
bgp_peer.close()
|
||||
self.assertFalse(bgp_peer.exists())
|
||||
|
||||
def test_create_delete_bgp_speaker(self):
|
||||
bgp_speaker = self.store(
|
||||
objects.BGPSpeakerTestObj(self.neutron, self.nb_api))
|
||||
bgp_speaker = objects.BGPSpeakerTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(bgp_speaker.close)
|
||||
bgp_speaker.create()
|
||||
self.assertTrue(bgp_speaker.exists())
|
||||
bgp_speaker.close()
|
||||
self.assertFalse(bgp_speaker.exists())
|
||||
|
||||
def test_add_remove_bgp_peer(self):
|
||||
bgp_peer = self.store(
|
||||
objects.BGPPeerTestObj(self.neutron, self.nb_api))
|
||||
bgp_speaker = self.store(
|
||||
objects.BGPSpeakerTestObj(self.neutron, self.nb_api))
|
||||
bgp_peer = objects.BGPPeerTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(bgp_peer.close)
|
||||
bgp_speaker = objects.BGPSpeakerTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(bgp_speaker.close)
|
||||
bgp_peer.create()
|
||||
bgp_speaker.create()
|
||||
bgp_speaker.add_peer(bgp_peer.peer_id)
|
||||
|
@ -635,10 +621,10 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
self.assertNotIn(bgp_peer.peer_id, nb_bgp_speaker.peers)
|
||||
|
||||
def test_delete_bgp_peer_update_bgp_speaker(self):
|
||||
bgp_peer = self.store(
|
||||
objects.BGPPeerTestObj(self.neutron, self.nb_api))
|
||||
bgp_speaker = self.store(
|
||||
objects.BGPSpeakerTestObj(self.neutron, self.nb_api))
|
||||
bgp_peer = objects.BGPPeerTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(bgp_peer.close)
|
||||
bgp_speaker = objects.BGPSpeakerTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(bgp_speaker.close)
|
||||
bgp_peer.create()
|
||||
bgp_speaker.create()
|
||||
bgp_speaker.add_peer(bgp_peer.peer_id)
|
||||
|
@ -653,52 +639,51 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
|
||||
@lockutils.synchronized('need-external-net')
|
||||
def test_add_remove_bgp_network(self):
|
||||
bgp_speaker = self.store(
|
||||
objects.BGPSpeakerTestObj(self.neutron, self.nb_api))
|
||||
bgp_speaker = objects.BGPSpeakerTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(bgp_speaker.close)
|
||||
bgp_speaker.create()
|
||||
address_scope = self.store(
|
||||
objects.AddressScopeTestObj(self.neutron, self.nb_api))
|
||||
address_scope = objects.AddressScopeTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(address_scope.close)
|
||||
as_id = address_scope.create()
|
||||
private_subnetpool = self.store(
|
||||
objects.SubnetPoolTestObj(self.neutron, self.nb_api))
|
||||
private_subnetpool = objects.SubnetPoolTestObj(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(private_subnetpool.close)
|
||||
private_sp_id = private_subnetpool.create(
|
||||
subnetpool={'name': "private_sp",
|
||||
'default_prefixlen': 24,
|
||||
'prefixes': ["20.0.0.0/8"],
|
||||
'address_scope_id': as_id})
|
||||
public_subnetpool = self.store(
|
||||
objects.SubnetPoolTestObj(self.neutron, self.nb_api))
|
||||
public_subnetpool = objects.SubnetPoolTestObj(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(public_subnetpool.close)
|
||||
public_sp_id = public_subnetpool.create(
|
||||
subnetpool={'name': "public_sp",
|
||||
'default_prefixlen': 24,
|
||||
'prefixes': ["172.24.4.0/24"],
|
||||
'address_scope_id': as_id})
|
||||
public_network = self.store(
|
||||
objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
public_network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(public_network.close)
|
||||
public_network_id = public_network.create(
|
||||
network={'name': 'public', 'router:external': True})
|
||||
public_subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
public_network_id,
|
||||
))
|
||||
public_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
public_network_id)
|
||||
self.addCleanup(public_subnet.close)
|
||||
public_subnet.create(subnet={'ip_version': 4,
|
||||
'network_id': public_network_id,
|
||||
'subnetpool_id': public_sp_id})
|
||||
private_network = self.store(
|
||||
objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
private_network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(private_network.close)
|
||||
private_network_id = private_network.create(network={'name': "public"})
|
||||
private_subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
private_network_id,
|
||||
))
|
||||
private_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
private_network_id)
|
||||
self.addCleanup(private_subnet.close)
|
||||
private_sn_id = private_subnet.create(
|
||||
subnet={'ip_version': 4,
|
||||
'network_id': private_network_id,
|
||||
'subnetpool_id': private_sp_id})
|
||||
bgp_speaker.add_network(public_network_id)
|
||||
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
router_id = router.create()
|
||||
self.neutron.add_interface_router(
|
||||
router_id, body={'subnet_id': private_sn_id})
|
||||
|
@ -708,11 +693,13 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
|
|||
nb_bgp_speaker = bgp_speaker.get_nb_bgp_speaker()
|
||||
self.assertEqual(1, len(nb_bgp_speaker.prefix_routes))
|
||||
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm_id = vm.create(network=private_network)
|
||||
vm_port = self.neutron.list_ports(device_id=vm_id).get('ports')[0]
|
||||
vm_port_id = vm_port.get('id')
|
||||
fip = self.store(objects.FloatingipTestObj(self.neutron, self.nb_api))
|
||||
fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(fip.close)
|
||||
fip.create({'floating_network_id': public_network_id,
|
||||
'port_id': vm_port_id})
|
||||
fip_addr = fip.get_floatingip().floating_ip_address
|
||||
|
|
|
@ -28,14 +28,15 @@ class TestObjectVersion(test_base.DFTestBase):
|
|||
super(TestObjectVersion, self).setUp()
|
||||
|
||||
def test_network_version(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
lean_lswitch = l2.LogicalSwitch(id=network_id)
|
||||
self.assertTrue(network.exists())
|
||||
version = self.nb_api.get(lean_lswitch).version
|
||||
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron, self.nb_api, network_id))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet.create()
|
||||
self.assertTrue(subnet.exists())
|
||||
new_version = self.nb_api.get(lean_lswitch).version
|
||||
|
@ -51,17 +52,18 @@ class TestObjectVersion(test_base.DFTestBase):
|
|||
self.assertFalse(network.exists())
|
||||
|
||||
def test_port_version(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron, self.nb_api, network_id))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet.create()
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
port = self.store(objects.PortTestObj(
|
||||
self.neutron, self.nb_api, network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
port_id = port.create()
|
||||
self.assertTrue(port.exists())
|
||||
prev_version = self.nb_api.get(l2.LogicalPort(id=port_id)).version
|
||||
|
@ -79,17 +81,16 @@ class TestObjectVersion(test_base.DFTestBase):
|
|||
self.assertFalse(network.exists())
|
||||
|
||||
def test_router_version(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet_id = subnet.create()
|
||||
self.assertTrue(subnet.exists())
|
||||
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
router_id = router.create()
|
||||
self.assertTrue(router.exists())
|
||||
prev_version = self.nb_api.get(l3.LogicalRouter(id=router_id)).version
|
||||
|
@ -112,8 +113,8 @@ class TestObjectVersion(test_base.DFTestBase):
|
|||
self.assertFalse(network.exists())
|
||||
|
||||
def test_sg_version(self):
|
||||
secgroup = self.store(
|
||||
objects.SecGroupTestObj(self.neutron, self.nb_api))
|
||||
secgroup = objects.SecGroupTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(secgroup.close)
|
||||
sg_id = secgroup.create()
|
||||
self.assertTrue(secgroup.exists())
|
||||
sg_obj = secgroups.SecurityGroup(id=sg_id)
|
||||
|
@ -136,8 +137,8 @@ class TestObjectVersion(test_base.DFTestBase):
|
|||
self.assertFalse(secgroup.exists())
|
||||
|
||||
def test_qospolicy_version(self):
|
||||
qospolicy = self.store(objects.QosPolicyTestObj(self.neutron,
|
||||
self.nb_api))
|
||||
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(qospolicy.close)
|
||||
policy_id = qospolicy.create()
|
||||
self.assertTrue(qospolicy.exists())
|
||||
version = self.nb_api.get(qos.QosPolicy(id=policy_id)).version
|
||||
|
@ -156,16 +157,14 @@ class TestObjectVersion(test_base.DFTestBase):
|
|||
external_net = objects.find_first_network(self.neutron,
|
||||
{'router:external': True})
|
||||
if not external_net:
|
||||
network = self.store(
|
||||
objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
external_net_para = {'name': 'public', 'router:external': True}
|
||||
external_network_id = network.create(network=external_net_para)
|
||||
self.assertTrue(network.exists())
|
||||
ext_subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
external_network_id,
|
||||
))
|
||||
ext_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
external_network_id)
|
||||
self.addCleanup(ext_subnet.close)
|
||||
external_subnet_para = {'cidr': '192.168.199.0/24',
|
||||
'ip_version': 4,
|
||||
'network_id': external_network_id}
|
||||
|
@ -187,22 +186,20 @@ class TestObjectVersion(test_base.DFTestBase):
|
|||
@lockutils.synchronized('need-external-net')
|
||||
def test_floatingip_version(self):
|
||||
with self._prepare_ext_net() as external_network_id:
|
||||
private_network = self.store(
|
||||
objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
private_network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(private_network.close)
|
||||
private_network_id = private_network.create()
|
||||
self.assertTrue(private_network.exists())
|
||||
priv_subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
private_network_id,
|
||||
))
|
||||
router = self.store(
|
||||
objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
port = self.store(
|
||||
objects.PortTestObj(self.neutron,
|
||||
self.nb_api, private_network_id))
|
||||
fip = self.store(
|
||||
objects.FloatingipTestObj(self.neutron, self.nb_api))
|
||||
priv_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
private_network_id)
|
||||
self.addCleanup(priv_subnet.close)
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
port = objects.PortTestObj(self.neutron,
|
||||
self.nb_api, private_network_id)
|
||||
self.addCleanup(port.close)
|
||||
fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(fip.close)
|
||||
|
||||
router_para = {
|
||||
'name': 'myrouter1', 'admin_state_up': True,
|
||||
|
|
|
@ -94,9 +94,10 @@ class TestOvsdbMonitor(test_base.DFTestBase):
|
|||
|
||||
def test_notify_message(self):
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet_body = {'network_id': network_id,
|
||||
'cidr': '10.10.0.0/24',
|
||||
'gateway_ip': '10.10.0.1',
|
||||
|
@ -107,7 +108,8 @@ class TestOvsdbMonitor(test_base.DFTestBase):
|
|||
self.assertTrue(network.exists())
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
self.assertIsNotNone(vm.server.addresses['mynetwork'])
|
||||
mac = vm.server.addresses['mynetwork'][0]['OS-EXT-IPS-MAC:mac_addr']
|
||||
|
@ -133,10 +135,11 @@ class TestOvsdbMonitor(test_base.DFTestBase):
|
|||
)
|
||||
|
||||
def test_reply_message(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet_body = {'network_id': network_id,
|
||||
'cidr': '10.20.0.0/24',
|
||||
'gateway_ip': '10.20.0.1',
|
||||
|
@ -147,13 +150,15 @@ class TestOvsdbMonitor(test_base.DFTestBase):
|
|||
self.assertTrue(network.exists())
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
vm1 = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm1 = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm1.close)
|
||||
vm1.create(network=network)
|
||||
self.assertIsNotNone(vm1.server.addresses['mynetwork'])
|
||||
mac1 = vm1.server.addresses['mynetwork'][0]['OS-EXT-IPS-MAC:mac_addr']
|
||||
self.assertIsNotNone(mac1)
|
||||
|
||||
vm2 = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm2 = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm2.close)
|
||||
vm2.create(network=network)
|
||||
self.assertIsNotNone(vm2.server.addresses['mynetwork'])
|
||||
mac2 = vm2.server.addresses['mynetwork'][0]['OS-EXT-IPS-MAC:mac_addr']
|
||||
|
|
|
@ -19,16 +19,19 @@ from dragonflow.tests.fullstack import test_objects as objects
|
|||
|
||||
class TestPortQos(test_base.DFTestBase):
|
||||
def test_port_with_qospolicy(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
|
||||
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id=network_id))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id=network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet.create()
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm_id = vm.create(network=network)
|
||||
|
||||
vm_port_id = self.vswitch_api.get_port_id_by_vm_id(vm_id)
|
||||
|
@ -36,8 +39,8 @@ class TestPortQos(test_base.DFTestBase):
|
|||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id,
|
||||
vm_port_id)
|
||||
|
||||
qospolicy = self.store(objects.QosPolicyTestObj(self.neutron,
|
||||
self.nb_api))
|
||||
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(qospolicy.close)
|
||||
qos_policy_id = qospolicy.create()
|
||||
time.sleep(const.DEFAULT_CMD_TIMEOUT)
|
||||
self.assertTrue(qospolicy.exists())
|
||||
|
|
|
@ -211,18 +211,20 @@ class TestOVSFlowsForPortSecurity(test_base.DFTestBase):
|
|||
|
||||
def _test_anti_spoof_flows(self, subnet_info):
|
||||
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
|
||||
subnet_info['network_id'] = network_id
|
||||
subnet = self.store(objects.SubnetTestObj(self.neutron,
|
||||
self.nb_api,
|
||||
network_id=network_id))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id=network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet.create(subnet_info)
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
|
||||
addresses = vm.server.addresses['mynetwork']
|
||||
|
|
|
@ -88,7 +88,8 @@ class TestPubSub(PubSubTestBase):
|
|||
events_num += 1
|
||||
subscriber = self._get_subscriber(_db_change_callback)
|
||||
self.addCleanup(subscriber.close)
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
if cfg.CONF.df.enable_selective_topology_distribution:
|
||||
topic = network.get_topic()
|
||||
|
@ -97,11 +98,8 @@ class TestPubSub(PubSubTestBase):
|
|||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
self.assertNotEqual(local_event_num, events_num)
|
||||
local_event_num = events_num
|
||||
port = self.store(objects.PortTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id
|
||||
))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
port.create()
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
||||
|
@ -132,7 +130,8 @@ class TestPubSub(PubSubTestBase):
|
|||
|
||||
subscriber = self._get_subscriber(_db_change_callback)
|
||||
self.addCleanup(subscriber.close)
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
if cfg.CONF.df.enable_selective_topology_distribution:
|
||||
topic = network.get_topic()
|
||||
|
@ -140,11 +139,8 @@ class TestPubSub(PubSubTestBase):
|
|||
else:
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
self.assertNotEqual(local_event_num, ns.events_num)
|
||||
port = self.store(objects.PortTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id
|
||||
))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
local_event_num = ns.events_num
|
||||
port_id = port.create()
|
||||
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
|
|
|
@ -22,7 +22,8 @@ from dragonflow.tests.fullstack import test_objects as objects
|
|||
class TestRemotePort(test_base.DFTestBase):
|
||||
|
||||
def test_remote_port(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
|
||||
|
@ -32,14 +33,14 @@ class TestRemotePort(test_base.DFTestBase):
|
|||
'ip_version': 4,
|
||||
'name': 'subnet1',
|
||||
'enable_dhcp': True}
|
||||
subnet = self.store(objects.SubnetTestObj(self.neutron,
|
||||
self.nb_api,
|
||||
network_id=network_id))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id=network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet.create(subnet_info)
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
port = self.store(objects.PortTestObj(
|
||||
self.neutron, self.nb_api, network_id))
|
||||
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(port.close)
|
||||
port_body = {
|
||||
'admin_state_up': True,
|
||||
'name': 'port1',
|
||||
|
|
|
@ -54,13 +54,12 @@ class ArpResponderTest(test_base.DFTestBase):
|
|||
"""
|
||||
Add a VM. Verify it's ARP flow is there.
|
||||
"""
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create(network={'name': 'arp_responder_test'})
|
||||
subnet_obj = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet_obj = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id)
|
||||
self.addCleanup(subnet_obj.close)
|
||||
|
||||
subnet = {'network_id': network_id,
|
||||
'cidr': '10.10.10.0/24',
|
||||
|
@ -71,7 +70,8 @@ class ArpResponderTest(test_base.DFTestBase):
|
|||
subnet = subnet_obj.create(subnet)
|
||||
|
||||
flows_before = self._get_arp_table_flows()
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
ip = vm.get_first_ipv4()
|
||||
self.assertIsNotNone(ip)
|
||||
|
@ -134,13 +134,12 @@ class ICMPResponderTest(test_base.DFTestBase):
|
|||
"""
|
||||
Add a VM. Verify the icmp flow is there.
|
||||
"""
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create(network={'name': 'icmp_responder_test'})
|
||||
subnet_obj = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet_obj = objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id)
|
||||
self.addCleanup(subnet_obj.close)
|
||||
|
||||
subnet = {'network_id': network_id,
|
||||
'cidr': '10.10.10.0/24',
|
||||
|
@ -152,7 +151,8 @@ class ICMPResponderTest(test_base.DFTestBase):
|
|||
|
||||
flows_before = self._get_l3_lookup_table_flows()
|
||||
|
||||
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
|
||||
router = objects.RouterTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(router.close)
|
||||
router_id = router.create()
|
||||
subnet_msg = {'subnet_id': subnet_id}
|
||||
interface = self.neutron.add_interface_router(router_id,
|
||||
|
@ -165,7 +165,8 @@ class ICMPResponderTest(test_base.DFTestBase):
|
|||
router_ip = ip['ip_address']
|
||||
break
|
||||
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
ip = vm.get_first_ipv4()
|
||||
self.assertIsNotNone(ip)
|
||||
|
|
|
@ -115,7 +115,8 @@ class SfcTestsCommonBase(test_base.DFTestBase):
|
|||
ingress = self._create_sf_port()
|
||||
egress = self._create_sf_port()
|
||||
|
||||
pp = self.store(objects.PortPairTestObj(self.neutron, self.nb_api))
|
||||
pp = objects.PortPairTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(pp.close)
|
||||
pp.create_from_ports(
|
||||
ingress=ingress,
|
||||
egress=egress,
|
||||
|
@ -126,25 +127,24 @@ class SfcTestsCommonBase(test_base.DFTestBase):
|
|||
|
||||
def _create_ppg(self, width):
|
||||
pps = [self._create_pp() for _ in range(width)]
|
||||
ppg = self.store(
|
||||
objects.PortPairGroupTestObj(self.neutron, self.nb_api))
|
||||
ppg = objects.PortPairGroupTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(ppg.close)
|
||||
ppg.create_from_portpairs(pps)
|
||||
return ppg
|
||||
|
||||
def _create_pc(self, fc, layout):
|
||||
ppgs = [self._create_ppg(w) for w in layout]
|
||||
pc = self.store(
|
||||
objects.PortChainTestObj(self.neutron, self.nb_api))
|
||||
pc = objects.PortChainTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(pc.close)
|
||||
pc.create_from_fcs_ppgs([fc], ppgs)
|
||||
return pc
|
||||
|
||||
def setUp(self):
|
||||
super(SfcTestsCommonBase, self).setUp()
|
||||
|
||||
self.security_group = self.store(objects.SecGroupTestObj(
|
||||
self.neutron,
|
||||
self.nb_api))
|
||||
|
||||
self.security_group = objects.SecGroupTestObj(self.neutron,
|
||||
self.nb_api)
|
||||
self.addCleanup(self.security_group.close)
|
||||
security_group_id = self.security_group.create()
|
||||
self.assertTrue(self.security_group.exists())
|
||||
|
||||
|
@ -163,12 +163,8 @@ class SfcTestsCommonBase(test_base.DFTestBase):
|
|||
rule_id = self.security_group.rule_create(secrule=rule)
|
||||
self.assertTrue(self.security_group.rule_exists(rule_id))
|
||||
|
||||
self.topology = self.store(
|
||||
app_testing_objects.Topology(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
),
|
||||
)
|
||||
self.topology = app_testing_objects.Topology(self.neutron, self.nb_api)
|
||||
self.addCleanup(self.topology.close)
|
||||
|
||||
self.subnet = self.topology.create_subnet(
|
||||
cidr=IPV4_CIDR,
|
||||
|
@ -520,9 +516,8 @@ class TestFcApp(SfcTestsCommonBase):
|
|||
return packet
|
||||
|
||||
def test_fc(self):
|
||||
fc = self.store(
|
||||
objects.FlowClassifierTestObj(self.neutron, self.nb_api),
|
||||
)
|
||||
fc = objects.FlowClassifierTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(fc.close)
|
||||
fc.create(self._fc_params)
|
||||
pc = self._create_pc(fc, [1])
|
||||
time.sleep(_QUICK_RESOURCE_READY_TIMEOUT)
|
||||
|
@ -541,19 +536,18 @@ class TestFcApp(SfcTestsCommonBase):
|
|||
),
|
||||
}
|
||||
port_policies.update(self._create_port_policies(pc))
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.src_port.port_id,
|
||||
self._initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.LogAction()
|
||||
),
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.src_port.port_id,
|
||||
self._initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.LogAction()
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
policy.wait(10)
|
||||
|
||||
|
@ -588,9 +582,8 @@ class TestSfcApp(SfcTestsCommonBase):
|
|||
self._gen_udp(src_port=SRC_PORT, dst_port=DST_PORT) /
|
||||
('{len}'.format(len=len(self.layout)) * 64)
|
||||
)
|
||||
fc = self.store(
|
||||
objects.FlowClassifierTestObj(self.neutron, self.nb_api),
|
||||
)
|
||||
fc = objects.FlowClassifierTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(fc.close)
|
||||
fc.create(
|
||||
{
|
||||
'logical_source_port': self.src_port.port.port_id
|
||||
|
@ -611,19 +604,18 @@ class TestSfcApp(SfcTestsCommonBase):
|
|||
),
|
||||
}
|
||||
port_policies.update(self._create_port_policies(pc))
|
||||
policy = self.store(
|
||||
app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.src_port.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.LogAction()
|
||||
),
|
||||
policy = app_testing_objects.Policy(
|
||||
initial_actions=[
|
||||
app_testing_objects.SendAction(
|
||||
self.subnet.subnet_id,
|
||||
self.src_port.port_id,
|
||||
initial_packet,
|
||||
),
|
||||
],
|
||||
port_policies=port_policies,
|
||||
unknown_port_action=app_testing_objects.LogAction()
|
||||
)
|
||||
self.addCleanup(policy.close)
|
||||
policy.start(self.topology)
|
||||
policy.wait(10)
|
||||
|
||||
|
|
|
@ -47,7 +47,8 @@ class TestSnatFlows(test_base.DFTestBase):
|
|||
if not self._check_if_app_enabled():
|
||||
return
|
||||
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
subnet = {'network_id': network_id,
|
||||
'cidr': '10.200.0.0/24',
|
||||
|
@ -67,7 +68,8 @@ class TestSnatFlows(test_base.DFTestBase):
|
|||
|
||||
# Create VM
|
||||
ovs = utils.OvsFlowsParser()
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
ip = vm.get_first_ipv4()
|
||||
self.assertIsNotNone(ip)
|
||||
|
|
|
@ -41,20 +41,19 @@ class TestTopology(test_base.DFTestBase):
|
|||
self._remove_vm(vm2)
|
||||
|
||||
def _create_network(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
self.addCleanup(network.close)
|
||||
network_id = network.create()
|
||||
self.assertTrue(network.exists())
|
||||
subnet = self.store(objects.SubnetTestObj(
|
||||
self.neutron,
|
||||
self.nb_api,
|
||||
network_id,
|
||||
))
|
||||
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
|
||||
self.addCleanup(subnet.close)
|
||||
subnet.create()
|
||||
self.assertTrue(subnet.exists())
|
||||
return network
|
||||
|
||||
def _create_vm(self, network):
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm = objects.VMTestObj(self, self.neutron)
|
||||
self.addCleanup(vm.close)
|
||||
vm.create(network=network)
|
||||
vm_mac = vm.get_first_mac()
|
||||
self.assertTrue(vm_mac is not None)
|
||||
|
|
Loading…
Reference in New Issue