Change all tests to use the standard addCleanup

Currently a lot of tests are using a custom self.store() method to
perform automatic cleanup on tearDown.
We should change to the standard addCleanup method that is being called
on every object (also if setUp fails and tearDown is not called).
See [1] for details.

[1] https://docs.python.org/2/library/unittest.html#unittest.TestCase.addCleanup

Change-Id: Ic62dd9a5e07435d1179793ec2c5cdc6e4eddef60
This commit is contained in:
Shachar Snapiri 2018-07-23 11:45:47 +03:00
parent 7d9ebea601
commit 53db6e21df
25 changed files with 701 additions and 809 deletions

View File

@ -63,41 +63,35 @@ class TestAllowedAddressPairsDetectActive(test_base.DFTestBase):
self.policy = None
self.allowed_address_pair_ip_address = None
self.allowed_address_pair_mac_address = None
try:
self.topology = self.store(app_testing_objects.Topology(
self.neutron,
self.nb_api))
subnet = self.topology.create_subnet(cidr='192.168.98.0/24')
self.subnet = subnet
port = subnet.create_port()
self.port = port
self.topology = app_testing_objects.Topology(self.neutron,
self.nb_api)
self.addCleanup(self.topology.close)
subnet = self.topology.create_subnet(cidr='192.168.98.0/24')
self.subnet = subnet
port = subnet.create_port()
self.port = port
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
port.update({'allowed_address_pairs': [{
'ip_address': '192.168.98.100'}]})
port.update({'allowed_address_pairs': [{
'ip_address': '192.168.98.100'}]})
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
port_lport = port.port.get_logical_port()
self.assertIsNotNone(port_lport)
port_lport = port.port.get_logical_port()
self.assertIsNotNone(port_lport)
self.allowed_address_pair_mac_address, \
self.allowed_address_pair_ip_address = \
apps.get_port_mac_and_ip(port, True)
self.allowed_address_pair_mac_address, \
self.allowed_address_pair_ip_address = \
apps.get_port_mac_and_ip(port, True)
# Create policy to reply arp request sent from controller
self.policy = self.store(
app_testing_objects.Policy(
initial_actions=[],
port_policies=self._create_policy_to_reply_arp_request(),
unknown_port_action=app_testing_objects.IgnoreAction()
)
)
except Exception:
if self.topology:
self.topology.close()
raise
# Create policy to reply arp request sent from controller
self.policy = app_testing_objects.Policy(
initial_actions=[],
port_policies=self._create_policy_to_reply_arp_request(),
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(self.policy.close)
def _create_arp_response(self, buf):
pkt = ryu.lib.packet.packet.Packet(buf)

View File

@ -30,22 +30,13 @@ LOG = log.getLogger(__name__)
class TestDHCPApp(test_base.DFTestBase):
def _create_topology(self, enable_dhcp=True, cidr='192.168.11.0/24'):
try:
self.topology = self.store(
app_testing_objects.Topology(
self.neutron,
self.nb_api
)
)
self.subnet1 = self.topology.create_subnet(
cidr=cidr, enable_dhcp=enable_dhcp)
self.port1 = self.subnet1.create_port()
self.port2 = self.subnet1.create_port()
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
except Exception:
if self.topology:
self.topology.close()
raise
self.topology = app_testing_objects.Topology(self.neutron, self.nb_api)
self.addCleanup(self.topology.close)
self.subnet1 = self.topology.create_subnet(
cidr=cidr, enable_dhcp=enable_dhcp)
self.port1 = self.subnet1.create_port()
self.port2 = self.subnet1.create_port()
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
def _create_udp_packet_for_dhcp(self,
dst_mac=constants.BROADCAST_MAC,
@ -275,16 +266,15 @@ class TestDHCPApp(test_base.DFTestBase):
)
port_policies = self._create_port_policies(disable_rule=False)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[send_dhcp_offer,
send_dhcp_offer,
send_dhcp_offer,
send_dhcp_offer],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
policy = app_testing_objects.Policy(
initial_actions=[send_dhcp_offer,
send_dhcp_offer,
send_dhcp_offer,
send_dhcp_offer],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(policy.close)
policy.start(self.topology)
test_utils.wait_until_true(internal_predicate,
@ -322,13 +312,12 @@ class TestDHCPApp(test_base.DFTestBase):
rules=rules,
default_action=raise_action
)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[send_dhcp_offer],
port_policies={key: port_policy},
unknown_port_action=app_testing_objects.IgnoreAction()
)
policy = app_testing_objects.Policy(
initial_actions=[send_dhcp_offer],
port_policies={key: port_policy},
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(policy.close)
policy.start(self.topology)
# Since there is no dhcp response, we are expecting timeout
# exception here.
@ -349,13 +338,12 @@ class TestDHCPApp(test_base.DFTestBase):
dhcp_packet,
)
port_policies = self._create_port_policies()
policy = self.store(
app_testing_objects.Policy(
initial_actions=[send_dhcp_offer],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
policy = app_testing_objects.Policy(
initial_actions=[send_dhcp_offer],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(policy.close)
apps.start_policy(policy, self.topology,
const.DEFAULT_RESOURCE_READY_TIMEOUT)

View File

@ -28,29 +28,21 @@ class TestDNATApp(test_base.DFTestBase):
super(TestDNATApp, self).setUp()
self.topology = None
try:
self.topology = self.store(
app_testing_objects.Topology(
self.neutron,
self.nb_api
)
)
self.subnet = self.topology.create_subnet()
self.port = self.subnet.create_port()
self.router = self.topology.create_router([
self.subnet.subnet_id
])
ext_net_id = self.topology.create_external_network([
self.router.router_id
])
self.fip = self.store(
objects.FloatingipTestObj(self.neutron, self.nb_api))
self.fip.create({'floating_network_id': ext_net_id,
'port_id': self.port.port.port_id})
except Exception:
if self.topology:
self.topology.close()
raise
self.topology = app_testing_objects.Topology(self.neutron,
self.nb_api)
self.addCleanup(self.topology.close)
self.subnet = self.topology.create_subnet()
self.port = self.subnet.create_port()
self.router = self.topology.create_router([
self.subnet.subnet_id
])
ext_net_id = self.topology.create_external_network([
self.router.router_id
])
self.fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
self.addCleanup(self.fip.close)
self.fip.create({'floating_network_id': ext_net_id,
'port_id': self.port.port.port_id})
def _create_icmp_test_port_policies(self, icmp_filter):
ignore_action = app_testing_objects.IgnoreAction()
@ -129,20 +121,19 @@ class TestDNATApp(test_base.DFTestBase):
self.topology.external_network.get_gw_ip(),
ryu.lib.packet.ipv4.inet.IPPROTO_ICMP,
ttl=1)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port.port_id,
initial_packet,
),
],
port_policies=self._create_icmp_test_port_policies(
app_testing_objects.RyuICMPTimeExceedFilter),
unknown_port_action=ignore_action
)
policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port.port_id,
initial_packet,
),
],
port_policies=self._create_icmp_test_port_policies(
app_testing_objects.RyuICMPTimeExceedFilter),
unknown_port_action=ignore_action
)
self.addCleanup(policy.close)
apps.start_policy(policy, self.topology,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
@ -188,20 +179,19 @@ class TestDNATApp(test_base.DFTestBase):
self.port.port_id,
initial_packet)
ignore_action = app_testing_objects.IgnoreAction()
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action,
],
port_policies=self._create_rate_limit_port_policies(
cfg.CONF.df_dnat_app.dnat_ttl_invalid_max_rate,
app_testing_objects.RyuICMPTimeExceedFilter),
unknown_port_action=ignore_action
)
policy = app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action,
],
port_policies=self._create_rate_limit_port_policies(
cfg.CONF.df_dnat_app.dnat_ttl_invalid_max_rate,
app_testing_objects.RyuICMPTimeExceedFilter),
unknown_port_action=ignore_action
)
self.addCleanup(policy.close)
policy.start(self.topology)
# Since the rate limit, we expect timeout to wait for 4th packet hit
# the policy.
@ -219,20 +209,19 @@ class TestDNATApp(test_base.DFTestBase):
initial_packet = self._create_packet(
self.topology.external_network.get_gw_ip(),
ryu.lib.packet.ipv4.inet.IPPROTO_UDP)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port.port_id,
initial_packet,
),
],
port_policies=self._create_icmp_test_port_policies(
app_testing_objects.RyuICMPUnreachFilter),
unknown_port_action=ignore_action
)
policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port.port_id,
initial_packet,
),
],
port_policies=self._create_icmp_test_port_policies(
app_testing_objects.RyuICMPUnreachFilter),
unknown_port_action=ignore_action
)
self.addCleanup(policy.close)
policy.start(self.topology)
policy.wait(const.DEFAULT_RESOURCE_READY_TIMEOUT)
if len(policy.exceptions) > 0:
@ -248,20 +237,19 @@ class TestDNATApp(test_base.DFTestBase):
self.port.port_id,
initial_packet)
ignore_action = app_testing_objects.IgnoreAction()
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action,
],
port_policies=self._create_rate_limit_port_policies(
cfg.CONF.df_dnat_app.dnat_icmp_error_max_rate,
app_testing_objects.RyuICMPUnreachFilter),
unknown_port_action=ignore_action
)
policy = app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action,
],
port_policies=self._create_rate_limit_port_policies(
cfg.CONF.df_dnat_app.dnat_icmp_error_max_rate,
app_testing_objects.RyuICMPUnreachFilter),
unknown_port_action=ignore_action
)
self.addCleanup(policy.close)
policy.start(self.topology)
# Since the rate limit, we expect timeout to wait for 4th packet hit
# the policy.

