[server side] Fix race issue for port forwarding plugin

This patch fixes the race condition with update/delete neutron
serveral resources, such as port forwarding conflict with
floatingip and port forwarding conflict with port.

Also this approach need the revision function, so need to fix in port
forwarding model to aware relationship revision update.

As the port forwarding resource associated with 2 resources,
one is floatingip, the other is neutron internal port.
So floatingip update/delete maybe in a conflict situation with
port forwarding creation. But for port, we just lack the logic to
process port forwarding during update port's fixed_ip and delete
port.

So the approach here is adding logic to let l3 plugin and port
forwarding plugin know each other when both sides may process the same
floatingip resource. Based on the existing revision_number feature,
if one side fail as db staleError, the api layer will retry the whole
operation for this resource, so there must be a failure on one side in
this case. This patch just adds the association logic for l3 plugin and
port forwarding plugin, also adds a event receiver for port update/delete.

Then the behavior about the port forwarding associated resources would
be:
* For fip resource, I introduce one function in that patch.
  _check_floatingip_request
So during floatingip update/delete, the function will process
fip and check by rpc callback from l3_plugin, if port forwarding plugin
also creates a port forwarding with the same fip at this moment. The
success side would be the one who update the fip_db first, the other side
would be failure after db retry.

* For port resource, during update port fixed_ip or delete port, we will
delete the associated port forwarding resources for free the
fip:external_port socket.

Partially-Implements: blueprint port-forwarding
Change-Id: I637ebcb33b91d899a077bded5ca10097a830a847
Partial-Bug: #1491317
This commit is contained in:
ZhaoBo 2018-07-03 22:12:47 +08:00
parent 18e941a402
commit 4088461ed6
5 changed files with 290 additions and 28 deletions

View File

@ -56,3 +56,4 @@ class PortForwarding(model_base.BASEV2, model_base.HasId):
lazy='subquery', uselist=True,
cascade='delete')
)
revises_on_change = ('floating_ip', 'port',)

View File

@ -23,3 +23,7 @@ class PortForwardingNotFound(n_exc.NotFound):
class PortForwardingNotSupportFilterField(n_exc.BadRequest):
message = _("Port Forwarding filter %(filter)s is not supported.")
class FipInUseByPortForwarding(n_exc.InUse):
message = _("Floating IP %(id)s in use by Port Forwarding resources.")

View File

