Merge "Avoid using non-ASCII characters when generating config files"

This commit is contained in:
Zuul 2018-06-21 18:48:02 +00:00 committed by Gerrit Code Review
commit 0184b7d630
9 changed files with 156 additions and 16 deletions

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import base64
import copy
import filecmp
import os
@ -37,6 +38,7 @@ from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging
from oslo_service import loopingcall
from oslo_utils import encodeutils
from oslo_utils import fileutils
import six
@ -111,6 +113,18 @@ JINJA_ENV = None
IPSEC_CONNS = 'ipsec_site_connections'
# *Swan supports Base64 encoded binary values as PSKs. In such cases,
# a character sequence beginning with 0s is interpreted as Base64
# encoded binary data.
# - StrongSwan
# - https://wiki.strongswan.org/projects/strongswan/wiki/PskSecret
# - LibreSwan
# - https://libreswan.org/man/ipsec.secrets.5.html
# - https://libreswan.org/man/ipsec_ttodata.3.html
# - OpenSwan (no online documents, see manpages sources in the repository)
# - https://github.com/xelerance/Openswan
PSK_BASE64_PREFIX = '0s'
def _get_template(template_file):
global JINJA_ENV
@ -209,9 +223,23 @@ class BaseSwanProcess(object):
(not ipsec_site_conn['local_id'])):
ipsec_site_conn['local_id'] = ipsec_site_conn['external_ip']
def base64_encode_psk(self):
if not self.vpnservice:
return
for ipsec_site_conn in self.vpnservice['ipsec_site_connections']:
psk = ipsec_site_conn['psk']
encoded_psk = base64.b64encode(encodeutils.safe_encode(psk))
# NOTE(huntxu): base64.b64encode returns an instance of 'bytes'
# in Python 3, convert it to a str. For Python 2, after calling
# safe_decode, psk is converted into a unicode not containing any
# non-ASCII characters so it doesn't matter.
psk = encodeutils.safe_decode(encoded_psk, incoming='utf_8')
ipsec_site_conn['psk'] = PSK_BASE64_PREFIX + psk
def update_vpnservice(self, vpnservice):
self.vpnservice = vpnservice
self.translate_dialect()
self.base64_encode_psk()
def _dialect(self, obj, key):
obj[key] = self.DIALECT_MAP.get(obj[key], obj[key])

View File

@ -1,4 +1,4 @@
# Configuration for {{vpnservice.name}}
# Configuration for {{vpnservice.id}}
config setup
nat_traversal=yes
conn %default

View File

@ -1,4 +1,4 @@
# Configuration for {{vpnservice.name}}
# Configuration for {{vpnservice.id}}
{% for ipsec_site_connection in vpnservice.ipsec_site_connections -%}
{{ipsec_site_connection.local_id}} {{ipsec_site_connection.peer_id}} : PSK "{{ipsec_site_connection.psk}}"
{{ipsec_site_connection.local_id}} {{ipsec_site_connection.peer_id}} : PSK {{ipsec_site_connection.psk}}
{% endfor %}

View File

@ -1,4 +1,4 @@
# Configuration for {{vpnservice.name}}
# Configuration for {{vpnservice.id}}
config setup
conn %default

View File

@ -1,3 +1,3 @@
# Configuration for {{vpnservice.name}}{% for ipsec_site_connection in vpnservice.ipsec_site_connections %}
{{ipsec_site_connection.local_id}} {{ipsec_site_connection.peer_id}} : PSK "{{ipsec_site_connection.psk}}"
# Configuration for {{vpnservice.id}}{% for ipsec_site_connection in vpnservice.ipsec_site_connections %}
{{ipsec_site_connection.local_id}} {{ipsec_site_connection.peer_id}} : PSK {{ipsec_site_connection.psk}}
{% endfor %}

View File

@ -166,6 +166,11 @@ FAKE_ROUTER = {
'routes': []
}
# It's a long name.
NON_ASCII_VPNSERVICE_NAME = u'\u9577\u3044\u540d\u524d\u3067\u3059'
# I'm doing very well.
NON_ASCII_PSK = u'\u00e7a va tr\u00e9s bien'
def get_ovs_bridge(br_name):
return ovs_lib.OVSBridge(br_name)

View File

