Add Network Policy support to services

This patch adds support for Network Policy on services. It
applies pods' security groups onto the services in front of them.
It makes the next assumptions:
- All the pods pointed by one svc have the same labels, thus the same
sgs being enforced
- Only copies the SG rules that have the same protocol and direction
as the listener being created
- Adds a default rule to NP to enable traffic from services subnet CIDR

Partially Implements: blueprint k8s-network-policies
Change-Id: Ibd4b51ff40b69af26ab7e7b81d18e63abddf775b
This commit is contained in:
Luis Tomas Bolivar 2018-12-24 13:01:12 +01:00 committed by Daniel Mellado
parent 71a8ebd1f0
commit b200d368cd
9 changed files with 215 additions and 118 deletions

View File

@ -31,6 +31,7 @@ from kuryr_kubernetes import constants as const
from kuryr_kubernetes.controller.drivers import base
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.objects import lbaas as obj_lbaas
from kuryr_kubernetes import utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -199,6 +200,53 @@ class LBaaSv2Driver(base.LBaaSDriver):
LOG.exception('Failed when creating security group rule '
'for listener %s.', listener.name)
def _apply_members_security_groups(self, loadbalancer, port, target_port,
protocol, sg_rule_name):
neutron = clients.get_neutron_client()
if CONF.octavia_defaults.sg_mode == 'create':
sg_id = self._find_listeners_sg(loadbalancer)
else:
sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
# Check if Network Policy allows listener on the pods
for sg in loadbalancer.security_groups:
if sg != sg_id:
rules = neutron.list_security_group_rules(
security_group_id=sg)
for rule in rules['security_group_rules']:
# copying ingress rules with same protocol onto the
# loadbalancer sg rules
# NOTE(ltomasbo): NP security groups only have
# remote_ip_prefix, not remote_group_id, therefore only
# applying the ones with remote_ip_prefix
if (rule['protocol'] == protocol.lower() and
rule['direction'] == 'ingress' and
rule['remote_ip_prefix']):
# If listener port not in allowed range, skip
min_port = rule.get('port_range_min')
max_port = rule.get('port_range_max')
if (min_port and target_port not in range(min_port,
max_port+1)):
continue
try:
neutron.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
'port_range_min': port,
'port_range_max': port,
'protocol': protocol,
'remote_ip_prefix': rule[
'remote_ip_prefix'],
'security_group_id': sg_id,
'description': sg_rule_name,
},
})
except n_exc.NeutronClientException as ex:
if ex.status_code != requests.codes.conflict:
LOG.exception('Failed when creating security '
'group rule for listener %s.',
sg_rule_name)
def _extend_lb_security_group_rules(self, loadbalancer, listener):
neutron = clients.get_neutron_client()
@ -242,7 +290,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
'rule for listener %s.', listener.name)
# ensure routes have access to the services
service_subnet_cidr = self._get_subnet_cidr(loadbalancer.subnet_id)
service_subnet_cidr = utils.get_subnet_cidr(loadbalancer.subnet_id)
try:
# add access from service subnet
neutron.create_security_group_rule({
@ -261,7 +309,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
# support
worker_subnet_id = CONF.pod_vif_nested.worker_nodes_subnet
if worker_subnet_id:
worker_subnet_cidr = self._get_subnet_cidr(worker_subnet_id)
worker_subnet_cidr = utils.get_subnet_cidr(worker_subnet_id)
neutron.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
@ -321,7 +369,10 @@ class LBaaSv2Driver(base.LBaaSDriver):
lbaas.delete_listener,
listener.id)
sg_id = self._find_listeners_sg(loadbalancer)
if CONF.octavia_defaults.sg_mode == 'create':
sg_id = self._find_listeners_sg(loadbalancer)
else:
sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
if sg_id:
rules = neutron.list_security_group_rules(
security_group_id=sg_id, description=listener.name)
@ -363,7 +414,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
def ensure_member(self, loadbalancer, pool,
subnet_id, ip, port, target_ref_namespace,
target_ref_name):
target_ref_name, listener_port=None):
name = ("%s/%s" % (target_ref_namespace, target_ref_name))
name += ":%s" % port
member = obj_lbaas.LBaaSMember(name=name,
@ -372,9 +423,19 @@ class LBaaSv2Driver(base.LBaaSDriver):
subnet_id=subnet_id,
ip=ip,
port=port)
return self._ensure_provisioned(loadbalancer, member,
self._create_member,
self._find_member)
result = self._ensure_provisioned(loadbalancer, member,
self._create_member,
self._find_member)
network_policy = (
'policy' in CONF.kubernetes.enabled_handlers and
CONF.kubernetes.service_security_groups_driver == 'policy')
if network_policy and listener_port:
protocol = pool.protocol
sg_rule_name = pool.name
self._apply_members_security_groups(loadbalancer, listener_port,
port, protocol, sg_rule_name)
return result
def release_member(self, loadbalancer, member):
lbaas = clients.get_loadbalancer_client()
@ -397,15 +458,6 @@ class LBaaSv2Driver(base.LBaaSDriver):
return None
def _get_subnet_cidr(self, subnet_id):
neutron = clients.get_neutron_client()
try:
subnet_obj = neutron.show_subnet(subnet_id)
except n_exc.NeutronClientException:
LOG.exception("Subnet %s CIDR not found!", subnet_id)
raise
return subnet_obj.get('subnet')['cidr']
def _create_loadbalancer(self, loadbalancer):
lbaas = clients.get_loadbalancer_client()

