Validate ip versions

This ensures that options taking ip versions accept only valid version
numbers (4 or 6).

Change-Id: I284360f8c9aee5e9fc4ebddd8f9a33f01ec06571
This commit is contained in:
Takashi Kajinami 2023-11-27 00:43:40 +09:00
parent d8a2f212f7
commit ed07a430f5
5 changed files with 25 additions and 14 deletions

View File

@ -71,8 +71,8 @@ listen listener
def parse_ip_versions(ip_versions):
if not set(ip_versions).issubset({str(constants.IP_VERSION_4),
str(constants.IP_VERSION_6)}):
if not set(ip_versions).issubset({constants.IP_VERSION_4,
constants.IP_VERSION_6}):
LOG.warning('Invalid metadata address IP versions: %s. Metadata rate '
'limiting will not be enabled.', ip_versions)
return
@ -90,7 +90,7 @@ def get_haproxy_config(cfg_info, rate_limiting_config, header_config_template,
ip_version = parse_ip_versions(rate_limiting_config.ip_versions)
if rate_limiting_config.rate_limit_enabled and ip_version:
cfg_info['ip_version'] = (
'ipv6' if ip_version == '6' else 'ip')
'ipv6' if ip_version == 6 else 'ip')
cfg_info['base_window_duration'] = (
rate_limiting_config['base_window_duration'])
cfg_info['base_query_rate_limit'] = (

View File

@ -12,7 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from oslo_config import cfg
from oslo_config import types
from neutron._i18n import _
@ -109,7 +111,11 @@ METADATA_RATE_LIMITING_OPTS = [
default=False,
help=_('Enable rate limiting on the metadata API.')),
cfg.ListOpt('ip_versions',
default=['4'],
item_type=types.Integer(choices=[
(constants.IP_VERSION_4, 'IPv4'),
(constants.IP_VERSION_6, 'IPv6')
]),
default=[constants.IP_VERSION_4],
help=_('Comma separated list of the metadata address IP '
'versions (4, 6) for which rate limiting will be '
'enabled. The default is to rate limit only for the '

View File

@ -13,7 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from oslo_config import cfg
from oslo_config import types
from neutron._i18n import _
from neutron.common import _constants as common_const
@ -64,10 +66,13 @@ ml2_opts = [
"values for external_network_type config option depend "
"on the network type values configured in type_drivers "
"config option.")),
cfg.IntOpt('overlay_ip_version',
default=4,
help=_("IP version of all overlay (tunnel) network endpoints. "
"Use a value of 4 for IPv4 or 6 for IPv6.")),
cfg.Opt('overlay_ip_version',
default=constants.IP_VERSION_4,
type=types.Integer(choices=[
(constants.IP_VERSION_4, 'IPv4'),
(constants.IP_VERSION_6, 'IPv6')
]),
help=_("IP version of all overlay (tunnel) network endpoints.")),
cfg.StrOpt('tunnelled_network_rp_name',
default=common_const.RP_TUNNELLED,
help=_("Resource provider name for the host with tunnelled "

View File

@ -156,7 +156,7 @@ class MetadataL3AgentTestCase(framework.L3AgentTestFramework):
self.conf.set_override('rate_limit_enabled', True,
'metadata_rate_limiting')
if ipv6:
self.conf.set_override('ip_versions', ['6'],
self.conf.set_override('ip_versions', [6],
'metadata_rate_limiting')
machine, qr_lla = self._create_resources()
interface = self._setup_for_ipv6(machine, qr_lla) if ipv6 else None
@ -236,7 +236,7 @@ class MetadataL3AgentTestCase(framework.L3AgentTestFramework):
def test_metadata_proxy_rate_limiting_invalid_ip_versions(self):
self.conf.set_override('base_query_rate_limit', 2,
'metadata_rate_limiting')
self.conf.set_override('ip_versions', ['4', '6'],
self.conf.set_override('ip_versions', [4, 6],
'metadata_rate_limiting')
machine, _ = self._set_up_for_rate_limiting_test()
# Since we are passing an invalid ip_versions configuration, rate

View File

@ -279,10 +279,10 @@ class TestMetadataDriverProcess(base.BaseTestCase):
return self._test_spawn_metadata_proxy(rate_limited=True)
def test_metadata_proxy_conf_parse_ip_versions(self):
self.assertEqual('4', comm_meta.parse_ip_versions(['4']))
self.assertEqual('6', comm_meta.parse_ip_versions(['6']))
self.assertIsNone(comm_meta.parse_ip_versions(['4', '6']))
self.assertIsNone(comm_meta.parse_ip_versions(['5', '6']))
self.assertEqual(4, comm_meta.parse_ip_versions([4]))
self.assertEqual(6, comm_meta.parse_ip_versions([6]))
self.assertIsNone(comm_meta.parse_ip_versions([4, 6]))
self.assertIsNone(comm_meta.parse_ip_versions([5, 6]))
def test_spawn_metadata_proxy_dad_failed(self):
self._test_spawn_metadata_proxy(dad_failed=True)