Add support for podSelector

This include support for both types, when pod selector is used alone
or together with a namespace selector.

TODO in follow up patch sets:
- React to new pods/namespaces created with labels
- React to pod/namespaces relabeling/deletion

Partially Implements: blueprint k8s-network-policies

Change-Id: Ie29b9da64fcd5df7b9a0e9af7b4835208f76da66
This commit is contained in:
Luis Tomas Bolivar 2018-12-07 18:31:50 +01:00
parent fe583c3e6d
commit 30369502bb
3 changed files with 206 additions and 120 deletions

View File

@ -12,9 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from six.moves.urllib.parse import urlencode
from oslo_log import log as logging
from neutronclient.common import exceptions as n_exc
@ -72,7 +69,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
LOG.debug("Already existing CRD %s", crd_name)
sg_id = crd['spec']['securityGroupId']
# Fetch existing SG rules from kuryrnetpolicy CRD
existing_sg_rules = None
existing_sg_rules = []
existing_i_rules = crd['spec'].get('ingressSgRules')
existing_e_rules = crd['spec'].get('egressSgRules')
if existing_i_rules or existing_e_rules:
@ -158,7 +155,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
for e_rule in e_rules:
sgr_id = self._create_security_group_rule(e_rule)
e_rule['security_group_rule']['id'] = sgr_id
except n_exc.NeutronClientException:
except (n_exc.NeutronClientException, exceptions.ResourceNotReady):
LOG.exception("Error creating security group for network policy "
" %s", policy['metadata']['name'])
# If there's any issue creating sg rules, remove them
@ -183,36 +180,161 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
LOG.exception('Error annotating network policy')
raise
def _get_namespaces_cidr(self, namespace_selector):
def _get_pods_ips(self, pod_selector, namespace=None,
namespace_selector=None):
ips = []
matching_pods = []
if namespace_selector:
matching_namespaces = utils.get_namespaces(namespace_selector)
for ns in matching_namespaces.get('items'):
matching_pods = utils.get_pods(pod_selector,
ns['metadata']['name'])
else:
matching_pods = utils.get_pods(pod_selector, namespace)
for pod in matching_pods.get('items'):
if pod['status']['podIP']:
ips.append(pod['status']['podIP'])
return ips
def _get_namespace_subnet_cidr(self, namespace):
try:
ns_annotations = namespace['metadata']['annotations']
ns_name = ns_annotations[constants.K8S_ANNOTATION_NET_CRD]
except KeyError:
LOG.exception('Namespace handler must be enabled to support '
'Network Policies with namespaceSelector')
raise exceptions.ResourceNotReady(namespace)
try:
net_crd = self.kubernetes.get('{}/kuryrnets/{}'.format(
constants.K8S_API_CRD, ns_name))
except exceptions.K8sClientException:
LOG.exception("Kubernetes Client Exception.")
raise
return net_crd['spec']['subnetCIDR']
def _get_namespaces_cidr(self, namespace_selector, namespace=None):
cidrs = []
namespace_label = urlencode(namespace_selector[
'matchLabels'])
# NOTE(maysams): K8s API does not accept &, so we need to replace
# it with ',' or '%2C' instead
namespace_label = namespace_label.replace('&', ',')
matching_namespaces = self.kubernetes.get(
'{}/namespaces?labelSelector={}'.format(
constants.K8S_API_BASE, namespace_label)).get('items')
for ns in matching_namespaces:
# NOTE(ltomasbo): This requires the namespace handler to be
# also enabled
try:
ns_annotations = ns['metadata']['annotations']
ns_name = ns_annotations[constants.K8S_ANNOTATION_NET_CRD]
except KeyError:
LOG.exception('Namespace handler must be enabled to support '
'Network Policies with namespaceSelector')
raise
try:
net_crd = self.kubernetes.get('{}/kuryrnets/{}'.format(
constants.K8S_API_CRD, ns_name))
except exceptions.K8sClientException:
LOG.exception("Kubernetes Client Exception.")
raise
ns_cidr = net_crd['spec']['subnetCIDR']
if not namespace_selector and namespace:
ns = self.kubernetes.get(
'{}/namespaces/{}'.format(constants.K8S_API_BASE, namespace))
ns_cidr = self._get_namespace_subnet_cidr(ns)
cidrs.append(ns_cidr)
else:
matching_namespaces = utils.get_namespaces(namespace_selector)
for ns in matching_namespaces.get('items'):
# NOTE(ltomasbo): This requires the namespace handler to be
# also enabled
ns_cidr = self._get_namespace_subnet_cidr(ns)
cidrs.append(ns_cidr)
return cidrs
def _parse_selectors(self, rule_block, rule_direction, policy_namespace):
allowed_cidrs = []
allow_all = False
selectors = False
for rule in rule_block.get(rule_direction, []):
namespace_selector = rule.get('namespaceSelector')
pod_selector = rule.get('podSelector')
if namespace_selector == {}:
selectors = True
if pod_selector:
# allow matching pods in all namespaces
allowed_cidrs.extend(self._get_pods_ips(
pod_selector))
else:
# allow from all
allow_all = True
elif namespace_selector:
selectors = True
if pod_selector:
# allow matching pods on maching namespaces
allowed_cidrs.extend(self._get_pods_ips(
pod_selector,
namespace_selector=namespace_selector))
else:
# allow from/to all on the maching namespaces
allowed_cidrs.extend(self._get_namespaces_cidr(
namespace_selector))
else:
if pod_selector == {}:
# allow from/to all pods on the network policy
# namespace
selectors = True
allowed_cidrs.extend(self._get_namespaces_cidr(
None,
namespace=policy_namespace))
elif pod_selector:
# allow matching pods on the network policy
# namespace
selectors = True
allowed_cidrs.extend(self._get_pods_ips(
pod_selector,
namespace=policy_namespace))
return allow_all, selectors, allowed_cidrs
def _parse_sg_rules(self, sg_rule_body_list, direction, policy, sg_id):
rule_list = policy['spec'].get(direction)
if not rule_list:
return
policy_namespace = policy['metadata']['namespace']
rule_direction = 'from'
if direction == 'egress':
rule_direction = 'to'
if rule_list[0] == {}:
LOG.debug('Applying default all open policy from %s',
policy['metadata']['selfLink'])
rule = self._create_security_group_rule_body(
sg_id, direction, port_range_min=1, port_range_max=65535)
sg_rule_body_list.append(rule)
for rule_block in rule_list:
LOG.debug('Parsing %(dir)s Rule %(rule)s', {'dir': direction,
'rule': rule_block})
allow_all, selectors, allowed_cidrs = self._parse_selectors(
rule_block, rule_direction, policy_namespace)
if 'ports' in rule_block:
for port in rule_block['ports']:
if allowed_cidrs or allow_all or selectors:
for cidr in allowed_cidrs:
rule = self._create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'),
cidr=cidr)
sg_rule_body_list.append(rule)
if allow_all:
rule = self._create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'))
sg_rule_body_list.append(rule)
else:
rule = self._create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'))
sg_rule_body_list.append(rule)
elif allowed_cidrs or allow_all or selectors:
for cidr in allowed_cidrs:
rule = self._create_security_group_rule_body(
sg_id, direction,
port_range_min=1,
port_range_max=65535,
cidr=cidr)
sg_rule_body_list.append(rule)
if allow_all:
rule = self._create_security_group_rule_body(
sg_id, direction,
port_range_min=1,
port_range_max=65535)
sg_rule_body_list.append(rule)
else:
LOG.debug('This network policy specifies no %(direction)s '
'%(rule_direction)s and no ports: %(policy)s',
{'direction': direction,
'rule_direction': rule_direction,
'policy': policy['metadata']['selfLink']})
def parse_network_policy_rules(self, policy, sg_id):
"""Create security group rule bodies out of network policies.
@ -221,94 +343,13 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
ingress and egress ports blocks.
"""
LOG.debug('Parsing Network Policy %s' % policy['metadata']['name'])
ingress_rule_list = policy['spec'].get('ingress')
egress_rule_list = policy['spec'].get('egress')
ingress_sg_rule_body_list = []
egress_sg_rule_body_list = []
if ingress_rule_list:
if ingress_rule_list[0] == {}:
LOG.debug('Applying default all open policy from %s',
policy['metadata']['selfLink'])
i_rule = self._create_security_group_rule_body(
sg_id, 'ingress', port_range_min=1, port_range_max=65535)
ingress_sg_rule_body_list.append(i_rule)
for ingress_rule in ingress_rule_list:
LOG.debug('Parsing Ingress Rule %s', ingress_rule)
allowed_cidrs = []
for from_rule in ingress_rule.get('from', []):
namespace_selector = from_rule.get('namespaceSelector')
if namespace_selector:
allowed_cidrs = self._get_namespaces_cidr(
namespace_selector)
if 'ports' in ingress_rule:
for port in ingress_rule['ports']:
if allowed_cidrs:
for cidr in allowed_cidrs:
i_rule = self._create_security_group_rule_body(
sg_id, 'ingress', port.get('port'),
protocol=port.get('protocol'),
cidr=cidr)
ingress_sg_rule_body_list.append(i_rule)
else:
i_rule = self._create_security_group_rule_body(
sg_id, 'ingress', port.get('port'),
protocol=port.get('protocol'))
ingress_sg_rule_body_list.append(i_rule)
elif allowed_cidrs:
for cidr in allowed_cidrs:
i_rule = self._create_security_group_rule_body(
sg_id, 'ingress',
port_range_min=1,
port_range_max=65535,
cidr=cidr)
ingress_sg_rule_body_list.append(i_rule)
else:
LOG.debug('This network policy specifies no ingress from '
'and no ports: %s',
policy['metadata']['selfLink'])
if egress_rule_list:
if egress_rule_list[0] == {}:
LOG.debug('Applying default all open policy from %s',
policy['metadata']['selfLink'])
e_rule = self._create_security_group_rule_body(
sg_id, 'egress', port_range_min=1, port_range_max=65535)
egress_sg_rule_body_list.append(e_rule)
for egress_rule in egress_rule_list:
LOG.debug('Parsing Egress Rule %s', egress_rule)
allowed_cidrs = []
for from_rule in egress_rule.get('to', []):
namespace_selector = from_rule.get('namespaceSelector')
if namespace_selector:
allowed_cidrs = self._get_namespaces_cidr(
namespace_selector)
if 'ports' in egress_rule:
for port in egress_rule['ports']:
if allowed_cidrs:
for cidr in allowed_cidrs:
e_rule = self._create_security_group_rule_body(
sg_id, 'egress', port.get('port'),
protocol=port.get('protocol'),
cidr=cidr)
egress_sg_rule_body_list.append(e_rule)
else:
e_rule = self._create_security_group_rule_body(
sg_id, 'egress', port.get('port'),
protocol=port.get('protocol'))
egress_sg_rule_body_list.append(e_rule)
elif allowed_cidrs:
for cidr in allowed_cidrs:
e_rule = self._create_security_group_rule_body(
sg_id, 'egress',
port_range_min=1,
port_range_max=65535,
cidr=cidr)
egress_sg_rule_body_list.append(e_rule)
else:
LOG.debug('This network policy specifies no egrees to '
'and no ports: %s',
policy['metadata']['selfLink'])
self._parse_sg_rules(ingress_sg_rule_body_list, 'ingress', policy,
sg_id)
self._parse_sg_rules(egress_sg_rule_body_list, 'egress', policy,
sg_id)
return ingress_sg_rule_body_list, egress_sg_rule_body_list

