Add a request validator for SNAT logging

A log request for SNAT resource should specify resource_type='snat'
and resource_id=router_id. So this patch adds a validation method for
the incoming log request.

Co-Authored-By: Kim Bao Long <longkb@vn.fujitsu.com>
Change-Id: I4fff5a9c8877d192aab780eed9be90452711fb3d
Partial-Bug: #1752290
This commit is contained in:
Nguyen Phuong An 2018-04-27 11:03:49 +07:00
parent 591cbff803
commit 50c75e9e51
4 changed files with 199 additions and 0 deletions

View File

@ -21,6 +21,8 @@ LOGGING_PLUGIN = 'logging-plugin'
# supported logging types
SECURITY_GROUP = 'security_group'
# TODO(annp): Moving to neutron-lib
SNAT = 'snat'
# target resource types
PORT = 'port'

View File

@ -57,3 +57,20 @@ class ValidatedMethodNotFound(n_exc.NeutronException):
"""A validated method not found Exception"""
message = _('Validated method for %(resource_type)s log '
'could not be found.')
class ResourceIdNotSpecified(n_exc.InvalidInput):
message = _('resource_id should be specified for %(resource_type)s.')
class RouterNotEnabledSnat(n_exc.NeutronException):
message = _('SNAT is not enabled for router %(resource_id)s.')
class EventsDisabled(n_exc.InvalidInput):
message = _('List of events %(events)s were disabled for'
'%(resource_type)s.')
class RouterGatewayNotSet(n_exc.NeutronException):
message = _('Router gateway is not set for router %(resource_id)s.')

View File

@ -0,0 +1,60 @@
# Copyright (c) 2018 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from neutron.objects import router
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.common import validators
LOG = logging.getLogger(__name__)
EVENTS_DISABLE = [log_const.DROP_EVENT, log_const.ACCEPT_EVENT]
def _get_router(context, router_id):
router_obj = router.Router.get_object(context, id=router_id)
if not router_obj:
raise log_exc.ResourceNotFound(resource_id=router_id)
return router_obj
@validators.ResourceValidateRequest.register(log_const.SNAT)
def validate_snat_request(context, log_data):
"""Validate the incoming SNAT log request
This method validates whether SNAT log request is satisfied or not.
A ResourceNotFound will be raised if resource_id in log_data does not
belong to any Router object. This method will also raise a
RouterNotEnabledSnat exception in the case of a indicated router does not
enable SNAT feature.
"""
resource_id = log_data.get('resource_id')
event = log_data.get('event')
if not resource_id:
raise log_exc.ResourceIdNotSpecified(resource_type=log_const.SNAT)
if event in EVENTS_DISABLE:
raise log_exc.EventsDisabled(events=EVENTS_DISABLE,
resource_type=log_const.SNAT)
router_obj = _get_router(context, resource_id)
# Check whether SNAT is enabled or not
if not router_obj.enable_snat:
raise log_exc.RouterNotEnabledSnat(resource_id=resource_id)
# Check whether router gateway is set or not.
if not router_obj.gw_port_id:
raise log_exc.RouterGatewayNotSet(resource_id=resource_id)

View File

@ -0,0 +1,120 @@
# Copyright (c) 2018 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib.plugins import directory
from oslo_utils import importutils
from neutron.objects import router as router_obj
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.common import validators
from neutron.tests import base
class FakePlugin(object):
def __init__(self):
self.validator_mgr = validators.ResourceValidateRequest.get_instance()
self.supported_logging_types = ['snat']
class TestSnatLogRequestValidations(base.BaseTestCase):
"""Test validation for SNAT log request"""
def setUp(self):
self.log_plugin = FakePlugin()
importutils.import_module('neutron.services.logapi.common.'
'snat_validate')
super(TestSnatLogRequestValidations, self).setUp()
def test_validate_request_resource_id_not_specific(self):
log_data = {'resource_type': 'snat'}
with mock.patch.object(directory, 'get_plugin',
return_value=self.log_plugin):
with mock.patch.object(router_obj.Router, 'get_object',
return_value=mock.ANY):
self.assertRaises(
log_exc.ResourceIdNotSpecified,
self.log_plugin.validator_mgr.validate_request,
mock.ANY,
log_data)
def test_validate_request_resource_id_not_exists(self):
log_data = {'resource_type': 'snat',
'resource_id': 'fake_router_id'}
with mock.patch.object(directory, 'get_plugin',
return_value=self.log_plugin):
with mock.patch.object(router_obj.Router, 'get_object',
return_value=None):
self.assertRaises(
log_exc.ResourceNotFound,
self.log_plugin.validator_mgr.validate_request,
mock.ANY,
log_data)
def test_validate_request_with_disable_events(self):
log_data_1 = {'resource_type': 'snat',
'resource_id': 'fake_router_id_1',
'event': 'ACCEPT'}
log_data_2 = {'resource_type': 'snat',
'resource_id': 'fake_router_id_2',
'event': 'DROP'}
with mock.patch.object(directory, 'get_plugin',
return_value=self.log_plugin):
with mock.patch.object(router_obj.Router, 'get_object',
return_value=mock.ANY):
self.assertRaises(
log_exc.EventsDisabled,
self.log_plugin.validator_mgr.validate_request,
mock.ANY,
log_data_1)
self.assertRaises(
log_exc.EventsDisabled,
self.log_plugin.validator_mgr.validate_request,
mock.ANY,
log_data_2)
def test_validate_request_with_snat_disable(self):
log_data = {'resource_type': 'snat',
'resource_id': 'fake_router_id'}
f_router = mock.Mock()
f_router.enable_snat = False
with mock.patch.object(directory, 'get_plugin',
return_value=self.log_plugin):
with mock.patch.object(router_obj.Router, 'get_object',
return_value=f_router):
self.assertRaises(
log_exc.RouterNotEnabledSnat,
self.log_plugin.validator_mgr.validate_request,
mock.ANY,
log_data)
def test_validate_request_with_not_set_gw_port(self):
log_data = {'resource_type': 'snat',
'resource_id': 'fake_router_id'}
f_router = mock.Mock()
f_router.enable_snat = True
f_router.gw_port_id = None
with mock.patch.object(directory, 'get_plugin',
return_value=self.log_plugin):
with mock.patch.object(router_obj.Router, 'get_object',
return_value=f_router):
self.assertRaises(
log_exc.RouterGatewayNotSet,
self.log_plugin.validator_mgr.validate_request,
mock.ANY,
log_data)