@ -19,7 +19,9 @@ import functools
import netaddr
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.api.definitions import l3
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as lib_consts
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as lib_exc
@ -27,6 +29,7 @@ from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_log import log as logging
from neutron._i18n import _
from neutron.api.rpc.callbacks import events as rpc_events
@ -41,6 +44,8 @@ from neutron.objects import port_forwarding as pf
from neutron.objects import router
from neutron.services.portforwarding.common import exceptions as pf_exc
LOG = logging.getLogger(__name__)
def make_result_with_fields(f):
@functools.wraps(f)
@ -97,6 +102,116 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
result_dict[apidef.COLLECTION_NAME] = port_forwarding_result
return result_dict
@registry.receives(resources.FLOATING_IP, [events.PRECOMMIT_UPDATE,
events.PRECOMMIT_DELETE])
def _check_floatingip_request(self, resource, event, trigger, context,
**kwargs):
# We only support the "free" floatingip to be associated with
# port forwarding resources. And in the PUT request of floatingip,
# the request body must contain a "port_id" field which is not
# allowed in port forwarding functionality.
floatingip_id = None
if event == events.PRECOMMIT_UPDATE:
fip_db = kwargs.get('floatingip_db')
floatingip_id = fip_db.id
# Here the key-value must contain a floatingip param, and the value
# must a dict with key 'floatingip'.
if not kwargs['floatingip']['floatingip'].get('port_id'):
# Only care about the associate floatingip cases.
# The port_id field is a must-option. But if a floatingip
# disassociate a internal port, the port_id should be null.
LOG.debug('Skip check for floatingip %s, as the update '
'request does not contain port_id.', floatingip_id)
return
elif event == events.PRECOMMIT_DELETE:
floatingip_id = kwargs.get('port').get('device_id')
if not floatingip_id:
return
exist_pf_resources = pf.PortForwarding.get_objects(
context, floatingip_id=floatingip_id)
if exist_pf_resources:
raise pf_exc.FipInUseByPortForwarding(id=floatingip_id)
@registry.receives(resources.PORT, [events.AFTER_UPDATE,
events.PRECOMMIT_DELETE])
@db_api.retry_if_session_inactive()
def _process_port_request(self, resource, event, trigger, context,
**kwargs):
# Deleting floatingip will receive port resource with precommit_delete
# event, so just return, then check the request in
# _check_floatingip_request callback.
if kwargs['port']['device_owner'].startswith(
lib_consts.DEVICE_OWNER_FLOATINGIP):
return
# This block is used for checking if there are some fixed ips updates.
# Whatever the event is AFTER_UPDATE/PRECOMMIT_DELETE,
# we will use the update_ip_set for checking if the possible associated
# port forwarding resources need to be deleted for port's AFTER_UPDATE
# event. Or get all affected ip addresses for port's PRECOMMIT_DELETE
# event.
port_id = kwargs['port']['id']
update_fixed_ips = kwargs['port']['fixed_ips']
update_ip_set = set()
for update_fixed_ip in update_fixed_ips:
if (netaddr.IPNetwork(update_fixed_ip.get('ip_address')).version ==
lib_consts.IP_VERSION_4):
update_ip_set.add(update_fixed_ip.get('ip_address'))
if not update_ip_set:
return
# If the port owner wants to update or delete port, we must elevate the
# context to check if the floatingip or port forwarding resources
# are owned by other tenants.
if not context.is_admin:
context = context.elevated()
# If the logic arrives here, that means we have got update_ip_set and
# its value is not None. So we need to get all port forwarding
# resources based on the request port_id for preparing the next
# process, such as deleting them.
pf_resources = pf.PortForwarding.get_objects(
context, internal_port_id=port_id)
if not pf_resources:
return
# If the logic arrives here, that means we have got pf_resources and
# its value is not None either. Then we collect all ip addresses
# which are used by port forwarding resources to generate used_ip_set,
# and we default to set remove_ip_set as used_ip_set which means we
# want to delete all port forwarding resources when event is
# PRECOMMIT_DELETE. And when event is AFTER_UPDATE, we get the
# different part.
used_ip_set = set()
for pf_resource in pf_resources:
used_ip_set.add(str(pf_resource.internal_ip_address))
remove_ip_set = used_ip_set
if event == events.AFTER_UPDATE:
remove_ip_set = used_ip_set - update_ip_set
if not remove_ip_set:
return
# Here, we get the remove_ip_set, the following block will delete the
# port forwarding resources based on remove_ip_set. Just need to note
# here, if event is AFTER_UPDATE, and remove_ip_set is empty, the
# following block won't be processed.
remove_port_forwarding_list = []
with db_api.context_manager.writer.using(context):
for pf_resource in pf_resources:
if str(pf_resource.internal_ip_address) in remove_ip_set:
pf_objs = pf.PortForwarding.get_objects(
context, floatingip_id=pf_resource.floatingip_id)
if len(pf_objs) == 1 and pf_objs[0].id == pf_resource.id:
fip_obj = router.FloatingIP.get_object(
context, id=pf_resource.floatingip_id)
fip_obj.update_fields({'router_id': None})
fip_obj.update()
pf_resource.delete()
remove_port_forwarding_list.append(pf_resource)
self.push_api.push(context, remove_port_forwarding_list,
rpc_events.DELETED)
def _get_internal_ip_subnet(self, request_ip, fixed_ips):
request_ip = netaddr.IPNetwork(request_ip)
for fixed_ip in fixed_ips:
@ -166,34 +281,45 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
port_forwarding):
port_forwarding = port_forwarding.get(apidef.RESOURCE_NAME)
port_forwarding['floatingip_id'] = floatingip_id
pf_obj = pf.PortForwarding(context, **port_forwarding)
try:
with db_api.context_manager.writer.using(context):
fip_obj = self._get_fip_obj(context, floatingip_id)
with db_api.context_manager.writer.using(context):
fip_obj = self._get_fip_obj(context, floatingip_id)
if fip_obj.fixed_port_id:
raise lib_l3_exc.FloatingIPPortAlreadyAssociated(
port_id=port_forwarding['internal_port_id'],
fip_id=fip_obj.id,
floating_ip_address=fip_obj.floating_ip_address,
fixed_ip=str(port_forwarding['internal_ip_address']),
net_id=fip_obj.floating_network_id)
router_id = self._find_a_router_for_fip_port_forwarding(
context, port_forwarding, fip_obj)
pf_obj = pf.PortForwarding(context, **port_forwarding)
router_id = self._find_a_router_for_fip_port_forwarding(
context, port_forwarding, fip_obj)
# If this func does not raise an exception, means the
# router_id matched.
# case1: fip_obj.router_id = None
# case2: fip_obj.router_id is the same with we selected.
self._check_router_match(context, fip_obj,
router_id, port_forwarding)
if not fip_obj.router_id:
fip_obj.router_id = router_id
fip_obj.update()
# If this func does not raise an exception, means the
# router_id matched.
# case1: fip_obj.router_id = None
# case2: fip_obj.router_id is the same with we selected.
self._check_router_match(context, fip_obj,
router_id, port_forwarding)
if not fip_obj.router_id:
values = {'router_id': router_id, 'fixed_port_id': None}
router.FloatingIP.update_objects(
context, values, id=floatingip_id)
try:
pf_obj.create()
except obj_exc.NeutronDbObjectDuplicateEntry:
(__, conflict_params) = self._find_existing_port_forwarding(
context, floatingip_id, port_forwarding)
message = _("A duplicate port forwarding entry with same "
"attributes already exists, conflicting values "
"are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
return pf_obj
except obj_exc.NeutronDbObjectDuplicateEntry:
(__,
conflict_params) = self._find_existing_port_forwarding(
context, floatingip_id, port_forwarding)
message = _("A duplicate port forwarding entry with same "
"attributes already exists, conflicting "
"values are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
return pf_obj
@db_base_plugin_common.convert_result_to_dict
def update_floatingip_port_forwarding(self, context, id, floatingip_id,

View File

@ -10,10 +10,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import mock
import netaddr
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.callbacks import exceptions as c_exc
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from oslo_utils import uuidutils
from six.moves import queue
from neutron.services.portforwarding.common import exceptions as pf_exc
from neutron.services.portforwarding import pf_plugin
@ -40,6 +46,20 @@ class PortForwardingTestCaseBase(ml2_test_base.ML2TestFramework):
def _get_floatingip(self, floatingip_id):
return self.l3_plugin.get_floatingip(self.context, floatingip_id)
def _update_floatingip(self, fip_id, update_info):
return self.l3_plugin.update_floatingip(
self.context, fip_id, {"floatingip": update_info})
def _delete_floatingip(self, fip_id):
return self.l3_plugin.delete_floatingip(self.context, fip_id)
def _update_port(self, port_id, update_info):
return self.core_plugin.update_port(
self.context, port_id, {'port': update_info})
def _delete_port(self, port_id):
return self.core_plugin.delete_port(self.context, port_id)
def _add_router_interface(self, router_id, subnet_id):
interface_info = {"subnet_id": subnet_id}
self.l3_plugin.add_router_interface(
@ -260,3 +280,108 @@ class PortForwardingTestCase(PortForwardingTestCaseBase):
self.assertRaises(pf_exc.PortForwardingNotFound,
self.pf_plugin.delete_floatingip_port_forwarding,
self.context, res['id'], uuidutils.generate_uuid())
def _simulate_concurrent_requests_process_and_raise(
self, funcs, args_list):
class SimpleThread(threading.Thread):
def __init__(self, q):
super(SimpleThread, self).__init__()
self.q = q
self.exception = None
def run(self):
try:
while not self.q.empty():
item = None
try:
item = self.q.get(False)
func, func_args = item[0], item[1]
func(*func_args)
except queue.Empty:
pass
finally:
if item:
self.q.task_done()
except Exception as e:
self.exception = e
def get_exception(self):
return self.exception
q = queue.Queue()
for func, func_args in zip(funcs, args_list):
q.put_nowait((func, func_args))
threads = []
for _ in range(len(funcs)):
t = SimpleThread(q)
threads.append(t)
t.start()
q.join()
for t in threads:
e = t.get_exception()
if e:
raise e
def test_concurrent_create_port_forwarding_delete_fip(self):
func1 = self.pf_plugin.create_floatingip_port_forwarding
func2 = self._delete_floatingip
funcs = [func1, func2]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.fip['id'],)]
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
port_forwardings = self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id'], fields=['id'])
self.pf_plugin.delete_floatingip_port_forwarding(
self.context, port_forwardings[0][apidef.ID],
floatingip_id=self.fip['id'])
funcs.reverse()
args_list.reverse()
self.assertRaises(lib_l3_exc.FloatingIPNotFound,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
def test_concurrent_create_port_forwarding_update_fip(self):
newport = self._create_port(self.fmt, self.net['id']).json['port']
func1 = self.pf_plugin.create_floatingip_port_forwarding
func2 = self._update_floatingip
funcs = [func1, func2]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.fip['id'], {'port_id': newport['id']})]
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
funcs.reverse()
args_list.reverse()
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
def test_concurrent_create_port_forwarding_update_port(self):
new_ip = str(
netaddr.IPAddress(self.port['fixed_ips'][0]['ip_address']) + 2)
funcs = [self.pf_plugin.create_floatingip_port_forwarding,
self._update_port]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.port['id'], {
'fixed_ips': [{'subnet_id': self.subnet['id'],
'ip_address': new_ip}]})]
self._simulate_concurrent_requests_process_and_raise(funcs, args_list)
self.assertEqual([], self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id']))
def test_concurrent_create_port_forwarding_delete_port(self):
funcs = [self.pf_plugin.create_floatingip_port_forwarding,
self._delete_port]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.port['id'],)]
self._simulate_concurrent_requests_process_and_raise(funcs, args_list)
self.assertEqual([], self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id']))

View File

@ -31,6 +31,7 @@ from neutron.api.rpc.handlers import resources_rpc
from neutron.db import db_base_plugin_v2
from neutron.db import l3_db
from neutron import manager
from neutron.objects import base as obj_base
from neutron.objects import port_forwarding
from neutron.objects import router
from neutron.services.portforwarding.common import exceptions as pf_exc
@ -188,6 +189,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
self.pf_plugin.update_floatingip_port_forwarding,
self.ctxt, 'pf_id', **pf_input)
@mock.patch.object(obj_base.NeutronDbObject, 'update_objects')
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(pf_plugin.PortForwardingPlugin, '_check_router_match')
@mock.patch.object(pf_plugin.PortForwardingPlugin,
@ -197,7 +199,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
@mock.patch('neutron.objects.port_forwarding.PortForwarding')
def test_create_floatingip_port_forwarding(
self, mock_port_forwarding, mock_fip_get_object, mock_find_router,
mock_check_router_match, mock_push_api):
mock_check_router_match, mock_push_api, mock_update_objects):
# Update fip
pf_input = {
'port_forwarding':
@ -210,11 +212,12 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
mock_port_forwarding.return_value = pf_obj
mock_fip_get_object.return_value = fip_obj
fip_obj.router_id = ''
fip_obj.fixed_port_id = ''
self.pf_plugin.create_floatingip_port_forwarding(
self.ctxt, **pf_input)
mock_port_forwarding.assert_called_once_with(
self.ctxt, **pf_input['port_forwarding']['port_forwarding'])
self.assertTrue(fip_obj.update.called)
self.assertTrue(mock_update_objects.called)
self.assertTrue(pf_obj.create.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
@ -223,15 +226,17 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
pf_obj.reset_mock()
fip_obj.reset_mock()
mock_port_forwarding.reset_mock()
mock_update_objects.reset_mock()
mock_push_api.reset_mock()
mock_port_forwarding.return_value = pf_obj
fip_obj.router_id = 'router_id'
fip_obj.fixed_port_id = ''
self.pf_plugin.create_floatingip_port_forwarding(
self.ctxt, **pf_input)
mock_port_forwarding.assert_called_once_with(
self.ctxt, **pf_input['port_forwarding']['port_forwarding'])
self.assertTrue(pf_obj.create.called)
self.assertFalse(fip_obj.update.called)
self.assertFalse(mock_update_objects.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
@ -256,6 +261,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
fip_obj = mock.Mock()
mock_port_forwarding.return_value = pf_obj
mock_fip_get_object.return_value = fip_obj
fip_obj.fixed_port_id = ''
pf_obj.create.side_effect = obj_exc.NeutronDbObjectDuplicateEntry(
mock.Mock(), mock.Mock())