diff --git a/neutron/agent/l3/extensions/port_forwarding.py b/neutron/agent/l3/extensions/port_forwarding.py index 2ed43fcf896..3bf975d4d5e 100644 --- a/neutron/agent/l3/extensions/port_forwarding.py +++ b/neutron/agent/l3/extensions/port_forwarding.py @@ -27,6 +27,7 @@ from neutron.api.rpc.callbacks.consumer import registry from neutron.api.rpc.callbacks import events from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import coordination LOG = logging.getLogger(__name__) @@ -53,22 +54,22 @@ class RouterFipPortForwardingMapping(object): """ self.router_fip_mapping = collections.defaultdict(set) + @lockutils.synchronized('port-forwarding-cache') def set_port_forwardings(self, port_forwardings): for port_forwarding in port_forwardings: - self.set_fip_port_forwarding(port_forwarding.floatingip_id, - port_forwarding, - port_forwarding.router_id) + self._set_fip_port_forwarding(port_forwarding.floatingip_id, + port_forwarding, + port_forwarding.router_id) + @lockutils.synchronized('port-forwarding-cache') def update_port_forwardings(self, port_forwardings): for port_forwarding in port_forwardings: self.managed_port_forwardings[port_forwarding.id] = port_forwarding - def get_port_forwarding(self, port_forwarding_id): - return self.managed_port_forwardings.get(port_forwarding_id) - + @lockutils.synchronized('port-forwarding-cache') def del_port_forwardings(self, port_forwardings): for port_forwarding in port_forwardings: - if not self.get_port_forwarding(port_forwarding.id): + if not self.managed_port_forwardings.get(port_forwarding.id): continue self.managed_port_forwardings.pop(port_forwarding.id) self.fip_port_forwarding[port_forwarding.floatingip_id].remove( @@ -80,11 +81,12 @@ class RouterFipPortForwardingMapping(object): if not self.router_fip_mapping[port_forwarding.router_id]: del self.router_fip_mapping[port_forwarding.router_id] - def set_fip_port_forwarding(self, fip_id, pf, router_id): + def _set_fip_port_forwarding(self, fip_id, pf, router_id): self.router_fip_mapping[router_id].add(fip_id) self.fip_port_forwarding[fip_id].add(pf.id) self.managed_port_forwardings[pf.id] = pf + @lockutils.synchronized('port-forwarding-cache') def clear_by_fip(self, fip_id, router_id): self.router_fip_mapping[router_id].remove(fip_id) if len(self.router_fip_mapping[router_id]) == 0: @@ -93,8 +95,9 @@ class RouterFipPortForwardingMapping(object): del self.managed_port_forwardings[pf_id] del self.fip_port_forwarding[fip_id] + @lockutils.synchronized('port-forwarding-cache') def check_port_forwarding_changes(self, new_pf): - old_pf = self.get_port_forwarding(new_pf.id) + old_pf = self.managed_port_forwardings.get(new_pf.id) return old_pf != new_pf @@ -120,7 +123,6 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension): def consume_api(self, agent_api): self.agent_api = agent_api - @lockutils.synchronized('port-forwarding') def _handle_notification(self, context, resource_type, forwardings, event_type): for forwarding in forwardings: @@ -167,6 +169,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension): iptables_manager.ipv4['nat'].add_chain(chain) iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag) + @coordination.synchronized('port-forwarding-{namespace}') def _process_create(self, port_forwardings, ri, interface_name, namespace, iptables_manager): if not port_forwardings: @@ -298,6 +301,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension): context, [port_forwarding], ri, interface_name, namespace, iptables_manager) + @coordination.synchronized('port-forwarding-{namespace}') def _process_update(self, port_forwardings, iptables_manager, interface_name, namespace): if not port_forwardings: @@ -322,6 +326,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension): iptables_manager.apply() self._store_local(port_forwardings, events.UPDATED) + @coordination.synchronized('port-forwarding-{namespace}') def _process_delete(self, context, port_forwardings, ri, interface_name, namespace, iptables_manager): if not port_forwardings: @@ -427,7 +432,6 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension): self.check_local_port_forwardings( context, ri, ri.fip_managed_by_port_forwardings) - @lockutils.synchronized('port-forwarding') def add_router(self, context, data): """Handle a router add event. @@ -438,7 +442,6 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension): """ self.process_port_forwarding(context, data) - @lockutils.synchronized('port-forwarding') def update_router(self, context, data): """Handle a router update event.