[aim] GBP based RPC

Change-Id: I6e2ae06abe24f617bd6964d8d43d2d14b503c7e1
Ivar Lazzaro 2016-08-04 14:39:48 -07:00
parent 18a0972197
commit 682504575a
7 changed files with 673 additions and 231 deletions

View File: gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py

@@ -15,23 +15,20 @@
from aim import aim_manager
from aim.api import resource as aim_resource
from aim import config as aim_cfg
from aim import context as aim_context
from aim import utils as aim_utils
from neutron._i18n import _LE
from neutron._i18n import _LI
from neutron._i18n import _LW
from neutron.agent.linux import dhcp
from neutron.common import constants as n_constants
from neutron.common import rpc as n_rpc
from neutron.db import address_scope_db
from neutron.db import api as db_api
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2 import rpc as ml2_rpc
from opflexagent import constants as ofcst
from opflexagent import rpc as o_rpc
from oslo_log import log
from gbpservice.neutron.extensions import cisco_apic
@@ -46,7 +43,7 @@ LOG = log.getLogger(__name__)
# REVISIT(rkukura): Consider making these APIC name constants
# configurable, although changing them would break an existing
# deployment.
AP_NAME = 'NeutronAP'
ANY_FILTER_NAME = 'AnyFilter'
ANY_FILTER_ENTRY_NAME = 'AnyFilterEntry'
DEFAULT_VRF_NAME = 'DefaultVRF'
@@ -74,20 +71,16 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
self.name_mapper = apic_mapper.APICNameMapper(self.db, log)
self.aim = aim_manager.AimManager()
self._core_plugin = None
# REVISIT(rkukura): Read from config or possibly from AIM?
self.enable_dhcp_opt = True
self.enable_metadata_opt = True
self._setup_opflex_rpc_listeners()
def _setup_opflex_rpc_listeners(self):
self.opflex_endpoints = [o_rpc.GBPServerRpcCallback(self)]
self.opflex_topic = o_rpc.TOPIC_OPFLEX
self.opflex_conn = n_rpc.create_connection(new=True)
self.opflex_conn.create_consumer(
self.opflex_topic, self.opflex_endpoints, fanout=False)
self.opflex_conn.consume_in_threads()
self.aim_cfg_mgr = aim_cfg.ConfigManager(
aim_context.AimContext(db_api.get_session()),
host=aim_cfg.CONF.host)
# Get APIC configuration and subscribe for changes
self.enable_metadata_opt = self.aim_cfg_mgr.get_option_and_subscribe(
self._set_enable_metadata_opt, 'enable_optimized_metadata', 'apic')
self.enable_dhcp_opt = self.aim_cfg_mgr.get_option_and_subscribe(
self._set_enable_dhcp_opt, 'enable_optimized_dhcp', 'apic')
self.ap_name = self.aim_cfg_mgr.get_option_and_subscribe(
self._set_ap_name, 'apic_app_profile_name', 'apic')
def ensure_tenant(self, plugin_context, tenant_id):
LOG.debug("APIC AIM MD ensuring tenant_id: %s", tenant_id)
@@ -112,9 +105,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
tenant = aim_resource.Tenant(name=tenant_aname)
if not self.aim.get(aim_ctx, tenant):
self.aim.create(aim_ctx, tenant)
ap = aim_resource.ApplicationProfile(tenant_name=tenant_aname,
name=AP_NAME)
name=self.ap_name)
if not self.aim.get(aim_ctx, ap):
self.aim.create(aim_ctx, ap)
@@ -163,11 +155,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
limit_ip_learn_to_subnets=True,
ep_move_detect_mode='garp')
self.aim.create(aim_ctx, bd)
epg = aim_resource.EndpointGroup(tenant_name=tenant_aname,
app_profile_name=AP_NAME,
name=aname,
display_name=dname,
app_profile_name=self.ap_name,
name=aname, display_name=dname,
bd_name=aname)
self.aim.create(aim_ctx, epg)
@@ -197,7 +187,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
bd = self.aim.update(aim_ctx, bd, display_name=dname)
epg = aim_resource.EndpointGroup(tenant_name=tenant_aname,
app_profile_name=AP_NAME,
app_profile_name=self.ap_name,
name=aname)
epg = self.aim.update(aim_ctx, epg, display_name=dname)
@@ -220,7 +210,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
aim_ctx = aim_context.AimContext(session)
epg = aim_resource.EndpointGroup(tenant_name=tenant_aname,
app_profile_name=AP_NAME,
app_profile_name=self.ap_name,
name=aname)
self.aim.delete(aim_ctx, epg)
@@ -257,7 +247,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
sync_state = self._merge_status(aim_ctx, sync_state, aim_bd)
aim_epg = aim_resource.EndpointGroup(tenant_name=tenant_aname,
app_profile_name=AP_NAME,
app_profile_name=self.ap_name,
name=aname)
dist_names[cisco_apic.EPG] = aim_epg.dn
sync_state = self._merge_status(aim_ctx, sync_state, aim_epg)
@@ -311,7 +301,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
name=vrf_aname)
dist_names[cisco_apic.VRF] = aim_vrf.dn
sync_state = self._merge_status(aim_ctx, sync_state, aim_vrf)
result[cisco_apic.DIST_NAMES] = dist_names
result[cisco_apic.SYNC_STATE] = sync_state
@@ -751,7 +740,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
# Ensure network's EPG provides/consumes router's Contract.
aim_epg = aim_resource.EndpointGroup(tenant_name=network_tenant_aname,
app_profile_name=AP_NAME,
app_profile_name=self.ap_name,
name=network_aname)
aim_epg = self.aim.get(aim_ctx, aim_epg)
@@ -864,7 +853,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
if router_id not in router_ids:
aim_epg = aim_resource.EndpointGroup(
tenant_name=network_tenant_aname,
app_profile_name=AP_NAME,
app_profile_name=self.ap_name,
name=network_aname)
aim_epg = self.aim.get(aim_ctx, aim_epg)
@@ -948,153 +937,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
# TODO(rkukura): Implement DVS port binding
return False
# RPC Method
def get_gbp_details(self, context, **kwargs):
LOG.debug("APIC AIM MD handling get_gbp_details for: %s", kwargs)
try:
return self._get_gbp_details(context, kwargs)
except Exception as e:
device = kwargs.get('device')
LOG.error(_LE("An exception has occurred while retrieving device "
"gbp details for %s"), device)
LOG.exception(e)
return {'device': device}
def request_endpoint_details(self, context, **kwargs):
LOG.debug("APIC AIM MD handling get_endpoint_details for: %s", kwargs)
try:
request = kwargs.get('request')
result = {'device': request['device'],
'timestamp': request['timestamp'],
'request_id': request['request_id'],
'gbp_details': None,
'neutron_details': None}
result['gbp_details'] = self._get_gbp_details(context, request)
result['neutron_details'] = ml2_rpc.RpcCallbacks(
None, None).get_device_details(context, **request)
return result
except Exception as e:
LOG.error(_LE("An exception has occurred while requesting device "
"gbp details for %s"), request.get('device'))
LOG.exception(e)
return None
def _get_gbp_details(self, context, request):
device = request.get('device')
host = request.get('host')
core_plugin = manager.NeutronManager.get_plugin()
port_id = core_plugin._device_to_port_id(context, device)
port_context = core_plugin.get_bound_port_context(context, port_id,
host)
if not port_context:
LOG.warning(_LW("Device %(device)s requested by agent "
"%(agent_id)s not found in database"),
{'device': port_id,
'agent_id': request.get('agent_id')})
return {'device': device}
port = port_context.current
if port[portbindings.HOST_ID] != host:
LOG.warning(_LW("Device %(device)s requested by agent "
"%(agent_id)s not found bound for host %(host)s"),
{'device': port_id, 'host': host,
'agent_id': request.get('agent_id')})
return
session = context.session
with session.begin(subtransactions=True):
# REVISIT(rkukura): Should AIM resources be
# validated/created here if necessary? Also need to avoid
# creating any new name mappings without first getting
# their resource names.
# TODO(rkukura): For GBP, we need to use the EPG
# associated with the port's PT's PTG. For now, we just use the
# network's default EPG.
# TODO(rkukura): Use common tenant for shared networks.
# TODO(rkukura): Scope the tenant's AIM name.
network = port_context.network.current
epg_tenant_aname = self.name_mapper.tenant(session,
network['tenant_id'])
epg_aname = self.name_mapper.network(session, network['id'],
network['name'])
promiscuous_mode = port['device_owner'] in PROMISCUOUS_TYPES
details = {'allowed_address_pairs': port['allowed_address_pairs'],
'app_profile_name': AP_NAME,
'device': device,
'enable_dhcp_optimization': self.enable_dhcp_opt,
'enable_metadata_optimization': self.enable_metadata_opt,
'endpoint_group_name': epg_aname,
'host': host,
'l3_policy_id': network['tenant_id'], # TODO(rkukura)
'mac_address': port['mac_address'],
'port_id': port_id,
'promiscuous_mode': promiscuous_mode,
'ptg_tenant': epg_tenant_aname,
'subnets': self._get_subnet_details(core_plugin, context,
port)}
if port['device_owner'].startswith('compute:') and port['device_id']:
# REVISIT(rkukura): Do we need to map to name using nova client?
details['vm-name'] = port['device_id']
# TODO(rkukura): Mark active allowed_address_pairs
# TODO(rkukura): Add the following details common to the old
# GBP and ML2 drivers: floating_ip, host_snat_ips, ip_mapping,
# vrf_name, vrf_subnets, vrf_tenant.
# TODO(rkukura): Add the following details unique to the old
# ML2 driver: attestation, interface_mtu.
# TODO(rkukura): Add the following details unique to the old
# GBP driver: extra_details, extra_ips, fixed_ips,
# l2_policy_id.
return details
def _get_subnet_details(self, core_plugin, context, port):
subnets = core_plugin.get_subnets(
context,
filters={'id': [ip['subnet_id'] for ip in port['fixed_ips']]})
for subnet in subnets:
dhcp_ips = set()
for port in core_plugin.get_ports(
context, filters={
'network_id': [subnet['network_id']],
'device_owner': [n_constants.DEVICE_OWNER_DHCP]}):
dhcp_ips |= set([x['ip_address'] for x in port['fixed_ips']
if x['subnet_id'] == subnet['id']])
dhcp_ips = list(dhcp_ips)
if not subnet['dns_nameservers']:
# Use DHCP namespace port IP
subnet['dns_nameservers'] = dhcp_ips
# Set default route if needed
metadata = default = False
if subnet['ip_version'] == 4:
for route in subnet['host_routes']:
if route['destination'] == '0.0.0.0/0':
default = True
if route['destination'] == dhcp.METADATA_DEFAULT_CIDR:
metadata = True
# Set missing routes
if not default:
subnet['host_routes'].append(
{'destination': '0.0.0.0/0',
'nexthop': subnet['gateway_ip']})
if not metadata and dhcp_ips and not self.enable_metadata_opt:
subnet['host_routes'].append(
{'destination': dhcp.METADATA_DEFAULT_CIDR,
'nexthop': dhcp_ips[0]})
subnet['dhcp_server_ips'] = dhcp_ips
return subnets
@property
def plugin(self):
if not self._core_plugin:
@@ -1175,3 +1017,13 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
LOG.info(_LI("Creating default VRF for %s"), tenant_aname)
vrf = self.aim.create(aim_ctx, attrs)
return vrf
# DB Configuration callbacks
def _set_enable_metadata_opt(self, new_conf):
self.enable_metadata_opt = new_conf['value']
def _set_enable_dhcp_opt(self, new_conf):
self.enable_dhcp_opt = new_conf['value']
def _set_ap_name(self, new_conf):
self.ap_name = new_conf['value']
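# For reference, a hedged sketch of the subscription contract these
# callbacks rely on, grounded in the get_option_and_subscribe() calls made
# in initialize() above: each call returns the option's current value and
# registers the callback, and AIM later invokes the callback with a dict
# carrying the updated value under 'value', e.g. (illustrative only):
#
#     self.ap_name = self.aim_cfg_mgr.get_option_and_subscribe(
#         self._set_ap_name, 'apic_app_profile_name', 'apic')
#     # later, when the option changes in the AIM config store:
#     #   _set_ap_name({'value': 'NewAP'})  ->  self.ap_name == 'NewAP'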

View File: gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py

@@ -12,7 +12,10 @@
from aim.api import resource as aim_resource
from aim import context as aim_context
from neutron._i18n import _LE
from neutron._i18n import _LI
from neutron.agent.linux import dhcp
from neutron.common import constants as n_constants
from neutron import manager
from oslo_concurrency import lockutils
from oslo_log import helpers as log
@@ -21,8 +24,6 @@ from oslo_log import log as logging
from gbpservice.neutron.extensions import cisco_apic
from gbpservice.neutron.extensions import cisco_apic_gbp as aim_ext
from gbpservice.neutron.extensions import group_policy as gpolicy
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
mechanism_driver as aim_md)
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import model
from gbpservice.neutron.services.grouppolicy.common import (
constants as gp_const)
@@ -30,6 +31,10 @@ from gbpservice.neutron.services.grouppolicy.common import constants as g_const
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
from gbpservice.neutron.services.grouppolicy.drivers import (
neutron_resources as nrd)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
aim_mapping_rpc as aim_rpc)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping as amap)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping_lib as alib)
from gbpservice.neutron.services.grouppolicy import plugin as gbp_plugin
@@ -45,9 +50,13 @@ ADDR_SCOPE_KEYS = ['address_scope_v4_id', 'address_scope_v6_id']
# Definitions duplicated from apicapi lib
APIC_OWNED = 'apic_owned_'
PROMISCUOUS_TYPES = [n_constants.DEVICE_OWNER_DHCP,
n_constants.DEVICE_OWNER_LOADBALANCER]
# TODO(ivar): define a proper promiscuous API
PROMISCUOUS_SUFFIX = 'promiscuous'
class AIMMappingDriver(nrd.CommonNeutronBase):
class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
"""AIM Mapping Orchestration driver.
This driver maps GBP resources to the ACI-Integration-Module (AIM).
@@ -59,6 +68,19 @@ class AIMMappingDriver(nrd.CommonNeutronBase):
self.db = model.DbModel()
super(AIMMappingDriver, self).initialize()
self._apic_aim_mech_driver = None
self.setup_opflex_rpc_listeners()
self._ensure_apic_infra()
def _ensure_apic_infra(self):
# TODO(ivar): remove this code from here
# with the old architecture, this is how we used to create APIC
# infra model. This is now undesirable for a plethora of reasons,
# some of which include the fact that we are adding a dependency
# to apic_ml2, apicapi, and we are also using the old configuration
# model to make this work. We need to decide how we actually want to
# handle infra configuration.
LOG.debug('Pushing APIC infra configuration')
amap.ApicMappingDriver.get_apic_manager()
@property
def aim_mech_driver(self):
@@ -462,6 +484,7 @@ class AIMMappingDriver(nrd.CommonNeutronBase):
session, [aim_contract, aim_contract_subject])
def _aim_tenant_name(self, session, tenant_id):
# TODO(ivar): manage shared objects
tenant_name = self.name_mapper.tenant(session, tenant_id)
LOG.debug("Mapped tenant_id %(id)s to %(apic_name)s",
{'id': tenant_id, 'apic_name': tenant_name})
@@ -484,7 +507,7 @@ class AIMMappingDriver(nrd.CommonNeutronBase):
kwargs = {'tenant_name': str(tenant_name),
'name': str(epg_name),
'display_name': display_name,
'app_profile_name': aim_md.AP_NAME}
'app_profile_name': self.aim_mech_driver.ap_name}
if bd_name:
kwargs['bd_name'] = bd_name
if bd_tenant_name:
@@ -709,6 +732,16 @@ class AIMMappingDriver(nrd.CommonNeutronBase):
aim_contract_subject = self._aim_contract_subject(aim_contract)
self.aim.delete(aim_context, aim_contract_subject)
def _get_aim_default_endpoint_group(self, session, network):
epg_name = self.name_mapper.network(session, network['id'],
network['name'])
tenant_name = self.name_mapper.tenant(session, network['tenant_id'])
aim_ctx = aim_context.AimContext(session)
epg = aim_resource.EndpointGroup(
tenant_name=tenant_name,
app_profile_name=self.aim_mech_driver.ap_name, name=epg_name)
return self.aim.get(aim_ctx, epg)
def _aim_bridge_domain(self, session, tenant_id, network_id, network_name):
# This returns a new AIM BD resource
# TODO(Sumit): Use _aim_resource_by_name
@@ -958,3 +991,164 @@ class AIMMappingDriver(nrd.CommonNeutronBase):
def _get_aim_context(self, context):
session = context._plugin_context.session
return aim_context.AimContext(session)
def _is_port_promiscuous(self, plugin_context, port):
pt = self._port_id_to_pt(plugin_context, port['id'])
if (pt and pt.get('cluster_id') and
pt.get('cluster_id') != pt['id']):
master = self._get_policy_target(plugin_context, pt['cluster_id'])
if master.get('group_default_gateway'):
return True
return (port['device_owner'] in PROMISCUOUS_TYPES or
port['name'].endswith(PROMISCUOUS_SUFFIX)) or (
pt and pt.get('group_default_gateway'))
def _is_dhcp_optimized(self, plugin_context, port):
return self.aim_mech_driver.enable_dhcp_opt
def _is_metadata_optimized(self, plugin_context, port):
return self.aim_mech_driver.enable_metadata_opt
def _get_port_epg(self, plugin_context, port):
ptg, pt = self._port_id_to_ptg(plugin_context, port['id'])
if ptg:
return self._get_aim_endpoint_group(plugin_context.session, ptg)
else:
# Return default EPG based on network
network = self._get_network(plugin_context, port['network_id'])
epg = self._get_aim_default_endpoint_group(plugin_context.session,
network)
if not epg:
# Something is wrong, default EPG doesn't exist.
# TODO(ivar): should raise an exception
LOG.error(_LE("Default EPG doesn't exist for "
"port %s"), port['id'])
return epg
def _get_subnet_details(self, plugin_context, port, details):
# L2P might not exist for a pure Neutron port
l2p = self._network_id_to_l2p(plugin_context, port['network_id'])
# TODO(ivar): support shadow network
#if not l2p and self._ptg_needs_shadow_network(context, ptg):
# l2p = self._get_l2_policy(context._plugin_context,
# ptg['l2_policy_id'])
subnets = self._get_subnets(
plugin_context,
filters={'id': [ip['subnet_id'] for ip in port['fixed_ips']]})
for subnet in subnets:
dhcp_ips = set()
for port in self._get_ports(
plugin_context,
filters={
'network_id': [subnet['network_id']],
'device_owner': [n_constants.DEVICE_OWNER_DHCP]}):
dhcp_ips |= set([x['ip_address'] for x in port['fixed_ips']
if x['subnet_id'] == subnet['id']])
dhcp_ips = list(dhcp_ips)
if not subnet['dns_nameservers']:
# Use DHCP namespace port IP
subnet['dns_nameservers'] = dhcp_ips
# Set Default & Metadata routes if needed
default_route = metadata_route = {}
if subnet['ip_version'] == 4:
for route in subnet['host_routes']:
if route['destination'] == '0.0.0.0/0':
default_route = route
if route['destination'] == dhcp.METADATA_DEFAULT_CIDR:
metadata_route = route
if not l2p or not l2p['inject_default_route']:
# In this case we do not want to send the default route
# and the metadata route. We also do not want to send
# the gateway_ip for the subnet.
if default_route:
subnet['host_routes'].remove(default_route)
if metadata_route:
subnet['host_routes'].remove(metadata_route)
del subnet['gateway_ip']
else:
# Set missing routes
if not default_route:
subnet['host_routes'].append(
{'destination': '0.0.0.0/0',
'nexthop': subnet['gateway_ip']})
if not metadata_route and dhcp_ips and (
not self._is_metadata_optimized(plugin_context, port)):
subnet['host_routes'].append(
{'destination': dhcp.METADATA_DEFAULT_CIDR,
'nexthop': dhcp_ips[0]})
subnet['dhcp_server_ips'] = dhcp_ips
return subnets
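# A hedged worked example of the transformation above (all values are
# illustrative): given an IPv4 subnet {'cidr': '10.0.0.0/24',
# 'gateway_ip': '10.0.0.1', 'dns_nameservers': [], 'host_routes': []}
# whose network has a DHCP port on 10.0.0.2, with an L2P that has
# inject_default_route enabled and metadata optimization disabled, the
# subnet returned to the agent would carry:
#     'dns_nameservers': ['10.0.0.2'],   # DHCP namespace port IP
#     'host_routes': [
#         {'destination': '0.0.0.0/0', 'nexthop': '10.0.0.1'},
#         {'destination': dhcp.METADATA_DEFAULT_CIDR,
#          'nexthop': '10.0.0.2'}],
#     'dhcp_server_ips': ['10.0.0.2']
# With inject_default_route disabled, both routes and the subnet's
# gateway_ip are stripped instead.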
def _get_aap_details(self, plugin_context, port, details):
pt = self._port_id_to_pt(plugin_context, port['id'])
aaps = port['allowed_address_pairs']
if pt:
# Set the correct address ownership for this port
owned_addresses = self._get_owned_addresses(
plugin_context, pt['port_id'])
for allowed in aaps:
if allowed['ip_address'] in owned_addresses:
# Signal the agent that this particular address is active
# on its port
allowed['active'] = True
return aaps
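# Hedged illustration of the 'active' marking above (addresses are
# hypothetical): if the HA IP table says the PT's port currently owns
# 192.168.0.10, then
#     [{'ip_address': '192.168.0.10', 'mac_address': mac},
#      {'ip_address': '192.168.0.11', 'mac_address': mac}]
# is returned as
#     [{'ip_address': '192.168.0.10', 'mac_address': mac, 'active': True},
#      {'ip_address': '192.168.0.11', 'mac_address': mac}]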
def _get_port_address_scope(self, plugin_context, port):
for ip in port['fixed_ips']:
subnet = self._get_subnet(plugin_context, ip['subnet_id'])
subnetpool = self._get_subnetpools(
plugin_context, filters={'id': [subnet['subnetpool_id']]})
if subnetpool:
address_scope = self._get_address_scopes(
plugin_context,
filters={'id': [subnetpool[0]['address_scope_id']]})
if address_scope:
return address_scope[0]
def _get_port_address_scope_cached(self, plugin_context, port, cache):
if not cache.get('gbp_map_address_scope'):
cache['gbp_map_address_scope'] = (
self._get_port_address_scope(plugin_context, port))
return cache['gbp_map_address_scope']
def _get_address_scope_cached(self, plugin_context, vrf_id, cache):
if not cache.get('gbp_map_address_scope'):
address_scope = self._get_address_scopes(
plugin_context, filters={'id': [vrf_id]})
cache['gbp_map_address_scope'] = (address_scope[0] if
address_scope else None)
return cache['gbp_map_address_scope']
def _get_vrf_id(self, plugin_context, port, details):
# retrieve the Address Scope from the Neutron port
address_scope = self._get_port_address_scope_cached(
plugin_context, port, details['_cache'])
# TODO(ivar): what should we return if Address Scope doesn't exist?
return address_scope['id'] if address_scope else None
def _get_port_vrf(self, plugin_context, vrf_id, details):
address_scope = self._get_address_scope_cached(
plugin_context, vrf_id, details['_cache'])
if address_scope:
vrf_name = self.name_mapper.address_scope(
plugin_context.session, address_scope['id'],
address_scope['name'])
tenant_name = self.name_mapper.tenant(
plugin_context.session, address_scope['tenant_id'])
aim_ctx = aim_context.AimContext(plugin_context.session)
vrf = aim_resource.VRF(tenant_name=tenant_name, name=vrf_name)
return self.aim.get(aim_ctx, vrf)
def _get_vrf_subnets(self, plugin_context, vrf_id, details):
subnets = []
address_scope = self._get_address_scope_cached(
plugin_context, vrf_id, details['_cache'])
if address_scope:
# Get all the subnetpools associated with this Address Scope
subnetpools = self._get_subnetpools(
plugin_context,
filters={'address_scope_id': [address_scope['id']]})
for pool in subnetpools:
subnets.extend(pool['prefixes'])
return subnets
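# Worked example (mirrors test_get_gbp_details_no_pt in this commit's
# tests): an address scope whose two subnetpools hold prefixes
# ['10.10.0.0/26', '1.1.0.0/16'] and ['2.1.0.0/16'] yields
#     vrf_subnets == ['10.10.0.0/26', '1.1.0.0/16', '2.1.0.0/16']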

View File: gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping_rpc.py

@@ -0,0 +1,223 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from apic_ml2.neutron.db import port_ha_ipaddress_binding as ha_ip_db
from neutron._i18n import _LE
from neutron._i18n import _LW
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.plugins.ml2 import rpc as ml2_rpc
from opflexagent import rpc as o_rpc
from oslo_log import log
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
nova_client as nclient)
LOG = log.getLogger(__name__)
class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
"""RPC mixin for AIM mapping.
Collection of all the RPC methods consumed by the AIM mapping.
By defining the mixin requirements, we can potentially move the RPC
handling between GBP and Neutron while preserving the same code base. Such
requirements might be easier to implement in some places (e.g., they won't
require model extensions) compared to others, based on the visibility
that each module has over the network abstraction.
"""
def setup_opflex_rpc_listeners(self):
self.notifier = o_rpc.AgentNotifierApi(topics.AGENT)
LOG.debug("Set up Opflex RPC listeners.")
self.opflex_endpoints = [
o_rpc.GBPServerRpcCallback(self, self.notifier)]
self.opflex_topic = o_rpc.TOPIC_OPFLEX
self.opflex_conn = n_rpc.create_connection(new=True)
self.opflex_conn.create_consumer(
self.opflex_topic, self.opflex_endpoints, fanout=False)
self.opflex_conn.consume_in_threads()
def get_vrf_details(self, context, **kwargs):
details = {'l3_policy_id': kwargs['vrf_id']}
details['_cache'] = {}
self._add_vrf_details(context, details['l3_policy_id'], details)
details.pop('_cache', None)
return details
def request_vrf_details(self, context, **kwargs):
return self.get_vrf_details(context, **kwargs)
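# A hedged sketch of the resulting payload, assuming the hooks are
# implemented as in the aim_mapping driver (values illustrative):
#
#     get_vrf_details(ctx, vrf_id=<address scope id>) ->
#         {'l3_policy_id': <address scope id>,
#          'vrf_tenant': <mapped tenant name>,
#          'vrf_name': <mapped address scope name>,
#          'vrf_subnets': ['10.10.0.0/26', '1.1.0.0/16']}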
def get_gbp_details(self, context, **kwargs):
LOG.debug("APIC AIM MD handling get_gbp_details for: %s", kwargs)
try:
return self._get_gbp_details(context, kwargs)
except Exception as e:
device = kwargs.get('device')
LOG.error(_LE("An exception has occurred while retrieving device "
"gbp details for %s"), device)
LOG.exception(e)
return {'device': device}
def request_endpoint_details(self, context, **kwargs):
LOG.debug("APIC AIM handling get_endpoint_details for: %s", kwargs)
try:
request = kwargs.get('request')
result = {'device': request['device'],
'timestamp': request['timestamp'],
'request_id': request['request_id'],
'gbp_details': self._get_gbp_details(context, request),
'neutron_details': ml2_rpc.RpcCallbacks(
None, None).get_device_details(context, **request)}
return result
except Exception as e:
LOG.error(_LE("An exception has occurred while requesting device "
"gbp details for %s"), request.get('device'))
LOG.exception(e)
return None
# Things you need in order to run this Mixin:
# - self._core_plugin: attribute that points to the Neutron core plugin;
# - self._is_port_promiscuous(context, port): defines whether or not
# a port should be put in promiscuous mode;
# - self._get_port_epg(context, port): returns the AIM EPG for the
# specific port, for both Neutron and GBP;
# - self._is_dhcp_optimized(context, port);
# - self._is_metadata_optimized(context, port);
# - self._get_vrf_id(context, port, details): VRF identifier for the port;
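# A minimal consumer sketch of that contract (hypothetical class and
# helper names, for illustration only):
#
#     class MinimalRPCDriver(AIMMappingRPCMixin):
#         def _is_port_promiscuous(self, context, port):
#             return port['device_owner'] == 'network:dhcp'
#         def _get_port_epg(self, context, port):
#             return self._lookup_epg_for_network(port['network_id'])
#         def _is_dhcp_optimized(self, context, port):
#             return True
#         def _is_metadata_optimized(self, context, port):
#             return False
#         def _get_vrf_id(self, context, port, details):
#             return self._lookup_scope_id(port)
#     # plus self._core_plugin and the _get_*_details hooks used below.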
def _get_gbp_details(self, context, request):
# TODO(ivar): should this happen within a single transaction? what are
# the concurrency risks?
device = request.get('device')
host = request.get('host')
core_plugin = self._core_plugin
port_id = core_plugin._device_to_port_id(context, device)
port_context = core_plugin.get_bound_port_context(context, port_id,
host)
if not port_context:
LOG.warning(_LW("Device %(device)s requested by agent "
"%(agent_id)s not found in database"),
{'device': port_id,
'agent_id': request.get('agent_id')})
return {'device': request.get('device')}
port = port_context.current
# NOTE(ivar): removed the PROXY_PORT_PREFIX hack.
# This was needed to support network services without hotplug.
epg = self._get_port_epg(context, port)
details = {'device': request.get('device'),
'enable_dhcp_optimization': self._is_dhcp_optimized(
context, port),
'enable_metadata_optimization': self._is_metadata_optimized(
context, port),
'port_id': port_id,
'mac_address': port['mac_address'],
'app_profile_name': epg.app_profile_name,
'tenant_id': port['tenant_id'],
'host': host,
# TODO(ivar): scope names, possibly through AIM or the name
# mapper
'ptg_tenant': epg.tenant_name,
'endpoint_group_name': epg.name,
'promiscuous_mode': self._is_port_promiscuous(context,
port),
'extra_ips': [],
'floating_ip': [],
'ip_mapping': [],
# Put per mac-address extra info
'extra_details': {}}
# Set VM name if needed.
if port['device_owner'].startswith('compute:') and port['device_id']:
vm = nclient.NovaClient().get_server(port['device_id'])
details['vm-name'] = vm.name if vm else port['device_id']
# NOTE(ivar): having these methods cleanly separated actually makes
# things less efficient by requiring lots of call duplication.
# We could alleviate this by passing down a cache that stores commonly
# requested objects (like EPGs). 'details' itself could be used for
# such caching.
details['_cache'] = {}
details['l3_policy_id'] = self._get_vrf_id(context, port, details)
self._add_subnet_details(context, port, details)
self._add_nat_details(context, port, details)
self._add_allowed_address_pairs_details(context, port, details)
self._add_vrf_details(context, details['l3_policy_id'], details)
self._add_extra_details(context, port, details)
details.pop('_cache', None)
LOG.debug("Details for port %s : %s" % (port['id'], details))
return details
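# The '_cache' dict threaded through the helpers above lets them share
# expensive lookups within a single RPC; _get_gbp_details pops it before
# returning so it never leaks into the response. A hedged sketch of the
# pattern (mirroring _get_port_address_scope_cached in aim_mapping):
#
#     def _get_thing_cached(self, plugin_context, port, cache):
#         if 'my_thing' not in cache:          # hypothetical cache key
#             cache['my_thing'] = self._get_thing(plugin_context, port)
#         return cache['my_thing']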
def _get_owned_addresses(self, plugin_context, port_id):
return set(self.ha_ip_handler.get_ha_ipaddresses_for_port(port_id))
# Child class needs to support:
# - self._get_subnet_details(context, port, details)
def _add_subnet_details(self, context, port, details):
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - subnets;
details['subnets'] = self._get_subnet_details(context, port, details)
def _add_nat_details(self, context, port, details):
# TODO(ivar): How to retrieve NAT details depends on ES implementation
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - floating_ip;
# - ip_mapping;
# - host_snat_ips.
pass
# Child class needs to support:
# - self._get_aap_details(context, port, details)
def _add_allowed_address_pairs_details(self, context, port, details):
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - allowed_address_pairs
# This should take care of realizing whether a given address is
# active in the specific port
details['allowed_address_pairs'] = self._get_aap_details(context, port,
details)
# Child class needs to support:
# - self._get_port_vrf(context, port, details): AIM VRF for the port;
# - self._get_vrf_subnets(context, port, details): Subnets managed
# by the port's VRF.
def _add_vrf_details(self, context, vrf_id, details):
# TODO(ivar): VRF details depend on Address Scopes from Neutron
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - l3_policy_id;
# - vrf_tenant;
# - vrf_name;
# - vrf_subnets.
aim_vrf = self._get_port_vrf(context, vrf_id, details)
if aim_vrf:
# TODO(ivar): scope
details['vrf_tenant'] = aim_vrf.tenant_name
details['vrf_name'] = aim_vrf.name
details['vrf_subnets'] = self._get_vrf_subnets(context, vrf_id,
details)
def _add_extra_details(self, context, port, details):
# TODO(ivar): Extra details depend on HA and SC implementation
# This method needs to define requirements for this Mixin's child
# classes in order to fill per-mac address extra information.
# What is an "End of the Chain" port for Neutron?
pass

View File: gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/apic_mapping.py

@@ -262,7 +262,7 @@ class ApicMappingDriver(api.ResourceMappingDriver,
@staticmethod
def get_apic_manager(client=True):
if not ApicMappingDriver.manager:
apic_config = cfg.CONF.ml2_cisco_apic
apic_config = cfg.CONF.apic
network_config = {
'vlan_ranges': cfg.CONF.ml2_type_vlan.network_vlan_ranges,
'vni_ranges': cfg.CONF.ml2_type_vxlan.vni_ranges,

View File: gbpservice/neutron/services/grouppolicy/drivers/neutron_resources.py

@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron import manager
from oslo_log import helpers as log
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
@@ -31,8 +32,16 @@ class CommonNeutronBase(ipd.ImplicitPolicyBase, rmd.OwnedResourcesOperations,
def initialize(self):
# REVISIT: Check if this is still required
self._cached_agent_notifier = None
self._gbp_plugin = None
super(CommonNeutronBase, self).initialize()
@property
def gbp_plugin(self):
if not self._gbp_plugin:
self._gbp_plugin = (manager.NeutronManager.get_service_plugins()
.get("GROUP_POLICY"))
return self._gbp_plugin
@log.log_method_call
def create_l2_policy_precommit(self, context):
l2p_db = context._plugin._get_l2_policy(
@@ -71,3 +80,23 @@ class CommonNeutronBase(ipd.ImplicitPolicyBase, rmd.OwnedResourcesOperations,
l3p_id = l2p_db['l3_policy_id']
l2p_db.update({'l3_policy_id': None})
self._cleanup_l3_policy(context, l3p_id, clean_session=False)
def _port_id_to_pt(self, plugin_context, port_id):
pts = self.gbp_plugin.get_policy_targets(
plugin_context, {'port_id': [port_id]})
if pts:
return pts[0]
def _port_id_to_ptg(self, plugin_context, port_id):
pt = self._port_id_to_pt(plugin_context, port_id)
if pt:
return self.gbp_plugin.get_policy_target_group(
plugin_context, pt['policy_target_group_id']), pt
return None, None
def _network_id_to_l2p(self, context, network_id):
l2ps = self.gbp_plugin.get_l2_policies(
context, filters={'network_id': [network_id]})
for l2p in l2ps:
if l2p['network_id'] == network_id:
return l2p
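# Helper-chain sketch for the lookups above (illustrative):
#     pt = self._port_id_to_pt(ctx, port_id)        # PT dict, or None
#     ptg, pt = self._port_id_to_ptg(ctx, port_id)  # (PTG, PT) or (None, None)
#     l2p = self._network_id_to_l2p(ctx, network_id)
# Plain Neutron ports fall through with None, which the RPC code in
# aim_mapping treats as the "no PT" (default EPG) path.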

View File: gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py

@@ -13,8 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from aim import aim_manager
from aim.api import resource as aim_resource
from aim import config as aim_cfg
from aim import context as aim_context
from aim.db import model_base as aim_model_base
from keystoneclient.v3 import client as ksc_client
@@ -70,9 +73,26 @@ class FakeKeystoneClient(object):
# TODO(rkukura): Also run Neutron L3 tests on apic_aim L3 plugin.
class ApicAimTestMixin(object):
def initialize_db_config(self, session):
aim_cfg._get_option_subscriber_manager = mock.Mock()
self.aim_cfg_manager = aim_cfg.ConfigManager(
aim_context.AimContext(db_session=session), '')
self.aim_cfg_manager.replace_all(aim_cfg.CONF)
def set_override(self, item, value, group=None, host=''):
# Override DB config as well
if group:
aim_cfg.CONF.set_override(item, value, group)
else:
aim_cfg.CONF.set_override(item, value)
self.aim_cfg_manager.to_db(aim_cfg.CONF, host=host)
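# Usage sketch in a test (hypothetical option and group): calling
#     self.set_override('enable_optimized_metadata', True, 'apic')
# overrides oslo.config and mirrors the value into the DB-backed AIM
# config via to_db(); note that the option subscriber manager itself is
# mocked out in initialize_db_config() above.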
class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
test_l3.L3NatTestCaseMixin):
test_l3.L3NatTestCaseMixin, ApicAimTestMixin):
def setUp(self):
# Enable the test mechanism driver to ensure that
# we can successfully call through to all mechanism
@@ -97,6 +117,12 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
'L3_ROUTER_NAT':
'gbpservice.neutron.services.apic_aim.l3_plugin.ApicL3Plugin'}
engine = db_api.get_engine()
aim_model_base.Base.metadata.create_all(engine)
self.db_session = db_api.get_session()
self.initialize_db_config(self.db_session)
super(ApicAimTestCase, self).setUp(PLUGIN_NAME,
service_plugins=service_plugins)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
@@ -105,10 +131,6 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
self.saved_keystone_client = ksc_client.Client
ksc_client.Client = FakeKeystoneClient
engine = db_api.get_engine()
aim_model_base.Base.metadata.create_all(engine)
self.plugin = manager.NeutronManager.get_plugin()
self.plugin.start_rpc_listeners()
self.driver = self.plugin.mechanism_manager.mech_drivers[
@@ -116,7 +138,7 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
self.l3_plugin = manager.NeutronManager.get_service_plugins()[
service_constants.L3_ROUTER_NAT]
self.aim_mgr = aim_manager.AimManager()
self._app_profile_name = 'NeutronAP'
self._app_profile_name = self.driver.ap_name
self._tenant_name = self._map_name({'id': 'test-tenant',
'name': 'TestTenantName'})
@@ -133,6 +155,11 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
# Assumes no conflicts and no substitution needed.
return resource['name'][:40] + '_' + resource['id'][:5]
def _find_by_dn(self, dn, cls):
aim_ctx = aim_context.AimContext(self.db_session)
resource = cls.from_dn(dn)
return self.aim_mgr.get(aim_ctx, resource)
class TestAimMapping(ApicAimTestCase):
def _get_tenant(self, tenant_name, should_exist=True):
@@ -185,7 +212,7 @@ class TestAimMapping(ApicAimTestCase):
def _get_epg(self, epg_name, tenant_name, app_profile_name,
should_exist=True):
session = db_api.get_session()
session = self.db_session
aim_ctx = aim_context.AimContext(session)
epg = aim_resource.EndpointGroup(tenant_name=tenant_name,
app_profile_name=app_profile_name,
@@ -860,38 +887,6 @@ class TestPortBinding(ApicAimTestCase):
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
port['binding:vif_details'])
# Verify get_gbp_details.
device = 'tap%s' % port_id
details = self.driver.get_gbp_details(context.get_admin_context(),
device=device, host='host1')
self.assertEqual(port['allowed_address_pairs'],
details['allowed_address_pairs'])
self.assertEqual('NeutronAP', details['app_profile_name'])
self.assertEqual(device, details['device'])
self.assertTrue(details['enable_dhcp_optimization'])
self.assertTrue(details['enable_metadata_optimization'])
self.assertIn('net1', details['endpoint_group_name'])
self.assertEqual('host1', details['host'])
self.assertEqual('test-tenant', details['l3_policy_id'])
self.assertEqual(port['mac_address'], details['mac_address'])
self.assertEqual(port_id, details['port_id'])
self.assertFalse(details['promiscuous_mode'])
self.assertIn('TestTenantName', details['ptg_tenant'])
self.assertEqual(1, len(details['subnets']))
self.assertEqual('someid', details['vm-name'])
# Verify request_endpoint_details.
req_details = self.driver.request_endpoint_details(
context.get_admin_context(),
request={'device': 'tap%s' % port_id, 'host': 'host1',
'timestamp': 0, 'request_id': 'a_request_id'})
self.assertEqual(details, req_details['gbp_details'])
self.assertEqual(port_id, req_details['neutron_details']['port_id'])
# TODO(rkukura): Verify subnet details. Also, test with
# variations of DHCP IPs on subnet, dns_nameservers and
# host_routes values, etc.
# TODO(rkukura): Add tests for promiscuous_mode cases.
def test_bind_unsupported_vnic_type(self):

View File: gbpservice/neutron/tests/unit/services/grouppolicy/test_aim_mapping_driver.py

@@ -21,14 +21,16 @@ from aim.db import model_base as aim_model_base
from keystoneclient.v3 import client as ksc_client
from neutron import context as nctx
from neutron.db import api as db_api
from neutron.tests.unit.extensions import test_address_scope
from opflexagent import constants as ocst
import webob.exc
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
mechanism_driver as aim_md)
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import model
from gbpservice.neutron.services.grouppolicy.common import (
constants as gp_const)
from gbpservice.neutron.services.grouppolicy import config
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping as amap)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping_lib as alib)
from gbpservice.neutron.tests.unit.plugins.ml2plus import (
@@ -52,10 +54,17 @@ DEFAULT_FILTER_ENTRY = {'arp_opcode': u'unspecified',
'source_to_port': u'unspecified',
'stateful': False,
'tcp_flags': u'unspecified'}
AGENT_TYPE = ocst.AGENT_TYPE_OPFLEX_OVS
AGENT_CONF = {'alive': True, 'binary': 'somebinary',
'topic': 'sometopic', 'agent_type': AGENT_TYPE,
'configurations': {'opflex_networks': None,
'bridge_mappings': {'physnet1': 'br-eth1'}}}
class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
test_ext_base.ExtensionDriverTestBase):
test_ext_base.ExtensionDriverTestBase,
test_aim_md.ApicAimTestMixin,
test_address_scope.AddressScopeTestCase):
_extension_drivers = ['aim_extension']
_extension_path = None
@@ -70,10 +79,16 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
# performed up until that point (including those in the aim_mapping
# driver) are rolled back.
policy_drivers = policy_drivers or ['aim_mapping', 'dummy']
self.agent_conf = AGENT_CONF
ml2_opts = ml2_options or {'mechanism_drivers': ['logger', 'apic_aim'],
'extension_drivers': ['apic_aim'],
'type_drivers': ['opflex', 'local', 'vlan'],
'tenant_network_types': ['opflex']}
engine = db_api.get_engine()
aim_model_base.Base.metadata.create_all(engine)
amap.ApicMappingDriver.get_apic_manager = mock.Mock()
self.db_session = db_api.get_session()
self.initialize_db_config(self.db_session)
super(AIMBaseTestCase, self).setUp(
policy_drivers=policy_drivers, core_plugin=core_plugin,
ml2_options=ml2_opts, l3_plugin=l3_plugin,
@@ -89,16 +104,22 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
self._neutron_context = nctx.Context(
'', kwargs.get('tenant_id', self._tenant_id),
is_admin_context=False)
self._neutron_context._session = self.db_session
self._neutron_admin_context = nctx.get_admin_context()
engine = db_api.get_engine()
aim_model_base.Base.metadata.create_all(engine)
self._aim_mgr = None
self._aim_context = aim_context.AimContext(
self._neutron_context.session)
self._driver = None
self._dummy = None
self._name_mapper = None
self._driver = None
nova_client = mock.patch(
'gbpservice.neutron.services.grouppolicy.drivers.cisco.'
'apic.nova_client.NovaClient.get_server').start()
vm = mock.Mock()
vm.name = 'someid'
nova_client.return_value = vm
self._db = model.DbModel()
@@ -111,6 +132,13 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
ksc_client.Client = self.saved_keystone_client
super(AIMBaseTestCase, self).tearDown()
def _bind_port_to_host(self, port_id, host):
data = {'port': {'binding:host_id': host,
'device_owner': 'compute:',
'device_id': 'someid'}}
return super(AIMBaseTestCase, self)._bind_port_to_host(
port_id, host, data=data)
@property
def driver(self):
# aim_mapping policy driver reference
@@ -149,7 +177,7 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
def _test_aim_resource_status(self, aim_resource_obj, gbp_resource):
aim_status = self.aim_mgr.get_status(self._aim_context,
aim_resource_obj)
aim_resource_obj)
if aim_status.is_error():
self.assertEqual(gp_const.STATUS_ERROR, gbp_resource['status'])
elif aim_status.is_build():
@@ -605,7 +633,7 @@ class TestPolicyTargetGroup(AIMBaseTestCase):
self._neutron_context.session, ptg_id, ptg_name))
aim_tenant_name = str(self.name_mapper.tenant(
self._neutron_context.session, self._tenant_id))
aim_app_profile_name = aim_md.AP_NAME
aim_app_profile_name = self.driver.aim_mech_driver.ap_name
aim_app_profiles = self.aim_mgr.find(
self._aim_context, aim_resource.ApplicationProfile,
tenant_name=aim_tenant_name, name=aim_app_profile_name)
@@ -679,7 +707,7 @@ class TestPolicyTargetGroup(AIMBaseTestCase):
self._neutron_context.session, ptg_id, ptg_name))
aim_tenant_name = str(self.name_mapper.tenant(
self._neutron_context.session, self._tenant_id))
aim_app_profile_name = aim_md.AP_NAME
aim_app_profile_name = self.driver.aim_mech_driver.ap_name
aim_app_profiles = self.aim_mgr.find(
self._aim_context, aim_resource.ApplicationProfile,
tenant_name=aim_tenant_name, name=aim_app_profile_name)
@@ -821,6 +849,127 @@ class TestPolicyTarget(AIMBaseTestCase):
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
def _verify_gbp_details_assertions(self, mapping, req_mapping, port_id,
expected_epg_name, expected_epg_tenant,
subnet):
self.assertEqual(mapping, req_mapping['gbp_details'])
self.assertEqual(port_id, mapping['port_id'])
self.assertEqual(expected_epg_name, mapping['endpoint_group_name'])
self.assertEqual(expected_epg_tenant, mapping['ptg_tenant'])
self.assertEqual('someid', mapping['vm-name'])
self.assertTrue(mapping['enable_dhcp_optimization'])
self.assertFalse(mapping['enable_metadata_optimization'])
self.assertEqual(1, len(mapping['subnets']))
self.assertEqual(subnet['subnet']['cidr'],
mapping['subnets'][0]['cidr'])
# Verify Neutron details
self.assertEqual(port_id, req_mapping['neutron_details']['port_id'])
def _verify_vrf_details_assertions(self, vrf_mapping, expected_vrf_name,
expected_l3p_id, expected_subnets,
expected_vrf_tenant):
self.assertEqual(expected_vrf_name, vrf_mapping['vrf_name'])
self.assertEqual(expected_vrf_tenant, vrf_mapping['vrf_tenant'])
self.assertEqual(set(expected_subnets),
set(vrf_mapping['vrf_subnets']))
self.assertEqual(expected_l3p_id,
vrf_mapping['l3_policy_id'])
def _do_test_get_gbp_details(self):
l3p = self.create_l3_policy(name='myl3')['l3_policy']
l2p = self.create_l2_policy(name='myl2',
l3_policy_id=l3p['id'])['l2_policy']
ptg = self.create_policy_target_group(
name="ptg1", l2_policy_id=l2p['id'])['policy_target_group']
pt1 = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
self._bind_port_to_host(pt1['port_id'], 'h1')
mapping = self.driver.get_gbp_details(self._neutron_admin_context,
device='tap%s' % pt1['port_id'], host='h1')
req_mapping = self.driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % pt1['port_id'], 'host': 'h1',
'timestamp': 0, 'request_id': 'request_id'})
epg_name = self.name_mapper.policy_target_group(
self._neutron_context.session, ptg['id'], ptg['name'])
epg_tenant = self.name_mapper.tenant(self._neutron_context.session,
ptg['tenant_id'])
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
self._verify_gbp_details_assertions(
mapping, req_mapping, pt1['port_id'], epg_name, epg_tenant, subnet)
# Create event on a second host to verify that the SNAT
# port gets created for this second host
pt2 = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
self._bind_port_to_host(pt2['port_id'], 'h1')
mapping = self.driver.get_gbp_details(self._neutron_admin_context,
device='tap%s' % pt2['port_id'], host='h2')
self.assertEqual(pt2['port_id'], mapping['port_id'])
def _do_test_gbp_details_no_pt(self):
# Create port and bind it
address_scope = self._make_address_scope(
self.fmt, 4, name='as1')['address_scope']
subnetpool = self._make_subnetpool(
self.fmt, ['10.10.0.0/26', '1.1.0.0/16'],
name='as1', address_scope_id=address_scope['id'],
tenant_id=self._tenant_id)['subnetpool']
self._make_subnetpool(
self.fmt, ['2.1.0.0/16'],
name='as2', address_scope_id=address_scope['id'],
tenant_id=self._tenant_id)
with self.network() as network:
with self.subnet(network=network, cidr='1.1.2.0/24',
subnetpool_id=subnetpool['id']) as subnet:
with self.port(subnet=subnet) as port:
port_id = port['port']['id']
network = network['network']
self._bind_port_to_host(port_id, 'h1')
mapping = self.driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % port_id,
host='h1')
req_mapping = self.driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % port_id, 'host': 'h1',
'timestamp': 0, 'request_id': 'request_id'})
vrf_mapping = self.driver.get_vrf_details(
self._neutron_admin_context,
vrf_id=address_scope['id'])
epg_name = self.name_mapper.network(
self._neutron_context.session, network['id'],
network['name'])
epg_tenant = self.name_mapper.tenant(
self._neutron_context.session, network['tenant_id'])
self._verify_gbp_details_assertions(
mapping, req_mapping, port_id, epg_name, epg_tenant,
subnet)
vrf_name = self.name_mapper.address_scope(
self._neutron_context.session, address_scope['id'],
address_scope['name'])
# Verify for both GBP details and VRF details
self._verify_vrf_details_assertions(
mapping, vrf_name, address_scope['id'],
['10.10.0.0/26', '1.1.0.0/16', '2.1.0.0/16'],
epg_tenant)
self._verify_vrf_details_assertions(
vrf_mapping, vrf_name, address_scope['id'],
['10.10.0.0/26', '1.1.0.0/16', '2.1.0.0/16'],
epg_tenant)
def test_get_gbp_details(self):
self._do_test_get_gbp_details()
def test_get_gbp_details_no_pt(self):
# Test that traditional Neutron ports behave correctly from the
# RPC perspective
self._do_test_gbp_details_no_pt()
class TestPolicyTargetRollback(AIMBaseTestCase):