View File

@ -17,10 +17,12 @@ from oslo_log import log as logging
from neutronclient.common import exceptions as n_exc
from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base
from kuryr_kubernetes.controller.drivers import utils
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
@ -93,14 +95,14 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
current_sg_rules]
for sg_rule in sg_rules_to_delete:
try:
utils.delete_security_group_rule(sgr_ids[sg_rule])
driver_utils.delete_security_group_rule(sgr_ids[sg_rule])
except n_exc.NotFound:
LOG.debug('Trying to delete non existing sg_rule %s', sg_rule)
# Create new rules that weren't already on the security group
sg_rules_to_add = [rule for rule in current_sg_rules if rule not in
existing_sg_rules]
for sg_rule in sg_rules_to_add:
sgr_id = utils.create_security_group_rule(sg_rule)
sgr_id = driver_utils.create_security_group_rule(sg_rule)
if sg_rule['security_group_rule'].get('direction') == 'ingress':
for i_rule in i_rules:
if sg_rule == i_rule:
@ -111,8 +113,8 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
e_rule["security_group_rule"]["id"] = sgr_id
# Annotate kuryrnetpolicy CRD with current policy and ruleset
pod_selector = policy['spec'].get('podSelector')
utils.patch_kuryr_crd(crd, i_rules, e_rules, pod_selector,
np_spec=policy['spec'])
driver_utils.patch_kuryr_crd(crd, i_rules, e_rules, pod_selector,
np_spec=policy['spec'])
if existing_pod_selector != pod_selector:
return existing_pod_selector
@ -142,13 +144,26 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
sg_id = sg['security_group']['id']
i_rules, e_rules = self.parse_network_policy_rules(policy, sg_id)
for i_rule in i_rules:
sgr_id = utils.create_security_group_rule(i_rule)
sgr_id = driver_utils.create_security_group_rule(i_rule)
i_rule['security_group_rule']['id'] = sgr_id
for e_rule in e_rules:
sgr_id = utils.create_security_group_rule(e_rule)
sgr_id = driver_utils.create_security_group_rule(e_rule)
e_rule['security_group_rule']['id'] = sgr_id
# NOTE(ltomasbo): Add extra SG rule to allow traffic from services
# subnet
svc_cidr = utils.get_subnet_cidr(
config.CONF.neutron_defaults.service_subnet)
svc_rule = {
u'security_group_rule': {
u'ethertype': 'IPv4',
u'security_group_id': sg_id,
u'direction': 'ingress',
u'description': 'Kuryr-Kubernetes NetPolicy SG rule',
u'remote_ip_prefix': svc_cidr
}}
driver_utils.create_security_group_rule(svc_rule)
except (n_exc.NeutronClientException, exceptions.ResourceNotReady):
LOG.exception("Error creating security group for network policy "
" %s", policy['metadata']['name'])
@ -179,12 +194,13 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
ips = []
matching_pods = []
if namespace_selector:
matching_namespaces = utils.get_namespaces(namespace_selector)
matching_namespaces = driver_utils.get_namespaces(
namespace_selector)
for ns in matching_namespaces.get('items'):
matching_pods = utils.get_pods(pod_selector,
ns['metadata']['name'])
matching_pods = driver_utils.get_pods(pod_selector,
ns['metadata']['name'])
else:
matching_pods = utils.get_pods(pod_selector, namespace)
matching_pods = driver_utils.get_pods(pod_selector, namespace)
for pod in matching_pods.get('items'):
if pod['status']['podIP']:
ips.append(pod['status']['podIP'])
@ -214,7 +230,8 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
ns_cidr = self._get_namespace_subnet_cidr(ns)
cidrs.append(ns_cidr)
else:
matching_namespaces = utils.get_namespaces(namespace_selector)
matching_namespaces = driver_utils.get_namespaces(
namespace_selector)
for ns in matching_namespaces.get('items'):
# NOTE(ltomasbo): This requires the namespace handler to be
# also enabled
@ -280,7 +297,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
if rule_list[0] == {}:
LOG.debug('Applying default all open policy from %s',
policy['metadata']['selfLink'])
rule = utils.create_security_group_rule_body(
rule = driver_utils.create_security_group_rule_body(
sg_id, direction, port_range_min=1, port_range_max=65535)
sg_rule_body_list.append(rule)
@ -294,31 +311,33 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
for port in rule_block['ports']:
if allowed_cidrs or allow_all or selectors:
for cidr in allowed_cidrs:
rule = utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'),
cidr=cidr)
rule = (
driver_utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'),
cidr=cidr))
sg_rule_body_list.append(rule)
if allow_all:
rule = utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'))
rule = (
driver_utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol')))
sg_rule_body_list.append(rule)
else:
rule = utils.create_security_group_rule_body(
rule = driver_utils.create_security_group_rule_body(
sg_id, direction, port.get('port'),
protocol=port.get('protocol'))
sg_rule_body_list.append(rule)
elif allowed_cidrs or allow_all or selectors:
for cidr in allowed_cidrs:
rule = utils.create_security_group_rule_body(
rule = driver_utils.create_security_group_rule_body(
sg_id, direction,
port_range_min=1,
port_range_max=65535,
cidr=cidr)
sg_rule_body_list.append(rule)
if allow_all:
rule = utils.create_security_group_rule_body(
rule = driver_utils.create_security_group_rule_body(
sg_id, direction,
port_range_min=1,
port_range_max=65535)
@ -456,7 +475,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
pod_selector = policy['spec'].get('podSelector')
if pod_selector:
policy_namespace = policy['metadata']['namespace']
pods = utils.get_pods(pod_selector, policy_namespace)
pods = driver_utils.get_pods(pod_selector, policy_namespace)
return pods.get('items')
else:
# NOTE(ltomasbo): It affects all the pods on the namespace

View File

@ -192,36 +192,40 @@ def _parse_rules(direction, crd, pod):
return matched, crd_rules
def _get_pod_sgs(pod, project_id):
sg_list = []
pod_labels = pod['metadata'].get('labels')
pod_namespace = pod['metadata']['namespace']
knp_crds = _get_kuryrnetpolicy_crds(namespace=pod_namespace)
for crd in knp_crds.get('items'):
pod_selector = crd['spec'].get('podSelector')
if pod_selector:
if _match_selector(pod_selector, pod_labels):
LOG.debug("Appending %s",
str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
else:
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
# NOTE(maysams) Pods that are not selected by any Networkpolicy
# are fully accessible. Thus, the default security group is associated.
if not sg_list:
sg_list = config.CONF.neutron_defaults.pod_security_groups
if not sg_list:
raise cfg.RequiredOptError('pod_security_groups',
cfg.OptGroup('neutron_defaults'))
return sg_list[:]
class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
"""Provides security groups for pods based on network policies"""
def get_security_groups(self, pod, project_id):
sg_list = []
pod_labels = pod['metadata'].get('labels')
pod_namespace = pod['metadata']['namespace']
knp_crds = _get_kuryrnetpolicy_crds(namespace=pod_namespace)
for crd in knp_crds.get('items'):
pod_selector = crd['spec'].get('podSelector')
if pod_selector:
if _match_selector(pod_selector, pod_labels):
LOG.debug("Appending %s",
str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
else:
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
# NOTE(maysams) Pods that are not selected by any Networkpolicy
# are fully accessible. Thus, the default security group is associated.
if not sg_list:
sg_list = config.CONF.neutron_defaults.pod_security_groups
if not sg_list:
raise cfg.RequiredOptError('pod_security_groups',
cfg.OptGroup('neutron_defaults'))
return sg_list[:]
return _get_pod_sgs(pod, project_id)
def create_sg_rules(self, pod):
LOG.debug("Creating sg rule for pod: %s", pod['metadata']['name'])
@ -297,36 +301,17 @@ class NetworkPolicyServiceSecurityGroupsDriver(
def get_security_groups(self, service, project_id):
sg_list = []
svc_namespace = service['metadata']['namespace']
svc_labels = service['metadata'].get('labels')
LOG.debug("Using labels %s", svc_labels)
knp_crds = _get_kuryrnetpolicy_crds(namespace=svc_namespace)
for crd in knp_crds.get('items'):
pod_selector = crd['spec'].get('podSelector')
if pod_selector:
crd_labels = pod_selector.get('matchLabels', None)
crd_expressions = pod_selector.get('matchExpressions', None)
match_exp = match_lb = True
if crd_expressions:
match_exp = _match_expressions(crd_expressions,
svc_labels)
if crd_labels and svc_labels:
match_lb = _match_labels(crd_labels, svc_labels)
if match_exp and match_lb:
LOG.debug("Appending %s",
str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
else:
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
sg_list.append(str(crd['spec']['securityGroupId']))
# NOTE(maysams) Pods that are not selected by any Networkpolicy
# are fully accessible. Thus, the default security group is associated.
if not sg_list:
sg_list = config.CONF.neutron_defaults.pod_security_groups
if not sg_list:
raise cfg.RequiredOptError('pod_security_groups',
cfg.OptGroup('neutron_defaults'))
svc_selector = service['spec'].get('selector')
# skip is no selector
if svc_selector:
# get affected pods by svc selector
pods = driver_utils.get_pods({'selector': svc_selector},
svc_namespace).get('items')
# NOTE(ltomasbo): We assume all the pods pointed by a service
# have the same labels, and the same policy will be applied to
# all of them. Hence only considering the security groups applied
# to the first one.
if pods:
return _get_pod_sgs(pods[0], project_id)
return sg_list[:]

View File

@ -102,21 +102,26 @@ def get_pods(selector, namespace=None):
"""
kubernetes = clients.get_kubernetes_client()
labels = selector.get('matchLabels', None)
if labels:
# Removing pod-template-hash as pods will not have it and
# otherwise there will be no match
labels.pop('pod-template-hash', None)
labels = replace_encoded_characters(labels)
exps = selector.get('matchExpressions', None)
if exps:
exps = ', '.join(format_expression(exp) for exp in exps)
svc_selector = selector.get('selector')
if svc_selector:
labels = replace_encoded_characters(svc_selector)
else:
labels = selector.get('matchLabels', None)
if labels:
expressions = parse.quote("," + exps)
labels += expressions
else:
labels = parse.quote(exps)
# Removing pod-template-hash as pods will not have it and
# otherwise there will be no match
labels.pop('pod-template-hash', None)
labels = replace_encoded_characters(labels)
exps = selector.get('matchExpressions', None)
if exps:
exps = ', '.join(format_expression(exp) for exp in exps)
if labels:
expressions = parse.quote("," + exps)
labels += expressions
else:
labels = parse.quote(exps)
if namespace:
pods = kubernetes.get(

View File

@ -364,7 +364,8 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
p.port]
except KeyError:
continue
current_targets = {(str(m.ip), m.port) for m in lbaas_state.members}
current_targets = {(str(m.ip), m.port, m.pool_id)
for m in lbaas_state.members}
for subset in endpoints.get('subsets', []):
subset_ports = subset.get('ports', [])
@ -380,14 +381,14 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
continue
for subset_port in subset_ports:
target_port = subset_port['port']
if (target_ip, target_port) in current_targets:
continue
port_name = subset_port.get('name')
try:
pool = pool_by_tgt_name[port_name]
except KeyError:
LOG.debug("No pool found for port: %r", port_name)
continue
if (target_ip, target_port, pool.id) in current_targets:
continue
# TODO(apuimedo): Do not pass subnet_id at all when in
# L3 mode once old neutron-lbaasv2 is not supported, as
# octavia does not require it
@ -400,6 +401,15 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
# from VIP to pods happens in layer 3 mode, i.e.,
# routed.
member_subnet_id = lbaas_state.loadbalancer.subnet_id
first_member_of_the_pool = True
for member in lbaas_state.members:
if pool.id == member.pool_id:
first_member_of_the_pool = False
break
if first_member_of_the_pool:
listener_port = lsnr_by_id[pool.listener_id].port
else:
listener_port = None
member = self._drv_lbaas.ensure_member(
loadbalancer=lbaas_state.loadbalancer,
pool=pool,
@ -407,7 +417,8 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
ip=target_ip,
port=target_port,
target_ref_namespace=target_ref['namespace'],
target_ref_name=target_ref['name'])
target_ref_name=target_ref['name'],
listener_port=listener_port)
lbaas_state.members.append(member)
changed = True

View File

@ -159,6 +159,8 @@ class TestLBaaSv2Driver(test_base.TestCase):
'security_group_rules': []}
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = {
'security_groups': [mock.sentinel.sg_id]}
loadbalancer = mock.Mock()
listener = mock.Mock()

View File

@ -19,6 +19,7 @@ from kuryr_kubernetes.controller.drivers import network_policy
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
from kuryr_kubernetes import utils
from neutronclient.common import exceptions as n_exc
@ -185,11 +186,15 @@ class TestNetworkPolicyDriver(test_base.TestCase):
'_add_kuryrnetpolicy_crd')
@mock.patch.object(network_policy.NetworkPolicyDriver,
'parse_network_policy_rules')
def test_create_security_group_rules_from_network_policy(self, m_parse,
@mock.patch.object(utils, 'get_subnet_cidr')
def test_create_security_group_rules_from_network_policy(self, m_utils,
m_parse,
m_add_crd,
m_get_crd):
self._driver.neutron.create_security_group.return_value = {
'security_group': {'id': mock.sentinel.id}}
m_utils.get_subnet_cidr.return_value = {
'subnet': {'cidr': mock.sentinel.cidr}}
m_parse.return_value = (self._i_rules, self._e_rules)
self._driver.neutron.create_security_group_rule.return_value = {
'security_group_rule': {'id': mock.sentinel.id}}
@ -204,10 +209,13 @@ class TestNetworkPolicyDriver(test_base.TestCase):
'_add_kuryrnetpolicy_crd')
@mock.patch.object(network_policy.NetworkPolicyDriver,
'parse_network_policy_rules')
def test_create_security_group_rules_with_k8s_exc(self, m_parse,
@mock.patch.object(utils, 'get_subnet_cidr')
def test_create_security_group_rules_with_k8s_exc(self, m_utils, m_parse,
m_add_crd, m_get_crd):
self._driver.neutron.create_security_group.return_value = {
'security_group': {'id': mock.sentinel.id}}
m_utils.get_subnet_cidr.return_value = {
'subnet': {'cidr': mock.sentinel.cidr}}
m_parse.return_value = (self._i_rules, self._e_rules)
m_get_crd.side_effect = exceptions.K8sClientException
self._driver.neutron.create_security_group_rule.return_value = {
@ -224,10 +232,13 @@ class TestNetworkPolicyDriver(test_base.TestCase):
'_add_kuryrnetpolicy_crd')
@mock.patch.object(network_policy.NetworkPolicyDriver,
'parse_network_policy_rules')
def test_create_security_group_rules_error_add_crd(self, m_parse,
@mock.patch.object(utils, 'get_subnet_cidr')
def test_create_security_group_rules_error_add_crd(self, m_utils, m_parse,
m_add_crd, m_get_crd):
self._driver.neutron.create_security_group.return_value = {
'security_group': {'id': mock.sentinel.id}}
m_utils.get_subnet_cidr.return_value = {
'subnet': {'cidr': mock.sentinel.cidr}}
m_parse.return_value = (self._i_rules, self._e_rules)
m_add_crd.side_effect = exceptions.K8sClientException
self._driver.neutron.create_security_group_rule.return_value = {

View File

@ -382,7 +382,7 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver):
id=str(uuid.uuid4()))
def ensure_member(self, loadbalancer, pool, subnet_id, ip, port,
target_ref_namespace, target_ref_name
target_ref_namespace, target_ref_name, listener_port=None
):
name = "%s:%s:%s" % (loadbalancer.name, ip, port)
return obj_lbaas.LBaaSMember(name=name,

View File

@ -16,6 +16,7 @@ import time
import requests
from neutronclient.common import exceptions as n_exc
from os_vif import objects
from oslo_cache import core as cache
from oslo_config import cfg
@ -161,6 +162,17 @@ def get_subnet(subnet_id):
return network
@MEMOIZE
def get_subnet_cidr(subnet_id):
neutron = clients.get_neutron_client()
try:
subnet_obj = neutron.show_subnet(subnet_id)
except n_exc.NeutronClientException:
LOG.exception("Subnet %s CIDR not found!", subnet_id)
raise
return subnet_obj.get('subnet')['cidr']
def extract_pod_annotation(annotation):
obj = objects.base.VersionedObject.obj_from_primitive(annotation)
# FIXME(dulek): This is code to maintain compatibility with Queens. We can