View File

@ -34,62 +34,57 @@ class TestArpResponder(test_base.DFTestBase):
super(TestArpResponder, self).setUp()
self.topology = None
self.policy = None
try:
self.topology = app_testing_objects.Topology(
self.neutron,
self.nb_api)
subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
port1 = subnet1.create_port()
port2 = subnet1.create_port()
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
# Create policy
arp_packet = self._create_arp_request(
src_port=port1.port.get_logical_port(),
dst_port=port2.port.get_logical_port(),
)
send_arp_request = app_testing_objects.SendAction(
subnet1.subnet_id,
port1.port_id,
arp_packet,
)
ignore_action = app_testing_objects.IgnoreAction()
log_action = app_testing_objects.LogAction()
key1 = (subnet1.subnet_id, port1.port_id)
port_policies = {
key1: app_testing_objects.PortPolicy(
rules=[
app_testing_objects.PortPolicyRule(
# Detect arp replies
app_testing_objects.RyuARPReplyFilter(),
actions=[
log_action,
app_testing_objects.StopSimulationAction()
]
),
app_testing_objects.PortPolicyRule(
# Ignore IPv6 packets
app_testing_objects.RyuIPv6Filter(),
actions=[
ignore_action
]
),
],
default_action=app_testing_objects.RaiseAction(
"Unexpected packet"
)
),
}
self.policy = app_testing_objects.Policy(
initial_actions=[send_arp_request],
port_policies=port_policies,
unknown_port_action=ignore_action
)
except Exception:
if self.topology:
self.topology.close()
raise
self.store(self.topology)
self.store(self.policy)
self.topology = app_testing_objects.Topology(
self.neutron,
self.nb_api)
self.addCleanup(self.topology.close)
subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
port1 = subnet1.create_port()
port2 = subnet1.create_port()
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
# Create policy
arp_packet = self._create_arp_request(
src_port=port1.port.get_logical_port(),
dst_port=port2.port.get_logical_port(),
)
send_arp_request = app_testing_objects.SendAction(
subnet1.subnet_id,
port1.port_id,
arp_packet,
)
ignore_action = app_testing_objects.IgnoreAction()
log_action = app_testing_objects.LogAction()
key1 = (subnet1.subnet_id, port1.port_id)
port_policies = {
key1: app_testing_objects.PortPolicy(
rules=[
app_testing_objects.PortPolicyRule(
# Detect arp replies
app_testing_objects.RyuARPReplyFilter(),
actions=[
log_action,
app_testing_objects.StopSimulationAction()
]
),
app_testing_objects.PortPolicyRule(
# Ignore IPv6 packets
app_testing_objects.RyuIPv6Filter(),
actions=[
ignore_action
]
),
],
default_action=app_testing_objects.RaiseAction(
"Unexpected packet"
)
),
}
self.policy = app_testing_objects.Policy(
initial_actions=[send_arp_request],
port_policies=port_policies,
unknown_port_action=ignore_action
)
self.addCleanup(self.policy.close)
def _create_arp_request(self, src_port, dst_port):
ethernet = ryu.lib.packet.ethernet.ethernet(
@ -133,73 +128,68 @@ class TestNeighborAdvertiser(test_base.DFTestBase):
super(TestNeighborAdvertiser, self).setUp()
self.topology = None
self.policy = None
try:
# Disable Duplicate Address Detection requests from the interface
self.dad_conf = utils.execute(['sysctl', '-n',
'net.ipv6.conf.default.accept_dad'])
utils.execute(['sysctl', '-w',
'net.ipv6.conf.default.accept_dad=0'],
run_as_root=True)
# Disable Router Solicitation requests from the interface
self.router_solicit_conf = utils.execute(
['sysctl', '-n', 'net.ipv6.conf.default.router_solicitations'])
utils.execute(['sysctl', '-w',
'net.ipv6.conf.default.router_solicitations=0'],
run_as_root=True)
self.topology = app_testing_objects.Topology(
self.neutron,
self.nb_api)
subnet1 = self.topology.create_subnet(cidr='1111:1111:1111::/64')
port1 = subnet1.create_port()
port2 = subnet1.create_port()
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
# Create Neighbor Solicitation packet
ns_packet = self._create_ns_request(
src_port=port1.port.get_logical_port(),
dst_port=port2.port.get_logical_port(),
)
send_ns_request = app_testing_objects.SendAction(
subnet1.subnet_id,
port1.port_id,
ns_packet,
)
ignore_action = app_testing_objects.IgnoreAction()
log_action = app_testing_objects.LogAction()
key1 = (subnet1.subnet_id, port1.port_id)
adv_filter = app_testing_objects.RyuNeighborAdvertisementFilter()
port_policies = {
key1: app_testing_objects.PortPolicy(
rules=[
app_testing_objects.PortPolicyRule(
# Detect advertisements
adv_filter,
actions=[
log_action,
app_testing_objects.StopSimulationAction()
]
),
app_testing_objects.PortPolicyRule(
# Filter local VM's Multicast requests
app_testing_objects.RyuIpv6MulticastFilter(),
actions=[ignore_action]
)
],
default_action=app_testing_objects.RaiseAction(
"Unexpected packet"
# Disable Duplicate Address Detection requests from the interface
self.dad_conf = utils.execute(['sysctl', '-n',
'net.ipv6.conf.default.accept_dad'])
utils.execute(['sysctl', '-w',
'net.ipv6.conf.default.accept_dad=0'],
run_as_root=True)
# Disable Router Solicitation requests from the interface
self.router_solicit_conf = utils.execute(
['sysctl', '-n', 'net.ipv6.conf.default.router_solicitations'])
utils.execute(['sysctl', '-w',
'net.ipv6.conf.default.router_solicitations=0'],
run_as_root=True)
self.topology = app_testing_objects.Topology(
self.neutron,
self.nb_api)
self.addCleanup(self.topology.close)
subnet1 = self.topology.create_subnet(cidr='1111:1111:1111::/64')
port1 = subnet1.create_port()
port2 = subnet1.create_port()
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
# Create Neighbor Solicitation packet
ns_packet = self._create_ns_request(
src_port=port1.port.get_logical_port(),
dst_port=port2.port.get_logical_port(),
)
send_ns_request = app_testing_objects.SendAction(
subnet1.subnet_id,
port1.port_id,
ns_packet,
)
ignore_action = app_testing_objects.IgnoreAction()
log_action = app_testing_objects.LogAction()
key1 = (subnet1.subnet_id, port1.port_id)
adv_filter = app_testing_objects.RyuNeighborAdvertisementFilter()
port_policies = {
key1: app_testing_objects.PortPolicy(
rules=[
app_testing_objects.PortPolicyRule(
# Detect advertisements
adv_filter,
actions=[
log_action,
app_testing_objects.StopSimulationAction()
]
),
app_testing_objects.PortPolicyRule(
# Filter local VM's Multicast requests
app_testing_objects.RyuIpv6MulticastFilter(),
actions=[ignore_action]
)
),
}
self.policy = app_testing_objects.Policy(
initial_actions=[send_ns_request],
port_policies=port_policies,
unknown_port_action=ignore_action
)
except Exception:
if self.topology:
self.topology.close()
raise
self.store(self.topology)
self.store(self.policy)
],
default_action=app_testing_objects.RaiseAction(
"Unexpected packet"
)
),
}
self.policy = app_testing_objects.Policy(
initial_actions=[send_ns_request],
port_policies=port_policies,
unknown_port_action=ignore_action
)
self.addCleanup(self.policy.close)
def _create_ns_request(self, src_port, dst_port):
ethernet = ryu.lib.packet.ethernet.ethernet(

View File

@ -37,26 +37,18 @@ class TestL3App(test_base.DFTestBase):
self.topology = None
self.policy = None
self._ping = None
try:
self.topology = self.store(
app_testing_objects.Topology(
self.neutron,
self.nb_api
)
)
self.subnet1 = self.topology.create_subnet(cidr='192.168.12.0/24')
self.subnet2 = self.topology.create_subnet(cidr='192.168.13.0/24')
self.port1 = self.subnet1.create_port()
self.port2 = self.subnet2.create_port()
self.router = self.topology.create_router([
self.subnet1.subnet_id,
self.subnet2.subnet_id,
])
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
except Exception:
if self.topology:
self.topology.close()
raise
self.topology = app_testing_objects.Topology(self.neutron,
self.nb_api)
self.addCleanup(self.topology.close)
self.subnet1 = self.topology.create_subnet(cidr='192.168.12.0/24')
self.subnet2 = self.topology.create_subnet(cidr='192.168.13.0/24')
self.port1 = self.subnet1.create_port()
self.port2 = self.subnet2.create_port()
self.router = self.topology.create_router([
self.subnet1.subnet_id,
self.subnet2.subnet_id,
])
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
def _create_port_policies(self, connected=True):
ignore_action = app_testing_objects.IgnoreAction()
@ -236,19 +228,18 @@ class TestL3App(test_base.DFTestBase):
port_policies = self._create_port_policies()
initial_packet = self._create_packet(
dst_ip, ryu.lib.packet.ipv4.inet.IPPROTO_ICMP)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(policy.close)
apps.start_policy(policy, self.topology,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
@ -298,19 +289,18 @@ class TestL3App(test_base.DFTestBase):
port_policies = self._create_port_policies(connected=False)
initial_packet = self._create_packet(
dst_ip, ryu.lib.packet.ipv4.inet.IPPROTO_ICMP)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(policy.close)
policy.start(self.topology)
# Since there is no OpenFlow in vswitch, we are expecting timeout
# exception here.
@ -405,18 +395,17 @@ class TestL3App(test_base.DFTestBase):
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action
],
port_policies=port_policy,
unknown_port_action=ignore_action
)
policy = app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action
],
port_policies=port_policy,
unknown_port_action=ignore_action
)
self.addCleanup(policy.close)
policy.start(self.topology)
# Since the rate limit, we expect timeout to wait for 4th packet hit
# the policy.
@ -437,19 +426,18 @@ class TestL3App(test_base.DFTestBase):
app_testing_objects.RyuICMPUnreachFilter)
initial_packet = self._create_packet(
"192.168.12.1", ryu.lib.packet.ipv4.inet.IPPROTO_UDP)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet,
),
],
port_policies=port_policy,
unknown_port_action=ignore_action
)
policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet,
),
],
port_policies=port_policy,
unknown_port_action=ignore_action
)
self.addCleanup(policy.close)
apps.start_policy(policy, self.topology,
const.DEFAULT_RESOURCE_READY_TIMEOUT)
@ -491,18 +479,17 @@ class TestL3App(test_base.DFTestBase):
self.port1.port_id,
initial_packet)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action
],
port_policies=port_policy,
unknown_port_action=ignore_action
)
policy = app_testing_objects.Policy(
initial_actions=[
send_action,
send_action,
send_action,
send_action
],
port_policies=port_policy,
unknown_port_action=ignore_action
)
self.addCleanup(policy.close)
policy.start(self.topology)
# Since the rate limit, we expect timeout to wait for 4th packet hit
# the policy.
@ -561,14 +548,13 @@ class TestL3App(test_base.DFTestBase):
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
send_action
],
port_policies=port_policy,
unknown_port_action=ignore_action
)
policy = app_testing_objects.Policy(
initial_actions=[
send_action
],
port_policies=port_policy,
unknown_port_action=ignore_action
)
self.addCleanup(policy.close)
apps.start_policy(policy, self.topology,
const.DEFAULT_RESOURCE_READY_TIMEOUT)

