Merge "Add Network Policy support to services"

Zuul 2019-01-08 19:17:06 +00:00 committed by Gerrit Code Review
commit 7480cc36f8
9 changed files with 215 additions and 118 deletions
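In short: the LBaaS v2 driver learns to copy a Network Policy's ingress rules (those carrying a remote_ip_prefix and a matching protocol and port range) onto the load balancer's security group when the first member of a pool is created; service security groups are now resolved from the pods the service's selector matches; every policy security group gets an extra ingress rule for the service subnet so VIP traffic is not dropped; and the subnet-CIDR lookup moves from the driver into a shared, memoized utils.get_subnet_cidr helper.

The new paths are only taken when the policy machinery is enabled. A minimal configuration sketch (option names as used in the diff below; the surrounding file layout is illustrative):

    [kubernetes]
    # 'policy' must be among the enabled handlers, next to whatever
    # handlers the deployment already runs
    enabled_handlers = policy
    service_security_groups_driver = policy

    [octavia_defaults]
    # 'create' selects a per-listener security group; any other value
    # falls back to the first security group on the VIP port
    sg_mode = create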

View File

@@ -31,6 +31,7 @@ from kuryr_kubernetes import constants as const
from kuryr_kubernetes.controller.drivers import base
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.objects import lbaas as obj_lbaas
from kuryr_kubernetes import utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -199,6 +200,53 @@ class LBaaSv2Driver(base.LBaaSDriver):
LOG.exception('Failed when creating security group rule '
'for listener %s.', listener.name)
def _apply_members_security_groups(self, loadbalancer, port, target_port,
protocol, sg_rule_name):
neutron = clients.get_neutron_client()
if CONF.octavia_defaults.sg_mode == 'create':
sg_id = self._find_listeners_sg(loadbalancer)
else:
sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
# Check if Network Policy allows listener on the pods
for sg in loadbalancer.security_groups:
if sg != sg_id:
rules = neutron.list_security_group_rules(
security_group_id=sg)
for rule in rules['security_group_rules']:
# copying ingress rules with same protocol onto the
# loadbalancer sg rules
# NOTE(ltomasbo): NP security groups only have
# remote_ip_prefix, not remote_group_id, therefore only
# applying the ones with remote_ip_prefix
if (rule['protocol'] == protocol.lower() and
rule['direction'] == 'ingress' and
rule['remote_ip_prefix']):
# If listener port not in allowed range, skip
min_port = rule.get('port_range_min')
max_port = rule.get('port_range_max')
if (min_port and target_port not in range(min_port,
max_port+1)):
continue
try:
neutron.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
'port_range_min': port,
'port_range_max': port,
'protocol': protocol,
'remote_ip_prefix': rule[
'remote_ip_prefix'],
'security_group_id': sg_id,
'description': sg_rule_name,
},
})
except n_exc.NeutronClientException as ex:
if ex.status_code != requests.codes.conflict:
LOG.exception('Failed when creating security '
'group rule for listener %s.',
sg_rule_name)
def _extend_lb_security_group_rules(self, loadbalancer, listener):
neutron = clients.get_neutron_client()
@@ -242,7 +290,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
'rule for listener %s.', listener.name)
# ensure routes have access to the services
service_subnet_cidr = self._get_subnet_cidr(loadbalancer.subnet_id)
service_subnet_cidr = utils.get_subnet_cidr(loadbalancer.subnet_id)
try:
# add access from service subnet
neutron.create_security_group_rule({
@@ -261,7 +309,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
# support
worker_subnet_id = CONF.pod_vif_nested.worker_nodes_subnet
if worker_subnet_id:
worker_subnet_cidr = self._get_subnet_cidr(worker_subnet_id)
worker_subnet_cidr = utils.get_subnet_cidr(worker_subnet_id)
neutron.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
@@ -321,7 +369,10 @@ class LBaaSv2Driver(base.LBaaSDriver):
lbaas.delete_listener,
listener.id)
sg_id = self._find_listeners_sg(loadbalancer)
if CONF.octavia_defaults.sg_mode == 'create':
sg_id = self._find_listeners_sg(loadbalancer)
else:
sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
if sg_id:
rules = neutron.list_security_group_rules(
security_group_id=sg_id, description=listener.name)
@@ -363,7 +414,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
def ensure_member(self, loadbalancer, pool,
subnet_id, ip, port, target_ref_namespace,
target_ref_name):
target_ref_name, listener_port=None):
name = ("%s/%s" % (target_ref_namespace, target_ref_name))
name += ":%s" % port
member = obj_lbaas.LBaaSMember(name=name,
@@ -372,9 +423,19 @@ class LBaaSv2Driver(base.LBaaSDriver):
subnet_id=subnet_id,
ip=ip,
port=port)
return self._ensure_provisioned(loadbalancer, member,
self._create_member,
self._find_member)
result = self._ensure_provisioned(loadbalancer, member,
self._create_member,
self._find_member)
network_policy = (
'policy' in CONF.kubernetes.enabled_handlers and
CONF.kubernetes.service_security_groups_driver == 'policy')
if network_policy and listener_port:
protocol = pool.protocol
sg_rule_name = pool.name
self._apply_members_security_groups(loadbalancer, listener_port,
port, protocol, sg_rule_name)
return result
def release_member(self, loadbalancer, member):
lbaas = clients.get_loadbalancer_client()
@@ -397,15 +458,6 @@ class LBaaSv2Driver(base.LBaaSDriver):
return None
def _get_subnet_cidr(self, subnet_id):
neutron = clients.get_neutron_client()
try:
subnet_obj = neutron.show_subnet(subnet_id)
except n_exc.NeutronClientException:
LOG.exception("Subnet %s CIDR not found!", subnet_id)
raise
return subnet_obj.get('subnet')['cidr']
def _create_loadbalancer(self, loadbalancer):
lbaas = clients.get_loadbalancer_client()

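The port gate inside _apply_members_security_groups above is the subtle part: a policy rule without a port range admits any target port, and the range is inclusive at both ends. The same check as a standalone sketch (the helper name is hypothetical):

    def listener_port_allowed(rule, target_port):
        # No port_range_min means the rule does not constrain ports.
        min_port = rule.get('port_range_min')
        max_port = rule.get('port_range_max')
        if not min_port:
            return True
        return min_port <= target_port <= max_port

    # A rule allowing 8000-8080 admits a target port of 8080 (inclusive):
    assert listener_port_allowed(
        {'port_range_min': 8000, 'port_range_max': 8080}, 8080)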
View File

@@ -17,10 +17,12 @@ from oslo_log import log as logging
from neutronclient.common import exceptions as n_exc
from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base
from kuryr_kubernetes.controller.drivers import utils
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
@@ -93,14 +95,14 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
current_sg_rules]
for sg_rule in sg_rules_to_delete:
try:
utils.delete_security_group_rule(sgr_ids[sg_rule])
driver_utils.delete_security_group_rule(sgr_ids[sg_rule])
except n_exc.NotFound:
LOG.debug('Trying to delete non existing sg_rule %s', sg_rule)
# Create new rules that weren't already on the security group
sg_rules_to_add = [rule for rule in current_sg_rules if rule not in
existing_sg_rules]
for sg_rule in sg_rules_to_add:
sgr_id = utils.create_security_group_rule(sg_rule)
sgr_id = driver_utils.create_security_group_rule(sg_rule)
if sg_rule['security_group_rule'].get('direction') == 'ingress':
for i_rule in i_rules:
if sg_rule == i_rule:
@@ -111,8 +113,8 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
e_rule["security_group_rule"]["id"] = sgr_id
# Annotate kuryrnetpolicy CRD with current policy and ruleset
pod_selector = policy['spec'].get('podSelector')
utils.patch_kuryr_crd(crd, i_rules, e_rules, pod_selector,
np_spec=policy['spec'])
driver_utils.patch_kuryr_crd(crd, i_rules, e_rules, pod_selector,
np_spec=policy['spec'])
if existing_pod_selector != pod_selector:
return existing_pod_selector
@@ -142,13 +144,26 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
sg_id = sg['security_group']['id']
i_rules, e_rules = self.parse_network_policy_rules(policy, sg_id)
for i_rule in i_rules:
sgr_id = utils.create_security_group_rule(i_rule)
sgr_id = driver_utils.create_security_group_rule(i_rule)
i_rule['security_group_rule']['id'] = sgr_id
for e_rule in e_rules:
sgr_id = utils.create_security_group_rule(e_rule)
sgr_id = driver_utils.create_security_group_rule(e_rule)
e_rule['security_group_rule']['id'] = sgr_id
# NOTE(ltomasbo): Add extra SG rule to allow traffic from services
# subnet
svc_cidr = utils.get_subnet_cidr(
config.CONF.neutron_defaults.service_subnet)
svc_rule = {
u'security_group_rule': {
u'ethertype': 'IPv4',
u'security_group_id': sg_id,
u'direction': 'ingress',
u'description': 'Kuryr-Kubernetes NetPolicy SG rule',
u'remote_ip_prefix': svc_cidr
}}
driver_utils.create_security_group_rule(svc_rule)
except (n_exc.NeutronClientException, exceptions.ResourceNotReady):
LOG.exception("Error creating security group for network policy "
"%s", policy['metadata']['name'])
@@ -179,12 +194,13 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
ips = []
matching_pods = {"items": []}
if namespace_selector:
matching_namespaces = utils.get_namespaces(namespace_selector)
matching_namespaces = driver_utils.get_namespaces(
namespace_selector)
for ns in matching_namespaces.get('items'):
matching_pods = utils.get_pods(pod_selector,
ns['metadata']['name'])
matching_pods = driver_utils.get_pods(pod_selector,
ns['metadata']['name'])
else:
matching_pods = utils.get_pods(pod_selector, namespace)
matching_pods = driver_utils.get_pods(pod_selector, namespace)
for pod in matching_pods.get('items'):
if pod['status']['podIP']:
ips.append(pod['status']['podIP'])
@@ -214,7 +230,8 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
ns_cidr = self._get_namespace_subnet_cidr(ns)
cidrs.append(ns_cidr)
else:
matching_namespaces = utils.get_namespaces(namespace_selector)
matching_namespaces = driver_utils.get_namespaces(
namespace_selector)
for ns in matching_namespaces.get('items'):
# NOTE(ltomasbo): This requires the namespace handler to be
# also enabled
@@ -280,7 +297,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
if rule_list[0] == {}:
LOG.debug('Applying default all open policy from %s',
policy['metadata']['selfLink'])
rule = utils.create_security_group_rule_body(
rule = driver_utils.create_security_group_rule_body(
sg_id, direction, port_range_min=1, port_range_max=65535)
sg_rule_body_list.append(rule)
@@ -294,31 +311,33 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
for port in rule_block['ports']:
if allowed_cidrs or allow_all or selectors:
for cidr in allowed_cidrs:
rule = utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'),
cidr=cidr)
rule = (
driver_utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'),
cidr=cidr))
sg_rule_body_list.append(rule)
if allow_all:
rule = utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'))
rule = (
driver_utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol')))
sg_rule_body_list.append(rule)
else:
rule = utils.create_security_group_rule_body(
rule = driver_utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'))
sg_rule_body_list.append(rule)
elif allowed_cidrs or allow_all or selectors:
for cidr in allowed_cidrs:
rule = utils.create_security_group_rule_body(
rule = driver_utils.create_security_group_rule_body(
sg_id, direction,
port_range_min=1,
port_range_max=65535,
cidr=cidr)
sg_rule_body_list.append(rule)
if allow_all:
rule = utils.create_security_group_rule_body(
rule = driver_utils.create_security_group_rule_body(
sg_id, direction,
port_range_min=1,
port_range_max=65535)
@@ -456,7 +475,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
pod_selector = policy['spec'].get('podSelector')
if pod_selector:
policy_namespace = policy['metadata']['namespace']
pods = utils.get_pods(pod_selector, policy_namespace)
pods = driver_utils.get_pods(pod_selector, policy_namespace)
return pods.get('items')
else:
# NOTE(ltomasbo): It affects all the pods in the namespace

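One branch of parse_network_policy_rules deserves an illustration: an ingress list whose first element is an empty dict means "default all open", and a single rule spanning ports 1-65535 is created. A minimal policy that triggers it (a sketch using standard Kubernetes NetworkPolicy fields):

    allow_all_policy = {
        'metadata': {'name': 'allow-all', 'namespace': 'default',
                     'selfLink': '/apis/networking.k8s.io/v1/namespaces/'
                                 'default/networkpolicies/allow-all'},
        'spec': {
            'podSelector': {},   # every pod in the namespace
            'ingress': [{}],     # empty rule block -> all-open ingress
            'policyTypes': ['Ingress'],
        },
    }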
View File

@@ -192,36 +192,40 @@ def _parse_rules(direction, crd, pod):
return matched, crd_rules
def _get_pod_sgs(pod, project_id):
sg_list = []
pod_labels = pod['metadata'].get('labels')
pod_namespace = pod['metadata']['namespace']
knp_crds = _get_kuryrnetpolicy_crds(namespace=pod_namespace)
for crd in knp_crds.get('items'):
pod_selector = crd['spec'].get('podSelector')
if pod_selector:
if _match_selector(pod_selector, pod_labels):
LOG.debug("Appending %s",
str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
else:
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
# NOTE(maysams) Pods that are not selected by any NetworkPolicy
# are fully accessible. Thus, the default security group is associated.
if not sg_list:
sg_list = config.CONF.neutron_defaults.pod_security_groups
if not sg_list:
raise cfg.RequiredOptError('pod_security_groups',
cfg.OptGroup('neutron_defaults'))
return sg_list[:]
class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
"""Provides security groups for pods based on network policies"""
def get_security_groups(self, pod, project_id):
sg_list = []
pod_labels = pod['metadata'].get('labels')
pod_namespace = pod['metadata']['namespace']
knp_crds = _get_kuryrnetpolicy_crds(namespace=pod_namespace)
for crd in knp_crds.get('items'):
pod_selector = crd['spec'].get('podSelector')
if pod_selector:
if _match_selector(pod_selector, pod_labels):
LOG.debug("Appending %s",
str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
else:
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
# NOTE(maysams) Pods that are not selected by any NetworkPolicy
# are fully accessible. Thus, the default security group is associated.
if not sg_list:
sg_list = config.CONF.neutron_defaults.pod_security_groups
if not sg_list:
raise cfg.RequiredOptError('pod_security_groups',
cfg.OptGroup('neutron_defaults'))
return sg_list[:]
return _get_pod_sgs(pod, project_id)
def create_sg_rules(self, pod):
LOG.debug("Creating sg rule for pod: %s", pod['metadata']['name'])
@@ -297,36 +301,17 @@ class NetworkPolicyServiceSecurityGroupsDriver(
def get_security_groups(self, service, project_id):
sg_list = []
svc_namespace = service['metadata']['namespace']
svc_labels = service['metadata'].get('labels')
LOG.debug("Using labels %s", svc_labels)
knp_crds = _get_kuryrnetpolicy_crds(namespace=svc_namespace)
for crd in knp_crds.get('items'):
pod_selector = crd['spec'].get('podSelector')
if pod_selector:
crd_labels = pod_selector.get('matchLabels', None)
crd_expressions = pod_selector.get('matchExpressions', None)
match_exp = match_lb = True
if crd_expressions:
match_exp = _match_expressions(crd_expressions,
svc_labels)
if crd_labels and svc_labels:
match_lb = _match_labels(crd_labels, svc_labels)
if match_exp and match_lb:
LOG.debug("Appending %s",
str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
else:
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
# NOTE(maysams) Pods that are not selected by any Networkpolicy
# are fully accessible. Thus, the default security group is associated.
if not sg_list:
sg_list = config.CONF.neutron_defaults.pod_security_groups
if not sg_list:
raise cfg.RequiredOptError('pod_security_groups',
cfg.OptGroup('neutron_defaults'))
svc_selector = service['spec'].get('selector')
# skip if no selector
if svc_selector:
# get affected pods by svc selector
pods = driver_utils.get_pods({'selector': svc_selector},
svc_namespace).get('items')
# NOTE(ltomasbo): We assume all the pods pointed by a service
# have the same labels, and the same policy will be applied to
# all of them. Hence only considering the security groups applied
# to the first one.
if pods:
return _get_pod_sgs(pods[0], project_id)
return sg_list[:]

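The service path now funnels into the pod path: resolve the service's selector to pods, then reuse the security groups of the first pod. Condensed into a standalone sketch (hypothetical function name; the helpers are the ones defined in this module):

    def service_security_groups(service, project_id):
        svc_selector = service['spec'].get('selector')
        if not svc_selector:
            return []    # selector-less services keep default behaviour
        pods = driver_utils.get_pods({'selector': svc_selector},
                                     service['metadata']['namespace'])
        items = pods.get('items')
        # Pods behind one service are assumed to share labels, so the
        # first one is representative of the applied policies.
        return _get_pod_sgs(items[0], project_id) if items else []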
View File

@@ -102,21 +102,26 @@ def get_pods(selector, namespace=None):
"""
kubernetes = clients.get_kubernetes_client()
labels = selector.get('matchLabels', None)
if labels:
# Removing pod-template-hash as pods will not have it and
# otherwise there will be no match
labels.pop('pod-template-hash', None)
labels = replace_encoded_characters(labels)
exps = selector.get('matchExpressions', None)
if exps:
exps = ', '.join(format_expression(exp) for exp in exps)
svc_selector = selector.get('selector')
if svc_selector:
labels = replace_encoded_characters(svc_selector)
else:
labels = selector.get('matchLabels', None)
if labels:
expressions = parse.quote("," + exps)
labels += expressions
else:
labels = parse.quote(exps)
# Removing pod-template-hash as pods will not have it and
# otherwise there will be no match
labels.pop('pod-template-hash', None)
labels = replace_encoded_characters(labels)
exps = selector.get('matchExpressions', None)
if exps:
exps = ', '.join(format_expression(exp) for exp in exps)
if labels:
expressions = parse.quote("," + exps)
labels += expressions
else:
labels = parse.quote(exps)
if namespace:
pods = kubernetes.get(

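get_pods therefore accepts two selector shapes: the plain label map a Service carries, wrapped as {'selector': ...}, and the matchLabels/matchExpressions form a NetworkPolicy uses. Both resolve to a labelSelector query against the pods API; a sketch of the inputs (the exact URL encoding comes from replace_encoded_characters and is an assumption here):

    svc_style = {'selector': {'app': 'demo'}}            # from a Service
    np_style = {'matchLabels': {'app': 'demo'},          # from a policy
                'matchExpressions': [{'key': 'env',
                                      'operator': 'In',
                                      'values': ['prod']}]}
    # Either resolves to something like:
    #   GET .../pods?labelSelector=app%3Ddemo,env in (prod)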
View File

@@ -364,7 +364,8 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
p.port]
except KeyError:
continue
current_targets = {(str(m.ip), m.port) for m in lbaas_state.members}
current_targets = {(str(m.ip), m.port, m.pool_id)
for m in lbaas_state.members}
for subset in endpoints.get('subsets', []):
subset_ports = subset.get('ports', [])
@@ -380,14 +381,14 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
continue
for subset_port in subset_ports:
target_port = subset_port['port']
if (target_ip, target_port) in current_targets:
continue
port_name = subset_port.get('name')
try:
pool = pool_by_tgt_name[port_name]
except KeyError:
LOG.debug("No pool found for port: %r", port_name)
continue
if (target_ip, target_port, pool.id) in current_targets:
continue
# TODO(apuimedo): Do not pass subnet_id at all when in
# L3 mode once old neutron-lbaasv2 is not supported, as
# octavia does not require it
@@ -400,6 +401,15 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
# from VIP to pods happens in layer 3 mode, i.e.,
# routed.
member_subnet_id = lbaas_state.loadbalancer.subnet_id
first_member_of_the_pool = True
for member in lbaas_state.members:
if pool.id == member.pool_id:
first_member_of_the_pool = False
break
if first_member_of_the_pool:
listener_port = lsnr_by_id[pool.listener_id].port
else:
listener_port = None
member = self._drv_lbaas.ensure_member(
loadbalancer=lbaas_state.loadbalancer,
pool=pool,
@@ -407,7 +417,8 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
ip=target_ip,
port=target_port,
target_ref_namespace=target_ref['namespace'],
target_ref_name=target_ref['name'])
target_ref_name=target_ref['name'],
listener_port=listener_port)
lbaas_state.members.append(member)
changed = True

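The first-member bookkeeping above is what limits the security group propagation to one pass per pool: listener_port is only forwarded to ensure_member when no existing member shares the pool id. The loop is equivalent to this condensed form (a sketch over the same objects):

    first_member_of_the_pool = not any(
        member.pool_id == pool.id for member in lbaas_state.members)
    listener_port = (lsnr_by_id[pool.listener_id].port
                     if first_member_of_the_pool else None)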
View File

@@ -159,6 +159,8 @@ class TestLBaaSv2Driver(test_base.TestCase):
'security_group_rules': []}
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = {
'security_groups': [mock.sentinel.sg_id]}
loadbalancer = mock.Mock()
listener = mock.Mock()

View File

@@ -19,6 +19,7 @@ from kuryr_kubernetes.controller.drivers import network_policy
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
from kuryr_kubernetes import utils
from neutronclient.common import exceptions as n_exc
@@ -185,11 +186,15 @@ class TestNetworkPolicyDriver(test_base.TestCase):
'_add_kuryrnetpolicy_crd')
@mock.patch.object(network_policy.NetworkPolicyDriver,
'parse_network_policy_rules')
def test_create_security_group_rules_from_network_policy(self, m_parse,
@mock.patch.object(utils, 'get_subnet_cidr')
def test_create_security_group_rules_from_network_policy(self, m_utils,
m_parse,
m_add_crd,
m_get_crd):
self._driver.neutron.create_security_group.return_value = {
'security_group': {'id': mock.sentinel.id}}
m_utils.return_value = mock.sentinel.cidr
m_parse.return_value = (self._i_rules, self._e_rules)
self._driver.neutron.create_security_group_rule.return_value = {
'security_group_rule': {'id': mock.sentinel.id}}
@@ -204,10 +209,13 @@ class TestNetworkPolicyDriver(test_base.TestCase):
'_add_kuryrnetpolicy_crd')
@mock.patch.object(network_policy.NetworkPolicyDriver,
'parse_network_policy_rules')
def test_create_security_group_rules_with_k8s_exc(self, m_parse,
@mock.patch.object(utils, 'get_subnet_cidr')
def test_create_security_group_rules_with_k8s_exc(self, m_utils, m_parse,
m_add_crd, m_get_crd):
self._driver.neutron.create_security_group.return_value = {
'security_group': {'id': mock.sentinel.id}}
m_utils.return_value = mock.sentinel.cidr
m_parse.return_value = (self._i_rules, self._e_rules)
m_get_crd.side_effect = exceptions.K8sClientException
self._driver.neutron.create_security_group_rule.return_value = {
@@ -224,10 +232,13 @@ class TestNetworkPolicyDriver(test_base.TestCase):
'_add_kuryrnetpolicy_crd')
@mock.patch.object(network_policy.NetworkPolicyDriver,
'parse_network_policy_rules')
def test_create_security_group_rules_error_add_crd(self, m_parse,
@mock.patch.object(utils, 'get_subnet_cidr')
def test_create_security_group_rules_error_add_crd(self, m_utils, m_parse,
m_add_crd, m_get_crd):
self._driver.neutron.create_security_group.return_value = {
'security_group': {'id': mock.sentinel.id}}
m_utils.return_value = mock.sentinel.cidr
m_parse.return_value = (self._i_rules, self._e_rules)
m_add_crd.side_effect = exceptions.K8sClientException
self._driver.neutron.create_security_group_rule.return_value = {

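A note on the patching idiom in these tests: get_subnet_cidr is a module-level, memoized function, so it is patched on the utils module and the mock passed to the test is the function itself. Its return value is therefore set directly (sketch; the CIDR is an assumed value):

    @mock.patch.object(utils, 'get_subnet_cidr')
    def test_something(self, m_get_subnet_cidr):
        m_get_subnet_cidr.return_value = '10.0.0.128/26'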
View File

@@ -382,7 +382,7 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver):
id=str(uuid.uuid4()))
def ensure_member(self, loadbalancer, pool, subnet_id, ip, port,
target_ref_namespace, target_ref_name
target_ref_namespace, target_ref_name, listener_port=None
):
name = "%s:%s:%s" % (loadbalancer.name, ip, port)
return obj_lbaas.LBaaSMember(name=name,

View File

@@ -16,6 +16,7 @@ import time
import requests
from neutronclient.common import exceptions as n_exc
from os_vif import objects
from oslo_cache import core as cache
from oslo_config import cfg
@@ -161,6 +162,17 @@ def get_subnet(subnet_id):
return network
@MEMOIZE
def get_subnet_cidr(subnet_id):
neutron = clients.get_neutron_client()
try:
subnet_obj = neutron.show_subnet(subnet_id)
except n_exc.NeutronClientException:
LOG.exception("Subnet %s CIDR not found!", subnet_id)
raise
return subnet_obj.get('subnet')['cidr']
def extract_pod_annotation(annotation):
obj = objects.base.VersionedObject.obj_from_primitive(annotation)
# FIXME(dulek): This is code to maintain compatibility with Queens. We can