@ -133,3 +133,55 @@ class TestOpenSwanDeviceDriver(test_scenario.TestIPSecBase):
ipsec.OpenSwanProcess.active.stop()
ipsec.OpenSwanProcess._config_changed.stop()
cfg.CONF.set_override('restart_check_config', False, group='pluto')
def test_openswan_connection_with_non_ascii_vpnservice_name(self):
site1 = self.create_site(test_scenario.PUBLIC_NET[4],
[self.private_nets[1]])
site2 = self.create_site(test_scenario.PUBLIC_NET[5],
[self.private_nets[2]])
site1.vpn_service.update(
{'name': test_scenario.NON_ASCII_VPNSERVICE_NAME})
self.check_ping(site1, site2, success=False)
self.check_ping(site2, site1, success=False)
self.prepare_ipsec_site_connections(site1, site2)
self.sync_to_create_ipsec_connections(site1, site2)
self.check_ping(site1, site2)
self.check_ping(site2, site1)
def test_openswan_connection_with_non_ascii_psk(self):
site1 = self.create_site(test_scenario.PUBLIC_NET[4],
[self.private_nets[1]])
site2 = self.create_site(test_scenario.PUBLIC_NET[5],
[self.private_nets[2]])
self.check_ping(site1, site2, success=False)
self.check_ping(site2, site1, success=False)
self.prepare_ipsec_site_connections(site1, site2)
self._update_ipsec_connection(site1, psk=test_scenario.NON_ASCII_PSK)
self._update_ipsec_connection(site2, psk=test_scenario.NON_ASCII_PSK)
self.sync_to_create_ipsec_connections(site1, site2)
self.check_ping(site1, site2)
self.check_ping(site2, site1)
def test_openswan_connection_with_wrong_non_ascii_psk(self):
site1 = self.create_site(test_scenario.PUBLIC_NET[4],
[self.private_nets[1]])
site2 = self.create_site(test_scenario.PUBLIC_NET[5],
[self.private_nets[2]])
self.check_ping(site1, site2, success=False)
self.check_ping(site2, site1, success=False)
self.prepare_ipsec_site_connections(site1, site2)
self._update_ipsec_connection(site1, psk=test_scenario.NON_ASCII_PSK)
self._update_ipsec_connection(site2,
psk=test_scenario.NON_ASCII_PSK[:-1])
self.sync_to_create_ipsec_connections(site1, site2)
self.check_ping(site1, site2, success=False)
self.check_ping(site2, site1, success=False)

View File

@ -252,3 +252,55 @@ class TestStrongSwanScenario(test_scenario.TestIPSecBase):
def test_strongswan_connection_with_sha512(self):
self._test_strongswan_connection_with_auth_algo('sha512')
def test_strongswan_connection_with_non_ascii_vpnservice_name(self):
site1 = self.create_site(test_scenario.PUBLIC_NET[4],
[self.private_nets[1]])
site2 = self.create_site(test_scenario.PUBLIC_NET[5],
[self.private_nets[2]])
site1.vpn_service.update(
{'name': test_scenario.NON_ASCII_VPNSERVICE_NAME})
self.check_ping(site1, site2, success=False)
self.check_ping(site2, site1, success=False)
self.prepare_ipsec_site_connections(site1, site2)
self.sync_to_create_ipsec_connections(site1, site2)
self.check_ping(site1, site2)
self.check_ping(site2, site1)
def test_strongswan_connection_with_non_ascii_psk(self):
site1 = self.create_site(test_scenario.PUBLIC_NET[4],
[self.private_nets[1]])
site2 = self.create_site(test_scenario.PUBLIC_NET[5],
[self.private_nets[2]])
self.check_ping(site1, site2, success=False)
self.check_ping(site2, site1, success=False)
self.prepare_ipsec_site_connections(site1, site2)
self._update_ipsec_connection(site1, psk=test_scenario.NON_ASCII_PSK)
self._update_ipsec_connection(site2, psk=test_scenario.NON_ASCII_PSK)
self.sync_to_create_ipsec_connections(site1, site2)
self.check_ping(site1, site2)
self.check_ping(site2, site1)
def test_strongswan_connection_with_wrong_non_ascii_psk(self):
site1 = self.create_site(test_scenario.PUBLIC_NET[4],
[self.private_nets[1]])
site2 = self.create_site(test_scenario.PUBLIC_NET[5],
[self.private_nets[2]])
self.check_ping(site1, site2, success=False)
self.check_ping(site2, site1, success=False)
self.prepare_ipsec_site_connections(site1, site2)
self._update_ipsec_connection(site1, psk=test_scenario.NON_ASCII_PSK)
self._update_ipsec_connection(site2,
psk=test_scenario.NON_ASCII_PSK[:-1])
self.sync_to_create_ipsec_connections(site1, site2)
self.check_ping(site1, site2, success=False)
self.check_ping(site2, site1, success=False)