View File

@ -39,34 +39,33 @@ class TestPortSecApp(test_base.DFTestBase):
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
port_policies = self._create_port_policies()
self.policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
self._create_ping_using_fake_ip
),
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
self._create_ping_using_fake_mac
),
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
self._create_ping_using_vm_ip_mac
),
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
self._create_ping_using_allowed_address_pair
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
self._create_ping_using_fake_ip
),
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
self._create_ping_using_fake_mac
),
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
self._create_ping_using_vm_ip_mac
),
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
self._create_ping_using_allowed_address_pair
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(self.policy.close)
except Exception:
if self.topology:
self.topology.close()
@ -74,9 +73,8 @@ class TestPortSecApp(test_base.DFTestBase):
def _init_topology(self):
network = netaddr.IPNetwork(self.cidr)
security_group = self.store(objects.SecGroupTestObj(
self.neutron,
self.nb_api))
security_group = objects.SecGroupTestObj(self.neutron, self.nb_api)
self.addCleanup(security_group.close)
security_group_id = security_group.create()
self.assertTrue(security_group.exists())
@ -94,12 +92,9 @@ class TestPortSecApp(test_base.DFTestBase):
ingress_rule_id = security_group.rule_create(
secrule=ingress_rule_info)
self.assertTrue(security_group.rule_exists(ingress_rule_id))
self.topology = self.store(
app_testing_objects.Topology(
self.neutron,
self.nb_api
)
)
self.topology = app_testing_objects.Topology(self.neutron,
self.nb_api)
self.addCleanup(self.topology.close)
self.subnet = self.topology.create_subnet(
cidr=self.cidr

View File

@ -32,43 +32,33 @@ class TestSGApp(test_base.DFTestBase):
super(TestSGApp, self).setUp()
self.topology = None
self.policy = None
try:
self.security_group = self.store(objects.SecGroupTestObj(
self.neutron,
self.nb_api))
security_group_id = self.security_group.create()
self.assertTrue(self.security_group.exists())
self.security_group = objects.SecGroupTestObj(self.neutron,
self.nb_api)
self.addCleanup(self.security_group.close)
security_group_id = self.security_group.create()
self.assertTrue(self.security_group.exists())
self.security_group2 = self.store(objects.SecGroupTestObj(
self.neutron,
self.nb_api))
security_group_id2 = self.security_group2.create()
self.assertTrue(self.security_group2.exists())
self.security_group2 = objects.SecGroupTestObj(self.neutron,
self.nb_api)
self.addCleanup(self.security_group2.close)
security_group_id2 = self.security_group2.create()
self.assertTrue(self.security_group2.exists())
self.security_group3 = self.store(objects.SecGroupTestObj(
self.neutron,
self.nb_api))
security_group_id3 = self.security_group3.create()
self.assertTrue(self.security_group3.exists())
self.security_group3 = objects.SecGroupTestObj(self.neutron,
self.nb_api)
self.addCleanup(self.security_group3.close)
security_group_id3 = self.security_group3.create()
self.assertTrue(self.security_group3.exists())
self.topology = self.store(
app_testing_objects.Topology(
self.neutron,
self.nb_api
)
)
self.topology = app_testing_objects.Topology(self.neutron, self.nb_api)
self.addCleanup(self.topology.close)
self.active_security_group_id = security_group_id
self.inactive_security_group_id = security_group_id2
self.allowed_address_pairs_security_group_id = security_group_id3
self.active_security_group_id = security_group_id
self.inactive_security_group_id = security_group_id2
self.allowed_address_pairs_security_group_id = security_group_id3
self.permit_icmp_request = self._get_icmp_request1
self.no_permit_icmp_request = self._get_icmp_request2
except Exception:
if self.topology:
self.topology.close()
raise
self.permit_icmp_request = self._get_icmp_request1
self.no_permit_icmp_request = self._get_icmp_request2
def _setup_subnet(self, cidr):
network = netaddr.IPNetwork(cidr)
@ -147,24 +137,19 @@ class TestSGApp(test_base.DFTestBase):
port_policies = self._create_port_policies()
self.policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port1.port_id,
packet1.data,
),
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port2.port_id,
packet2.data,
)
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(self.subnet.subnet_id,
self.port1.port_id,
packet1.data),
app_testing_objects.SendAction(self.subnet.subnet_id,
self.port2.port_id,
packet2.data)
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(self.policy.close)
def _create_allowed_address_pairs_policy(self):
packet1, self.allowed_address_pairs_icmp_request = \
@ -172,19 +157,16 @@ class TestSGApp(test_base.DFTestBase):
port_policies = self._create_allowed_address_pairs_port_policies()
self.allowed_address_pairs_policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.port4.port_id,
packet1.data,
)
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.allowed_address_pairs_policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(self.subnet.subnet_id,
self.port4.port_id,
packet1.data)
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(self.allowed_address_pairs_policy.close)
def _get_icmp_request1(self):
return self.icmp_request1