View File

@ -66,6 +66,18 @@ def is_host_network(pod):
def get_pods(selector, namespace):
"""Return a k8s object list with the pods matching the selector.
It accepts an optional parameter to state the namespace where the pod
selector will be apply. If empty namespace is passed, then the pod
selector is applied in all namespaces.
param selector: k8s selector of types matchLabels or matchExpressions
param namespace: namespace name where the selector will be applied. If
None, the pod selector is applied in all namespaces
return: k8s list objec containing all matching pods
"""
kubernetes = clients.get_kubernetes_client()
labels = selector.get('matchLabels', None)
if labels:
@ -83,13 +95,45 @@ def get_pods(selector, namespace):
else:
labels = parse.quote(exps)
pods = kubernetes.get(
'{}/namespaces/{}/pods?labelSelector={}'.format(
constants.K8S_API_BASE, namespace, labels))
if namespace:
pods = kubernetes.get(
'{}/namespaces/{}/pods?labelSelector={}'.format(
constants.K8S_API_BASE, namespace, labels))
else:
pods = kubernetes.get(
'{}/pods?labelSelector={}'.format(constants.K8S_API_BASE, labels))
return pods
def get_namespaces(selector):
"""Return a k8s object list with the namespaces matching the selector.
param selector: k8s selector of types matchLabels or matchExpressions
return: k8s list objec containing all matching namespaces
"""
kubernetes = clients.get_kubernetes_client()
labels = selector.get('matchLabels', None)
if labels:
labels = replace_encoded_characters(labels)
exps = selector.get('matchExpressions', None)
if exps:
exps = ', '.join(format_expression(exp) for exp in exps)
if labels:
expressions = parse.quote("," + exps)
labels += expressions
else:
labels = parse.quote(exps)
namespaces = kubernetes.get(
'{}/namespaces?labelSelector={}'.format(
constants.K8S_API_BASE, labels))
return namespaces
def format_expression(expression):
key = expression['key']
operator = expression['operator'].lower()

View File

@ -306,7 +306,8 @@ class TestNetworkPolicyDriver(test_base.TestCase):
pod = get_pod_obj()
self.kubernetes.get.return_value = {'items': [pod]}
self.assertRaises(KeyError, self._driver._get_namespaces_cidr,
self.assertRaises(exceptions.ResourceNotReady,
self._driver._get_namespaces_cidr,
namespace_selector)
self.kubernetes.get.assert_called_once()