View File

@ -39,6 +39,7 @@ FAKE_HOST = 'fake_host'
FAKE_ROUTER_ID = _uuid()
FAKE_IPSEC_SITE_CONNECTION1_ID = _uuid()
FAKE_IPSEC_SITE_CONNECTION2_ID = _uuid()
FAKE_VPNSERVICE_ID = _uuid()
FAKE_IKE_POLICY = {
'ike_version': 'v1',
'encryption_algorithm': 'aes-128',
@ -57,7 +58,7 @@ FAKE_IPSEC_POLICY = {
}
FAKE_VPN_SERVICE = {
'id': _uuid(),
'id': FAKE_VPNSERVICE_ID,
'router_id': FAKE_ROUTER_ID,
'name': 'myvpn',
'admin_state_up': True,
@ -158,7 +159,7 @@ IPV6_NEXT_HOP = '''# To recognize the given IP addresses in this config
# rightnexthop is not mandatory for ipsec, so no need in ipv6.'''
EXPECTED_OPENSWAN_CONF = """
# Configuration for myvpn
# Configuration for %(vpnservice_id)s
config setup
nat_traversal=yes
conn %%default
@ -216,12 +217,12 @@ STRONGSWAN_AUTH_ESP = 'esp=aes128-sha1-modp1536'
STRONGSWAN_AUTH_AH = 'ah=sha1-modp1536'
EXPECTED_IPSEC_OPENSWAN_SECRET_CONF = '''
# Configuration for myvpn
60.0.0.4 60.0.0.5 : PSK "password"
60.0.0.4 60.0.0.6 : PSK "password"'''
# Configuration for %s
60.0.0.4 60.0.0.5 : PSK 0scGFzc3dvcmQ=
60.0.0.4 60.0.0.6 : PSK 0scGFzc3dvcmQ=''' % FAKE_VPNSERVICE_ID
EXPECTED_IPSEC_STRONGSWAN_CONF = '''
# Configuration for myvpn
# Configuration for %(vpnservice_id)s
config setup
conn %%default
@ -282,11 +283,11 @@ include strongswan.d/*.conf
'''
EXPECTED_IPSEC_STRONGSWAN_SECRET_CONF = '''
# Configuration for myvpn
60.0.0.4 60.0.0.5 : PSK "password"
# Configuration for %s
60.0.0.4 60.0.0.5 : PSK 0scGFzc3dvcmQ=
60.0.0.4 60.0.0.6 : PSK "password"
'''
60.0.0.4 60.0.0.6 : PSK 0scGFzc3dvcmQ=
''' % FAKE_VPNSERVICE_ID
PLUTO_ACTIVE_STATUS = """000 "%(conn_id)s/0x1": erouted;\n
000 #4: "%(conn_id)s/0x1":500 STATE_QUICK_R2 (IPsec SA established);""" % {
@ -976,6 +977,7 @@ class TestOpenSwanConfigGeneration(BaseIPsecDeviceDriver):
next_hop = IPV4_NEXT_HOP if version == 4 else IPV6_NEXT_HOP % local_ip
peer_ips = info.get('peers', ['60.0.0.5', '60.0.0.6'])
return EXPECTED_OPENSWAN_CONF % {
'vpnservice_id': FAKE_VPNSERVICE_ID,
'next_hop': next_hop,
'local_cidrs1': local_cidrs[0], 'local_cidrs2': local_cidrs[1],
'local_ver': version,
@ -1056,6 +1058,7 @@ class IPsecStrongswanConfigGeneration(BaseIPsecDeviceDriver):
peer_ips = info.get('peers', ['60.0.0.5', '60.0.0.6'])
auth_mode = info.get('ipsec_auth', STRONGSWAN_AUTH_ESP)
return EXPECTED_IPSEC_STRONGSWAN_CONF % {
'vpnservice_id': FAKE_VPNSERVICE_ID,
'local_cidrs1': local_cidrs[0], 'local_cidrs2': local_cidrs[1],
'peer_cidrs1': peer_cidrs[0], 'peer_cidrs2': peer_cidrs[1],
'left': local_ip,