View File

@ -61,12 +61,8 @@ class TestSNat(test_base.DFTestBase):
const.DEFAULT_RESOURCE_READY_TIMEOUT)
def _create_topology(self):
self.topology = self.store(
app_testing_objects.Topology(
self.neutron,
self.nb_api
)
)
self.topology = app_testing_objects.Topology(self.neutron, self.nb_api)
self.addCleanup(self.topology.close)
self.subnet1 = self.topology.create_subnet(cidr='192.168.15.0/24')
self.port1 = self.subnet1.create_port()
self.router = self.topology.create_router([self.subnet1.subnet_id])
@ -77,19 +73,16 @@ class TestSNat(test_base.DFTestBase):
port_policies = self._create_port_policies()
initial_packet = self._create_packet(
'10.0.1.2', ryu.lib.packet.ipv4.inet.IPPROTO_ICMP)
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet1.subnet_id,
self.port1.port_id,
initial_packet
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(self.subnet1.subnet_id,
self.port1.port_id,
initial_packet),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.IgnoreAction()
)
self.addCleanup(policy.close)
return policy
def _create_port_policies(self):

View File

@ -81,13 +81,12 @@ class TestOVSFlowsForActivePortDectionApp(test_base.DFTestBase):
if not self.check_app_loaded("active_port_detection"):
self.skipTest("ActivePortDetectionApp is not enabled")
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create(network={'name': 'aap_test'})
subnet_obj = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet_obj = objects.SubnetTestObj(self.neutron, self.nb_api,
network_id)
self.addCleanup(subnet_obj.close)
subnet = {'network_id': network_id,
'cidr': '192.168.97.0/24',
@ -97,7 +96,8 @@ class TestOVSFlowsForActivePortDectionApp(test_base.DFTestBase):
'enable_dhcp': True}
subnet_obj.create(subnet)
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm_id = vm.create(network=network)
vm_port_id = self.vswitch_api.get_port_id_by_vm_id(vm_id)

View File

@ -68,8 +68,6 @@ class DFTestBase(base.BaseTestCase):
self.nb_api.set_db_change_callback(self._db_change_callback)
self.mgt_ip = self.conf.management_ip
self.__objects_to_close = []
self.addCleanup(self._close_stored_objects)
self.vswitch_api = utils.OvsTestApi(self.mgt_ip)
self.vswitch_api.initialize(self._db_change_callback)
@ -123,11 +121,6 @@ class DFTestBase(base.BaseTestCase):
return True
return False
def _close_stored_objects(self):
while self.__objects_to_close:
close_func = self.__objects_to_close.pop()
close_func()
def get_default_subnetpool(self):
default_subnetpool = None
subnetpool_filter = {'is_default': True,
@ -147,14 +140,9 @@ class DFTestBase(base.BaseTestCase):
self.neutron.create_subnetpool(
body={'subnetpool': default_subnetpool})
def store(self, obj, close_func=None):
close_func = close_func if close_func else obj.close
self.__objects_to_close.append(close_func)
return obj
def start_subscribing(self):
self._topology = self.store(
test_objects.Topology(self.neutron, self.nb_api))
self._topology = test_objects.Topology(self.neutron, self.nb_api)
self.addCleanup(self._topology.close)
subnet = self._topology.create_subnet(cidr="192.168.200.0/24")
port = subnet.create_port()
utils.wait_until_true(

View File

@ -40,12 +40,11 @@ class TestDbConsistent(test_base.DFTestBase):
def test_db_consistent(self):
self.db_sync_time = self.conf.db_sync_time
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
network_id = network.create()
self.addCleanup(network.close)
topic = network.get_topic()
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
network_id))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
subnet_body = {'network_id': network_id,
'cidr': '10.50.0.0/24',
'gateway_ip': '10.50.0.1',
@ -58,7 +57,7 @@ class TestDbConsistent(test_base.DFTestBase):
self.assertTrue(network.exists())
self.assertTrue(subnet.exists())
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
vm.create(network=network)
self.addCleanup(vm.close)
self.assertIsNotNone(vm.server.addresses['mynetwork'])

View File

@ -50,7 +50,8 @@ class TestOVSFlowsForDHCP(test_base.DFTestBase):
self.assertTrue(found_dhcp_cast_flow)
def _create_network(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
lean_lswitch = l2.LogicalSwitch(id=network_id)
@ -139,7 +140,8 @@ class TestOVSFlowsForDHCP(test_base.DFTestBase):
def test_create_router_interface(self):
ovs = utils.OvsFlowsParser()
flows_before_change = ovs.dump(self.integration_bridge)
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
network, network_id, network_key = self._create_network()
subnet = {'network_id': network_id,
'cidr': '10.30.0.0/24',

View File

@ -44,7 +44,8 @@ class TestL2FLows(test_base.DFTestBase):
if self._check_tunneling_app_enable() is False:
return
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
network_params = network.get_network()
segmentation_id = network_params['network']['provider:segmentation_id']
@ -58,7 +59,8 @@ class TestL2FLows(test_base.DFTestBase):
self.assertIsNotNone(subnet)
ovs = utils.OvsFlowsParser()
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
ip = vm.get_first_ipv4()
self.assertIsNotNone(ip)
@ -98,7 +100,8 @@ class TestL2FLows(test_base.DFTestBase):
return
# Create network
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_params = {"name": "vlan_1",
"provider:network_type": "vlan",
"provider:physical_network": physical_network,
@ -117,7 +120,8 @@ class TestL2FLows(test_base.DFTestBase):
# Create VM
ovs = utils.OvsFlowsParser()
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
ip = vm.get_first_ipv4()
self.assertIsNotNone(ip)
@ -271,7 +275,8 @@ class TestL2FLows(test_base.DFTestBase):
return
# Create network
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_params = {"name": "flat_1",
"provider:network_type": "flat",
"provider:physical_network": physical_network}
@ -290,7 +295,8 @@ class TestL2FLows(test_base.DFTestBase):
# Create VM
ovs = utils.OvsFlowsParser()
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
ip = vm.get_first_ipv4()
self.assertIsNotNone(ip)
@ -474,7 +480,8 @@ class TestL2FLows(test_base.DFTestBase):
return None
def test_vm_multicast(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
subnet = {'network_id': network_id,
'cidr': '10.200.0.0/24',
@ -485,7 +492,8 @@ class TestL2FLows(test_base.DFTestBase):
subnet = self.neutron.create_subnet({'subnet': subnet})
ovs = utils.OvsFlowsParser()
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
ip = vm.get_first_ipv4()
self.assertIsNotNone(ip)

View File

@ -23,20 +23,14 @@ class TestL3Flows(test_base.DFTestBase):
def setUp(self):
super(TestL3Flows, self).setUp()
self.topology = None
try:
self.topology = app_testing_objects.Topology(
self.neutron,
self.nb_api)
self.subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
self.port1 = self.subnet1.create_port()
self.router = self.topology.create_router([
self.subnet1.subnet_id])
except Exception:
if self.topology:
self.topology.close()
raise
self.store(self.topology)
self.topology = app_testing_objects.Topology(
self.neutron,
self.nb_api)
self.addCleanup(self.topology.close)
self.subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
self.port1 = self.subnet1.create_port()
self.router = self.topology.create_router([
self.subnet1.subnet_id])
def test_router_add_extra_route(self):
lport = self.port1.port.get_logical_port()

View File

@ -31,14 +31,16 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
super(TestNeutronAPIandDB, self).setUp()
def test_create_network(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network.create()
self.assertTrue(network.exists())
network.close()
self.assertFalse(network.exists())
def test_create_network_with_mtu(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network.create()
self.assertTrue(network.exists())
netobj = network.get_network()
@ -50,7 +52,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(network.exists())
def test_dhcp_port_created(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = {'network_id': network_id,
@ -80,14 +83,12 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertEqual(0, dhcp_ports_found)
def test_create_delete_subnet(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet_id = subnet.create()
self.assertTrue(subnet.exists())
self.assertEqual(subnet_id, subnet.get_subnet().id)
@ -96,14 +97,12 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
network.close()
def test_create_subnet_with_host_routes(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet_data = {
'cidr': '192.168.199.0/24',
'ip_version': 4,
@ -126,7 +125,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
for host_route in db_subnet.host_routes])
def test_create_delete_router(self):
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
router_id = router.create()
self.assertTrue(router.exists())
version1 = self.nb_api.get(l3.LogicalRouter(id=router_id)).version
@ -138,15 +138,14 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(router.exists())
def test_create_router_interface(self):
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet_id = subnet.create()
router_id = router.create()
self.assertTrue(router.exists())
@ -165,11 +164,12 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(network.exists())
def test_create_port(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
port = self.store(objects.PortTestObj(self.neutron,
self.nb_api, network_id))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
port.create()
self.assertTrue(port.exists())
self.assertEqual(network_id, port.get_logical_port().lswitch.id)
@ -179,18 +179,18 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(network.exists())
def test_create_port_with_qospolicy(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
qospolicy = self.store(objects.QosPolicyTestObj(self.neutron,
self.nb_api))
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
self.addCleanup(qospolicy.close)
qos_policy_id = qospolicy.create()
self.assertTrue(qospolicy.exists())
port = self.store(objects.PortTestObj(self.neutron,
self.nb_api,
network_id))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
port_param = {
'admin_state_up': True,
'name': 'port1',
@ -210,18 +210,18 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(qospolicy.exists())
def test_update_port_with_qospolicy(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
qospolicy = self.store(objects.QosPolicyTestObj(self.neutron,
self.nb_api))
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
self.addCleanup(qospolicy.close)
qos_policy_id = qospolicy.create()
self.assertTrue(qospolicy.exists())
port = self.store(objects.PortTestObj(self.neutron,
self.nb_api,
network_id))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
port.create()
self.assertTrue(port.exists())
@ -242,15 +242,14 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(qospolicy.exists())
def test_delete_router_interface_port(self):
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet_id = subnet.create({
'cidr': '91.126.188.0/24',
'ip_version': 4,
@ -286,8 +285,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(network.exists())
def test_create_delete_security_group(self):
secgroup = self.store(
objects.SecGroupTestObj(self.neutron, self.nb_api))
secgroup = objects.SecGroupTestObj(self.neutron, self.nb_api)
self.addCleanup(secgroup.close)
sg_id = secgroup.create()
self.assertTrue(secgroup.exists())
secgroup_obj = secgroups.SecurityGroup(id=sg_id)
@ -301,8 +300,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(secgroup.exists())
def test_create_delete_security_group_rule(self):
secgroup = self.store(
objects.SecGroupTestObj(self.neutron, self.nb_api))
secgroup = objects.SecGroupTestObj(self.neutron, self.nb_api)
self.addCleanup(secgroup.close)
secgroup.create()
self.assertTrue(secgroup.exists())
secrule_id = secgroup.rule_create()
@ -313,8 +312,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(secgroup.exists())
def test_create_delete_qos_policy(self):
qospolicy = self.store(
objects.QosPolicyTestObj(self.neutron, self.nb_api))
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
self.addCleanup(qospolicy.close)
policy_id = qospolicy.create()
self.assertTrue(qospolicy.exists())
rule = {'max_kbps': '1000', 'max_burst_kbps': '100'}
@ -328,16 +327,14 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
external_net = objects.find_first_network(self.neutron,
{'router:external': True})
if not external_net:
network = self.store(
objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
external_net_para = {'name': 'public', 'router:external': True}
external_network_id = network.create(network=external_net_para)
self.assertTrue(network.exists())
ext_subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
external_network_id,
))
ext_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
external_network_id)
self.addCleanup(ext_subnet.close)
external_subnet_para = {'cidr': '192.168.199.0/24',
'ip_version': 4,
'network_id': external_network_id}
@ -359,10 +356,10 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
@lockutils.synchronized('need-external-net')
def test_associate_floatingip(self):
with self._prepare_ext_net() as external_network_id:
router = self.store(
objects.RouterTestObj(self.neutron, self.nb_api))
fip = self.store(
objects.FloatingipTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
self.addCleanup(fip.close)
router_para = {
'name': 'myrouter1', 'admin_state_up': True,
@ -371,17 +368,15 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertTrue(router.exists())
# private network
private_network = self.store(
objects.NetworkTestObj(self.neutron, self.nb_api))
private_network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(private_network.close)
private_network_id = private_network.create()
self.assertTrue(private_network.exists())
# private subnet
priv_subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
private_network_id,
))
priv_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
private_network_id)
self.addCleanup(priv_subnet.close)
private_subnet_para = {'cidr': '10.0.0.0/24',
'ip_version': 4,
'network_id': private_network_id}
@ -392,9 +387,9 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
l2.LogicalPort(id=router_interface['port_id']))
self.assertIsNotNone(router_lport)
port = self.store(
objects.PortTestObj(self.neutron,
self.nb_api, private_network_id))
port = objects.PortTestObj(self.neutron, self.nb_api,
private_network_id)
self.addCleanup(port.close)
port_id = port.create()
self.assertIsNotNone(port.get_logical_port())
@ -420,10 +415,10 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
@lockutils.synchronized('need-external-net')
def test_disassociate_floatingip(self):
with self._prepare_ext_net() as external_network_id:
router = self.store(
objects.RouterTestObj(self.neutron, self.nb_api))
fip = self.store(
objects.FloatingipTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
self.addCleanup(fip.close)
router_para = {
'name': 'myrouter1', 'admin_state_up': True,
@ -432,16 +427,14 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertTrue(router.exists())
# private network
private_network = self.store(
objects.NetworkTestObj(self.neutron, self.nb_api))
private_network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(private_network.close)
private_network_id = private_network.create()
self.assertTrue(private_network.exists())
# private subnet
priv_subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
private_network_id,
))
priv_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
private_network_id)
self.addCleanup(priv_subnet.close)
private_subnet_para = {'cidr': '10.0.0.0/24',
'ip_version': 4,
'network_id': private_network_id}
@ -452,9 +445,9 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
l2.LogicalPort(id=router_interface['port_id']))
self.assertIsNotNone(router_lport)
port = self.store(
objects.PortTestObj(self.neutron,
self.nb_api, private_network_id))
port = objects.PortTestObj(self.neutron, self.nb_api,
private_network_id)
self.addCleanup(port.close)
port_id = port.create()
self.assertIsNotNone(port.get_logical_port())
@ -479,22 +472,20 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertFalse(priv_subnet.exists())
def test_enable_disable_portsec(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
network2 = self.store(objects.NetworkTestObj(self.neutron,
self.nb_api))
network2 = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network2.close)
network_id2 = network2.create({'name': 'mynetwork1',
'admin_state_up': True,
'port_security_enabled': False})
self.assertTrue(network2.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet.create({
'cidr': '192.168.125.0/24',
'ip_version': 4,
@ -502,11 +493,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
})
self.assertTrue(subnet.exists())
subnet2 = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id2,
))
subnet2 = objects.SubnetTestObj(self.neutron, self.nb_api, network_id2)
self.addCleanup(subnet2.close)
subnet2.create({
'cidr': '192.168.126.0/24',
'ip_version': 4,
@ -515,8 +503,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertTrue(subnet2.exists())
network_portsec_switch = True
port = self.store(
objects.PortTestObj(self.neutron, self.nb_api, network_id))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
port.create()
lport = port.get_logical_port()
self.assertIsNotNone(lport)
@ -524,16 +512,16 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertEqual(network_portsec_switch, real_switch)
network_portsec_switch = False
port = self.store(
objects.PortTestObj(self.neutron, self.nb_api, network_id2))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id2)
self.addCleanup(port.close)
port.create()
lport = port.get_logical_port()
self.assertIsNotNone(lport)
real_switch = lport.port_security_enabled
self.assertEqual(network_portsec_switch, real_switch)
port = self.store(
objects.PortTestObj(self.neutron, self.nb_api, network_id))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
expected_switch = False
port.create({
'admin_state_up': True,
@ -554,15 +542,13 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertEqual(expected_switch, real_switch)
def test_add_remove_allowed_address_pairs(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet.create({
'cidr': '192.168.127.0/24',
'ip_version': 4,
@ -570,8 +556,8 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
})
self.assertTrue(subnet.exists())
port = self.store(
objects.PortTestObj(self.neutron, self.nb_api, network_id))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
expected_pairs = [
{"ip_address": "192.168.127.201",
"mac_address": "00:22:33:44:55:66"},
@ -602,26 +588,26 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertItemsEqual(expected_pairs, real_pairs)
def test_create_delete_bgp_peer(self):
bgp_peer = self.store(
objects.BGPPeerTestObj(self.neutron, self.nb_api))
bgp_peer = objects.BGPPeerTestObj(self.neutron, self.nb_api)
self.addCleanup(bgp_peer.close)
bgp_peer.create()
self.assertTrue(bgp_peer.exists())
bgp_peer.close()
self.assertFalse(bgp_peer.exists())
def test_create_delete_bgp_speaker(self):
bgp_speaker = self.store(
objects.BGPSpeakerTestObj(self.neutron, self.nb_api))
bgp_speaker = objects.BGPSpeakerTestObj(self.neutron, self.nb_api)
self.addCleanup(bgp_speaker.close)
bgp_speaker.create()
self.assertTrue(bgp_speaker.exists())
bgp_speaker.close()
self.assertFalse(bgp_speaker.exists())
def test_add_remove_bgp_peer(self):
bgp_peer = self.store(
objects.BGPPeerTestObj(self.neutron, self.nb_api))
bgp_speaker = self.store(
objects.BGPSpeakerTestObj(self.neutron, self.nb_api))
bgp_peer = objects.BGPPeerTestObj(self.neutron, self.nb_api)
self.addCleanup(bgp_peer.close)
bgp_speaker = objects.BGPSpeakerTestObj(self.neutron, self.nb_api)
self.addCleanup(bgp_speaker.close)
bgp_peer.create()
bgp_speaker.create()
bgp_speaker.add_peer(bgp_peer.peer_id)
@ -635,10 +621,10 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
self.assertNotIn(bgp_peer.peer_id, nb_bgp_speaker.peers)
def test_delete_bgp_peer_update_bgp_speaker(self):
bgp_peer = self.store(
objects.BGPPeerTestObj(self.neutron, self.nb_api))
bgp_speaker = self.store(
objects.BGPSpeakerTestObj(self.neutron, self.nb_api))
bgp_peer = objects.BGPPeerTestObj(self.neutron, self.nb_api)
self.addCleanup(bgp_peer.close)
bgp_speaker = objects.BGPSpeakerTestObj(self.neutron, self.nb_api)
self.addCleanup(bgp_speaker.close)
bgp_peer.create()
bgp_speaker.create()
bgp_speaker.add_peer(bgp_peer.peer_id)
@ -653,52 +639,51 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
@lockutils.synchronized('need-external-net')
def test_add_remove_bgp_network(self):
bgp_speaker = self.store(
objects.BGPSpeakerTestObj(self.neutron, self.nb_api))
bgp_speaker = objects.BGPSpeakerTestObj(self.neutron, self.nb_api)
self.addCleanup(bgp_speaker.close)
bgp_speaker.create()
address_scope = self.store(
objects.AddressScopeTestObj(self.neutron, self.nb_api))
address_scope = objects.AddressScopeTestObj(self.neutron, self.nb_api)
self.addCleanup(address_scope.close)
as_id = address_scope.create()
private_subnetpool = self.store(
objects.SubnetPoolTestObj(self.neutron, self.nb_api))
private_subnetpool = objects.SubnetPoolTestObj(self.neutron,
self.nb_api)
self.addCleanup(private_subnetpool.close)
private_sp_id = private_subnetpool.create(
subnetpool={'name': "private_sp",
'default_prefixlen': 24,
'prefixes': ["20.0.0.0/8"],
'address_scope_id': as_id})
public_subnetpool = self.store(
objects.SubnetPoolTestObj(self.neutron, self.nb_api))
public_subnetpool = objects.SubnetPoolTestObj(self.neutron,
self.nb_api)
self.addCleanup(public_subnetpool.close)
public_sp_id = public_subnetpool.create(
subnetpool={'name': "public_sp",
'default_prefixlen': 24,
'prefixes': ["172.24.4.0/24"],
'address_scope_id': as_id})
public_network = self.store(
objects.NetworkTestObj(self.neutron, self.nb_api))
public_network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(public_network.close)
public_network_id = public_network.create(
network={'name': 'public', 'router:external': True})
public_subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
public_network_id,
))
public_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
public_network_id)
self.addCleanup(public_subnet.close)
public_subnet.create(subnet={'ip_version': 4,
'network_id': public_network_id,
'subnetpool_id': public_sp_id})
private_network = self.store(
objects.NetworkTestObj(self.neutron, self.nb_api))
private_network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(private_network.close)
private_network_id = private_network.create(network={'name': "public"})
private_subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
private_network_id,
))
private_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
private_network_id)
self.addCleanup(private_subnet.close)
private_sn_id = private_subnet.create(
subnet={'ip_version': 4,
'network_id': private_network_id,
'subnetpool_id': private_sp_id})
bgp_speaker.add_network(public_network_id)
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
router_id = router.create()
self.neutron.add_interface_router(
router_id, body={'subnet_id': private_sn_id})
@ -708,11 +693,13 @@ class TestNeutronAPIandDB(test_base.DFTestBase):
nb_bgp_speaker = bgp_speaker.get_nb_bgp_speaker()
self.assertEqual(1, len(nb_bgp_speaker.prefix_routes))
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm_id = vm.create(network=private_network)
vm_port = self.neutron.list_ports(device_id=vm_id).get('ports')[0]
vm_port_id = vm_port.get('id')
fip = self.store(objects.FloatingipTestObj(self.neutron, self.nb_api))
fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
self.addCleanup(fip.close)
fip.create({'floating_network_id': public_network_id,
'port_id': vm_port_id})
fip_addr = fip.get_floatingip().floating_ip_address

View File

@ -28,14 +28,15 @@ class TestObjectVersion(test_base.DFTestBase):
super(TestObjectVersion, self).setUp()
def test_network_version(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
lean_lswitch = l2.LogicalSwitch(id=network_id)
self.assertTrue(network.exists())
version = self.nb_api.get(lean_lswitch).version
subnet = self.store(objects.SubnetTestObj(
self.neutron, self.nb_api, network_id))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet.create()
self.assertTrue(subnet.exists())
new_version = self.nb_api.get(lean_lswitch).version
@ -51,17 +52,18 @@ class TestObjectVersion(test_base.DFTestBase):
self.assertFalse(network.exists())
def test_port_version(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron, self.nb_api, network_id))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet.create()
self.assertTrue(subnet.exists())
port = self.store(objects.PortTestObj(
self.neutron, self.nb_api, network_id))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
port_id = port.create()
self.assertTrue(port.exists())
prev_version = self.nb_api.get(l2.LogicalPort(id=port_id)).version
@ -79,17 +81,16 @@ class TestObjectVersion(test_base.DFTestBase):
self.assertFalse(network.exists())
def test_router_version(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet_id = subnet.create()
self.assertTrue(subnet.exists())
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
router_id = router.create()
self.assertTrue(router.exists())
prev_version = self.nb_api.get(l3.LogicalRouter(id=router_id)).version
@ -112,8 +113,8 @@ class TestObjectVersion(test_base.DFTestBase):
self.assertFalse(network.exists())
def test_sg_version(self):
secgroup = self.store(
objects.SecGroupTestObj(self.neutron, self.nb_api))
secgroup = objects.SecGroupTestObj(self.neutron, self.nb_api)
self.addCleanup(secgroup.close)
sg_id = secgroup.create()
self.assertTrue(secgroup.exists())
sg_obj = secgroups.SecurityGroup(id=sg_id)
@ -136,8 +137,8 @@ class TestObjectVersion(test_base.DFTestBase):
self.assertFalse(secgroup.exists())
def test_qospolicy_version(self):
qospolicy = self.store(objects.QosPolicyTestObj(self.neutron,
self.nb_api))
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
self.addCleanup(qospolicy.close)
policy_id = qospolicy.create()
self.assertTrue(qospolicy.exists())
version = self.nb_api.get(qos.QosPolicy(id=policy_id)).version
@ -156,16 +157,14 @@ class TestObjectVersion(test_base.DFTestBase):
external_net = objects.find_first_network(self.neutron,
{'router:external': True})
if not external_net:
network = self.store(
objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
external_net_para = {'name': 'public', 'router:external': True}
external_network_id = network.create(network=external_net_para)
self.assertTrue(network.exists())
ext_subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
external_network_id,
))
ext_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
external_network_id)
self.addCleanup(ext_subnet.close)
external_subnet_para = {'cidr': '192.168.199.0/24',
'ip_version': 4,
'network_id': external_network_id}
@ -187,22 +186,20 @@ class TestObjectVersion(test_base.DFTestBase):
@lockutils.synchronized('need-external-net')
def test_floatingip_version(self):
with self._prepare_ext_net() as external_network_id:
private_network = self.store(
objects.NetworkTestObj(self.neutron, self.nb_api))
private_network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(private_network.close)
private_network_id = private_network.create()
self.assertTrue(private_network.exists())
priv_subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
private_network_id,
))
router = self.store(
objects.RouterTestObj(self.neutron, self.nb_api))
port = self.store(
objects.PortTestObj(self.neutron,
self.nb_api, private_network_id))
fip = self.store(
objects.FloatingipTestObj(self.neutron, self.nb_api))
priv_subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
private_network_id)
self.addCleanup(priv_subnet.close)
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
port = objects.PortTestObj(self.neutron,
self.nb_api, private_network_id)
self.addCleanup(port.close)
fip = objects.FloatingipTestObj(self.neutron, self.nb_api)
self.addCleanup(fip.close)
router_para = {
'name': 'myrouter1', 'admin_state_up': True,

View File

@ -94,9 +94,10 @@ class TestOvsdbMonitor(test_base.DFTestBase):
def test_notify_message(self):
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
network_id))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet_body = {'network_id': network_id,
'cidr': '10.10.0.0/24',
'gateway_ip': '10.10.0.1',
@ -107,7 +108,8 @@ class TestOvsdbMonitor(test_base.DFTestBase):
self.assertTrue(network.exists())
self.assertTrue(subnet.exists())
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
self.assertIsNotNone(vm.server.addresses['mynetwork'])
mac = vm.server.addresses['mynetwork'][0]['OS-EXT-IPS-MAC:mac_addr']
@ -133,10 +135,11 @@ class TestOvsdbMonitor(test_base.DFTestBase):
)
def test_reply_message(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
network_id))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet_body = {'network_id': network_id,
'cidr': '10.20.0.0/24',
'gateway_ip': '10.20.0.1',
@ -147,13 +150,15 @@ class TestOvsdbMonitor(test_base.DFTestBase):
self.assertTrue(network.exists())
self.assertTrue(subnet.exists())
vm1 = self.store(objects.VMTestObj(self, self.neutron))
vm1 = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm1.close)
vm1.create(network=network)
self.assertIsNotNone(vm1.server.addresses['mynetwork'])
mac1 = vm1.server.addresses['mynetwork'][0]['OS-EXT-IPS-MAC:mac_addr']
self.assertIsNotNone(mac1)
vm2 = self.store(objects.VMTestObj(self, self.neutron))
vm2 = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm2.close)
vm2.create(network=network)
self.assertIsNotNone(vm2.server.addresses['mynetwork'])
mac2 = vm2.server.addresses['mynetwork'][0]['OS-EXT-IPS-MAC:mac_addr']

View File

@ -19,16 +19,19 @@ from dragonflow.tests.fullstack import test_objects as objects
class TestPortQos(test_base.DFTestBase):
def test_port_with_qospolicy(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
network_id=network_id))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
network_id=network_id)
self.addCleanup(subnet.close)
subnet.create()
self.assertTrue(subnet.exists())
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm_id = vm.create(network=network)
vm_port_id = self.vswitch_api.get_port_id_by_vm_id(vm_id)
@ -36,8 +39,8 @@ class TestPortQos(test_base.DFTestBase):
port = objects.PortTestObj(self.neutron, self.nb_api, network_id,
vm_port_id)
qospolicy = self.store(objects.QosPolicyTestObj(self.neutron,
self.nb_api))
qospolicy = objects.QosPolicyTestObj(self.neutron, self.nb_api)
self.addCleanup(qospolicy.close)
qos_policy_id = qospolicy.create()
time.sleep(const.DEFAULT_CMD_TIMEOUT)
self.assertTrue(qospolicy.exists())

View File

@ -211,18 +211,20 @@ class TestOVSFlowsForPortSecurity(test_base.DFTestBase):
def _test_anti_spoof_flows(self, subnet_info):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet_info['network_id'] = network_id
subnet = self.store(objects.SubnetTestObj(self.neutron,
self.nb_api,
network_id=network_id))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
network_id=network_id)
self.addCleanup(subnet.close)
subnet.create(subnet_info)
self.assertTrue(subnet.exists())
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
addresses = vm.server.addresses['mynetwork']

View File

@ -88,7 +88,8 @@ class TestPubSub(PubSubTestBase):
events_num += 1
subscriber = self._get_subscriber(_db_change_callback)
self.addCleanup(subscriber.close)
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
if cfg.CONF.df.enable_selective_topology_distribution:
topic = network.get_topic()
@ -97,11 +98,8 @@ class TestPubSub(PubSubTestBase):
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
self.assertNotEqual(local_event_num, events_num)
local_event_num = events_num
port = self.store(objects.PortTestObj(
self.neutron,
self.nb_api,
network_id
))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
port.create()
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
@ -132,7 +130,8 @@ class TestPubSub(PubSubTestBase):
subscriber = self._get_subscriber(_db_change_callback)
self.addCleanup(subscriber.close)
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
if cfg.CONF.df.enable_selective_topology_distribution:
topic = network.get_topic()
@ -140,11 +139,8 @@ class TestPubSub(PubSubTestBase):
else:
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
self.assertNotEqual(local_event_num, ns.events_num)
port = self.store(objects.PortTestObj(
self.neutron,
self.nb_api,
network_id
))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
local_event_num = ns.events_num
port_id = port.create()
time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)

View File

@ -22,7 +22,8 @@ from dragonflow.tests.fullstack import test_objects as objects
class TestRemotePort(test_base.DFTestBase):
def test_remote_port(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
@ -32,14 +33,14 @@ class TestRemotePort(test_base.DFTestBase):
'ip_version': 4,
'name': 'subnet1',
'enable_dhcp': True}
subnet = self.store(objects.SubnetTestObj(self.neutron,
self.nb_api,
network_id=network_id))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api,
network_id=network_id)
self.addCleanup(subnet.close)
subnet.create(subnet_info)
self.assertTrue(subnet.exists())
port = self.store(objects.PortTestObj(
self.neutron, self.nb_api, network_id))
port = objects.PortTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(port.close)
port_body = {
'admin_state_up': True,
'name': 'port1',

View File

@ -54,13 +54,12 @@ class ArpResponderTest(test_base.DFTestBase):
"""
Add a VM. Verify it's ARP flow is there.
"""
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create(network={'name': 'arp_responder_test'})
subnet_obj = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet_obj = objects.SubnetTestObj(self.neutron, self.nb_api,
network_id)
self.addCleanup(subnet_obj.close)
subnet = {'network_id': network_id,
'cidr': '10.10.10.0/24',
@ -71,7 +70,8 @@ class ArpResponderTest(test_base.DFTestBase):
subnet = subnet_obj.create(subnet)
flows_before = self._get_arp_table_flows()
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
ip = vm.get_first_ipv4()
self.assertIsNotNone(ip)
@ -134,13 +134,12 @@ class ICMPResponderTest(test_base.DFTestBase):
"""
Add a VM. Verify the icmp flow is there.
"""
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create(network={'name': 'icmp_responder_test'})
subnet_obj = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet_obj = objects.SubnetTestObj(self.neutron, self.nb_api,
network_id)
self.addCleanup(subnet_obj.close)
subnet = {'network_id': network_id,
'cidr': '10.10.10.0/24',
@ -152,7 +151,8 @@ class ICMPResponderTest(test_base.DFTestBase):
flows_before = self._get_l3_lookup_table_flows()
router = self.store(objects.RouterTestObj(self.neutron, self.nb_api))
router = objects.RouterTestObj(self.neutron, self.nb_api)
self.addCleanup(router.close)
router_id = router.create()
subnet_msg = {'subnet_id': subnet_id}
interface = self.neutron.add_interface_router(router_id,
@ -165,7 +165,8 @@ class ICMPResponderTest(test_base.DFTestBase):
router_ip = ip['ip_address']
break
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
ip = vm.get_first_ipv4()
self.assertIsNotNone(ip)

View File

@ -115,7 +115,8 @@ class SfcTestsCommonBase(test_base.DFTestBase):
ingress = self._create_sf_port()
egress = self._create_sf_port()
pp = self.store(objects.PortPairTestObj(self.neutron, self.nb_api))
pp = objects.PortPairTestObj(self.neutron, self.nb_api)
self.addCleanup(pp.close)
pp.create_from_ports(
ingress=ingress,
egress=egress,
@ -126,25 +127,24 @@ class SfcTestsCommonBase(test_base.DFTestBase):
def _create_ppg(self, width):
pps = [self._create_pp() for _ in range(width)]
ppg = self.store(
objects.PortPairGroupTestObj(self.neutron, self.nb_api))
ppg = objects.PortPairGroupTestObj(self.neutron, self.nb_api)
self.addCleanup(ppg.close)
ppg.create_from_portpairs(pps)
return ppg
def _create_pc(self, fc, layout):
ppgs = [self._create_ppg(w) for w in layout]
pc = self.store(
objects.PortChainTestObj(self.neutron, self.nb_api))
pc = objects.PortChainTestObj(self.neutron, self.nb_api)
self.addCleanup(pc.close)
pc.create_from_fcs_ppgs([fc], ppgs)
return pc
def setUp(self):
super(SfcTestsCommonBase, self).setUp()
self.security_group = self.store(objects.SecGroupTestObj(
self.neutron,
self.nb_api))
self.security_group = objects.SecGroupTestObj(self.neutron,
self.nb_api)
self.addCleanup(self.security_group.close)
security_group_id = self.security_group.create()
self.assertTrue(self.security_group.exists())
@ -163,12 +163,8 @@ class SfcTestsCommonBase(test_base.DFTestBase):
rule_id = self.security_group.rule_create(secrule=rule)
self.assertTrue(self.security_group.rule_exists(rule_id))
self.topology = self.store(
app_testing_objects.Topology(
self.neutron,
self.nb_api,
),
)
self.topology = app_testing_objects.Topology(self.neutron, self.nb_api)
self.addCleanup(self.topology.close)
self.subnet = self.topology.create_subnet(
cidr=IPV4_CIDR,
@ -520,9 +516,8 @@ class TestFcApp(SfcTestsCommonBase):
return packet
def test_fc(self):
fc = self.store(
objects.FlowClassifierTestObj(self.neutron, self.nb_api),
)
fc = objects.FlowClassifierTestObj(self.neutron, self.nb_api)
self.addCleanup(fc.close)
fc.create(self._fc_params)
pc = self._create_pc(fc, [1])
time.sleep(_QUICK_RESOURCE_READY_TIMEOUT)
@ -541,19 +536,18 @@ class TestFcApp(SfcTestsCommonBase):
),
}
port_policies.update(self._create_port_policies(pc))
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.src_port.port_id,
self._initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.LogAction()
),
policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.src_port.port_id,
self._initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.LogAction()
)
self.addCleanup(policy.close)
policy.start(self.topology)
policy.wait(10)
@ -588,9 +582,8 @@ class TestSfcApp(SfcTestsCommonBase):
self._gen_udp(src_port=SRC_PORT, dst_port=DST_PORT) /
('{len}'.format(len=len(self.layout)) * 64)
)
fc = self.store(
objects.FlowClassifierTestObj(self.neutron, self.nb_api),
)
fc = objects.FlowClassifierTestObj(self.neutron, self.nb_api)
self.addCleanup(fc.close)
fc.create(
{
'logical_source_port': self.src_port.port.port_id
@ -611,19 +604,18 @@ class TestSfcApp(SfcTestsCommonBase):
),
}
port_policies.update(self._create_port_policies(pc))
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.src_port.port_id,
initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.LogAction()
),
policy = app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.src_port.port_id,
initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.LogAction()
)
self.addCleanup(policy.close)
policy.start(self.topology)
policy.wait(10)

View File

@ -47,7 +47,8 @@ class TestSnatFlows(test_base.DFTestBase):
if not self._check_if_app_enabled():
return
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
subnet = {'network_id': network_id,
'cidr': '10.200.0.0/24',
@ -67,7 +68,8 @@ class TestSnatFlows(test_base.DFTestBase):
# Create VM
ovs = utils.OvsFlowsParser()
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
ip = vm.get_first_ipv4()
self.assertIsNotNone(ip)

View File

@ -41,20 +41,19 @@ class TestTopology(test_base.DFTestBase):
self._remove_vm(vm2)
def _create_network(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
self.addCleanup(network.close)
network_id = network.create()
self.assertTrue(network.exists())
subnet = self.store(objects.SubnetTestObj(
self.neutron,
self.nb_api,
network_id,
))
subnet = objects.SubnetTestObj(self.neutron, self.nb_api, network_id)
self.addCleanup(subnet.close)
subnet.create()
self.assertTrue(subnet.exists())
return network
def _create_vm(self, network):
vm = self.store(objects.VMTestObj(self, self.neutron))
vm = objects.VMTestObj(self, self.neutron)
self.addCleanup(vm.close)
vm.create(network=network)
vm_mac = vm.get_first_mac()
self.assertTrue(vm_mac is not None)