Add driver framework

This abstracts all router-specific functionality into a router driver,
paving the way for the Rug managing other advanced services.

'rug-ctl router {debug, manage, update, rebuild}' commands have been
deprecated in favor of 'rug-ctl resource' equivalents, to be removed
in M.  The newer 'resource' commands are meant to address any
Rug-managed resource.  These requests are routed to the correct place
on the server-side, instead of a Neutron lookup on the client side.

Co-authored-by: Adam Gandelman <adamg@ubuntu.com>

Partially Implements: blueprint appliance-provisioning-driver
Change-Id: I1d2c68b81d28ebb0ca6ce4dcad237c4803bb96d6
This commit is contained in:
David Lenwell 2015-08-20 09:15:59 -07:00 committed by Adam Gandelman
parent 4845bb79ab
commit c8f14d6e9d
39 changed files with 2743 additions and 1449 deletions

4
.gitignore vendored
View File

@ -42,3 +42,7 @@ test.conf
*.db
*.db_clean
#macos hidden files
.DS_Store
._.DS_Store

View File

@ -43,11 +43,10 @@ neutron_opts = [
cfg.StrOpt('external_network_id'),
cfg.StrOpt('management_subnet_id'),
cfg.StrOpt('external_subnet_id'),
cfg.StrOpt('router_image_uuid'),
cfg.StrOpt('management_prefix', default='fdca:3ba5:a17a:acda::/64'),
cfg.StrOpt('external_prefix', default='172.16.77.0/24'),
cfg.IntOpt('akanda_mgt_service_port', default=5000),
cfg.StrOpt('router_instance_flavor', default=1),
cfg.StrOpt('default_instance_flavor', default=1),
cfg.StrOpt('interface_driver'),
]
@ -307,6 +306,7 @@ class Neutron(object):
if detailed:
return [Router.from_dict(r) for r in
self.rpc_client.get_routers()]
routers = self.api_client.list_routers().get('routers', [])
return [Router.from_dict(r) for r in routers]
@ -547,8 +547,8 @@ class Neutron(object):
# what the status of the router should be. Log the error
# but otherwise ignore it.
LOG.info(_LI(
'ignoring failure to update status for router %s to %s: %s'),
router_id, status, e,
'ignoring failure to update status for %s to %s: %s'),
id, status, e,
)
def clear_device_id(self, port):

View File

@ -16,7 +16,7 @@
from datetime import datetime
from novaclient.v1_1 import client
from novaclient import client
from novaclient import exceptions as novaclient_exceptions
from oslo_config import cfg
@ -28,17 +28,17 @@ LOG = logging.getLogger(__name__)
OPTIONS = [
cfg.StrOpt(
'router_ssh_public_key',
'ssh_public_key',
help="Path to the SSH public key for the 'akanda' user within "
"router appliance instances",
"appliance instances",
default='/etc/akanda-rug/akanda.pub')
]
cfg.CONF.register_opts(OPTIONS)
class InstanceInfo(object):
def __init__(self, instance_id, name, management_port=None, ports=(),
image_uuid=None, booting=False, last_boot=None):
def __init__(self, instance_id, name, management_port=None,
ports=(), image_uuid=None, booting=False, last_boot=None):
self.id_ = instance_id
self.name = name
self.image_uuid = image_uuid
@ -80,6 +80,7 @@ class Nova(object):
def __init__(self, conf):
self.conf = conf
self.client = client.Client(
'2',
conf.admin_user,
conf.admin_password,
conf.admin_tenant_name,
@ -87,20 +88,22 @@ class Nova(object):
auth_system=conf.auth_strategy,
region_name=conf.auth_region)
def create_instance(self, router_id, image_uuid, make_ports_callback):
def create_instance(self,
name, image_uuid, flavor, make_ports_callback):
mgt_port, instance_ports = make_ports_callback()
nics = [{'net-id': p.network_id, 'v4-fixed-ip': '', 'port-id': p.id}
nics = [{'net-id': p.network_id,
'v4-fixed-ip': '',
'port-id': p.id}
for p in ([mgt_port] + instance_ports)]
LOG.debug('creating instance for router %s with image %s',
router_id, image_uuid)
name = 'ak-' + router_id
LOG.debug('creating instance %s with image %s',
name, image_uuid)
server = self.client.servers.create(
name,
image=image_uuid,
flavor=self.conf.router_instance_flavor,
flavor=flavor,
nics=nics,
config_drive=True,
userdata=_format_userdata(mgt_port)
@ -120,32 +123,31 @@ class Nova(object):
instance_info.nova_status = server.status
return instance_info
def get_instance_info_for_obj(self, router_id):
"""Retrieves an InstanceInfo object for a given router_id
def get_instance_info(self, name):
"""Retrieves an InstanceInfo object for a given instance name
:param router_id: UUID of the router being queried
:param name: name of the instance being queried
:returns: an InstanceInfo object representing the router instance
"""
instance = self.get_instance_for_obj(router_id)
instance = self.get_instance_for_obj(name)
if instance:
return InstanceInfo(
instance.id,
instance.name,
name,
image_uuid=instance.image['id']
)
def get_instance_for_obj(self, router_id):
"""Retreives a nova server for a given router_id, based on instance
name.
def get_instance_for_obj(self, name):
"""Retreives a nova server for a given instance name.
:param router_id: UUID of the router being queried
:param name: name of the instance being queried
:returns: a novaclient.v2.servers.Server object or None
"""
instances = self.client.servers.list(
search_opts=dict(name='ak-' + router_id)
search_opts=dict(name=name)
)
if instances:
@ -167,14 +169,18 @@ class Nova(object):
def destroy_instance(self, instance_info):
if instance_info:
LOG.debug('deleting instance for router %s', instance_info.name)
LOG.debug('deleting instance %s', instance_info.name)
self.client.servers.delete(instance_info.id_)
def boot_instance(self, prev_instance_info, router_id, router_image_uuid,
def boot_instance(self,
prev_instance_info,
name,
image_uuid,
flavor,
make_ports_callback):
if not prev_instance_info:
instance = self.get_instance_for_obj(router_id)
instance = self.get_instance_for_obj(name)
else:
instance = self.get_instance_by_id(prev_instance_info.id_)
@ -192,14 +198,16 @@ class Nova(object):
instance.name,
image_uuid=instance.image['id']
)
instance_info.nova_status = instance.status
return instance_info
self.client.servers.delete(instance.id)
return None
# it is now safe to attempt boot
instance_info = self.create_instance(
router_id,
router_image_uuid,
name,
image_uuid,
flavor,
make_ports_callback
)
return instance_info
@ -239,8 +247,8 @@ final_message: "Akanda appliance is running"
""" # noqa
def _router_ssh_key():
key = cfg.CONF.router_ssh_public_key
def _ssh_key():
key = cfg.CONF.ssh_public_key
if not key:
return ''
try:
@ -253,7 +261,7 @@ def _router_ssh_key():
def _format_userdata(mgt_port):
ctxt = {
'ssh_public_key': _router_ssh_key(),
'ssh_public_key': _ssh_key(),
'mac_address': mgt_port.mac_address,
'ip_address': mgt_port.fixed_ips[0].ip_address,
}

View File

@ -21,18 +21,28 @@ import argparse
import subprocess
import sys
from akanda.rug.common.i18n import _LW
from akanda.rug import commands
from akanda.rug.cli import message
from akanda.rug.api import nova, neutron
from novaclient import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from neutronclient.v2_0 import client
LOG = logging.getLogger(__name__)
class _TenantRouterCmd(message.MessageSending):
def get_parser(self, prog_name):
new_cmd = str(prog_name).replace('router', 'resource')
LOG.warning(_LW(
"WARNING: '%s' is deprecated in favor of '%s' and will be removed "
"in the Mitaka release.") % (prog_name, new_cmd))
# Bypass the direct base class to let us put the tenant id
# argument first
p = super(_TenantRouterCmd, self).get_parser(prog_name)

View File

@ -27,12 +27,18 @@ WORKERS_DEBUG = 'workers-debug'
# Router commands expect a 'router_id' argument in the payload with
# the UUID of the router
# Put a router in debug/manage mode
# Put a resource in debug/manage mode
RESOURCE_DEBUG = 'resource-debug'
RESOURCE_MANAGE = 'resource-manage'
# Send an updated config to the resource whether it is needed or not
RESOURCE_UPDATE = 'resource-update'
# Rebuild a resource from scratch
RESOURCE_REBUILD = 'resource-rebuild'
# These are the deprecated versions of the above, to be removed in M.
ROUTER_DEBUG = 'router-debug'
ROUTER_MANAGE = 'router-manage'
# Send an updated config to the router whether it is needed or not
ROUTER_UPDATE = 'router-update'
# Rebuild a router from scratch
ROUTER_REBUILD = 'router-rebuild'
# Put a tenant in debug/manage mode
@ -43,7 +49,7 @@ TENANT_MANAGE = 'tenant-manage'
# Configuration commands
CONFIG_RELOAD = 'config-reload'
# Force a poll of all routers right now
# Force a poll of all resources right now
POLL = 'poll'
GLOBAL_DEBUG = 'global-debug'

View File

@ -40,37 +40,37 @@ class Connection(object):
pass
@abc.abstractmethod
def enable_router_debug(self, router_uuid, reason=None):
"""Enter a router into debug mode
def enable_resource_debug(self, resource_uuid, reason=None):
"""Enter a resource into debug mode
:param router_uuid: str uuid of the router to be placed into debug
:param resource_uuid: str uuid of the resource to be placed into debug
mode
:param reason: str (optional) reason for entering router into debug
:param reason: str (optional) reason for entering resource into debug
mode
"""
@abc.abstractmethod
def disable_router_debug(self, router_uuid):
"""Remove a router into debug mode
def disable_resource_debug(self, resource_uuid):
"""Remove a resource into debug mode
:param router_uuid: str uuid of the router to be removed from debug
:param resource_uuid: str uuid of the resource to be removed from debug
mode
"""
@abc.abstractmethod
def router_in_debug(self, router_uuid):
"""Determines if a router is in debug mode
def resource_in_debug(self, resource_uuid):
"""Determines if a resource is in debug mode
:param router_uuid: str the uuid of the router to query
:returns: tuple (False, None) if router is not in debug mode or
:param resource_uuid: str the uuid of the resource to query
:returns: tuple (False, None) if resource is not in debug mode or
(True, "reason") if it is.
"""
@abc.abstractmethod
def routers_in_debug(self):
"""Queries all routers in debug mode
def resources_in_debug(self):
"""Queries all resources in debug mode
:returns: a set of (router_uuid, reason) tuples
:returns: a set of (resource_uuid, reason) tuples
"""
@abc.abstractmethod

View File

@ -32,14 +32,14 @@ import sqlalchemy as sa
def upgrade():
op.create_table(
'router_debug',
'resource_debug',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('reason', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_debug_router0uuid'),
sa.UniqueConstraint('uuid', name='uniq_debug_resource0uuid'),
)
op.create_table(
'tenant_debug',

View File

@ -96,24 +96,24 @@ class Connection(api.Connection):
res = model_query(model).all()
return set((r.uuid, r.reason) for r in res)
def enable_router_debug(self, router_uuid, reason=None):
def enable_resource_debug(self, resource_uuid, reason=None):
self._enable_debug(
model=models.RouterDebug(),
uuid=router_uuid,
model=models.ResourceDebug(),
uuid=resource_uuid,
reason=reason,
)
def disable_router_debug(self, router_uuid):
def disable_resource_debug(self, resource_uuid):
self._disable_debug(
model=models.RouterDebug,
uuid=router_uuid,
model=models.ResourceDebug,
uuid=resource_uuid,
)
def router_in_debug(self, router_uuid):
return self._check_debug(models.RouterDebug, router_uuid)
def resource_in_debug(self, resource_uuid):
return self._check_debug(models.ResourceDebug, resource_uuid)
def routers_in_debug(self):
return self._list_debug(models.RouterDebug)
def resources_in_debug(self):
return self._list_debug(models.ResourceDebug)
def enable_tenant_debug(self, tenant_uuid, reason=None):
self._enable_debug(

View File

@ -74,12 +74,12 @@ class AkandaBase(models.TimestampMixin,
Base = declarative_base(cls=AkandaBase)
class RouterDebug(Base):
"""Represents a router in debug mode."""
class ResourceDebug(Base):
"""Represents a resource in debug mode."""
__tablename__ = 'router_debug'
__tablename__ = 'resource_debug'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_debug_router0uuid'),
schema.UniqueConstraint('uuid', name='uniq_debug_resource0uuid'),
table_args()
)
id = Column(Integer, primary_key=True)

View File

@ -0,0 +1,61 @@
# Copyright (c) 2015 Akanda, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from akanda.rug.drivers.router import Router
DRIVER_OPTS = [
cfg.ListOpt('enabled_drivers',
default=['router', ],
help='list of drivers the rug process will load'),
]
cfg.CONF.register_opts(DRIVER_OPTS)
LOG = logging.getLogger(__name__)
AVAILABLE_DRIVERS = {'router': Router}
class InvalidDriverException(Exception):
"""Triggered when driver is not available in AVAILABLE_DRIVERS"""
pass
def get(requested_driver):
"""Returns driver class based on the requested_driver param
will raise InvalidDriverException if not listed in the config option
cfg.CONF.available_drivers.
:param requested_driver: name of desired driver
:return: returns driver object
"""
if requested_driver in AVAILABLE_DRIVERS:
return AVAILABLE_DRIVERS[requested_driver]
raise InvalidDriverException(
'Failed loading driver: %s' % requested_driver
)
def enabled_drivers():
for driver in cfg.CONF.enabled_drivers:
try:
d = get(driver)
yield d
except InvalidDriverException as e:
LOG.exception(e)
pass

182
akanda/rug/drivers/base.py Normal file
View File

@ -0,0 +1,182 @@
# Copyright (c) 2015 AKANDA, INC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
class BaseDriver(object):
RESOURCE_NAME = 'BaseDriver'
def __init__(self, worker_context, id, log=None):
"""This is the abstract for rug drivers.
:param id: logical resource id
:param log: override default log
"""
self.id = id
self.external_port = None
self.details = []
self.flavor = None
self.image_uuid = None
self.name = 'ak-%s-%s' % (self.RESOURCE_NAME, self.id)
if log:
self.log = log
else:
self.log = logging.getLogger(self.name)
self.post_init(worker_context)
def post_init(self, worker_context):
"""post init hook
:param worker_context:
:returns: None
"""
pass
def pre_boot(self, worker_context):
"""pre boot hook
:param worker_context:
:returns: None
"""
pass
def post_boot(self, worker_context):
"""post boot hook
:param worker_context:
:returns: None
"""
pass
def update_state(self, worker_context, silent=False):
"""returns state of logical resource.
:param worker_context:
:param silent:
:returns: None
"""
pass
def build_config(self, worker_context, mgt_port, iface_map):
"""gets config of logical resource attached to worker_context.
:param worker_context:
:returns: None
"""
pass
def update_config(self, management_address, config):
"""Updates appliance configuratino
This is responsible for pushing configuration to the managed
appliance
"""
pass
def synchronize_state(self, worker_context, state):
"""sometimes a driver will need to update a service behind it with a
new state.
:param state: a valid state
"""
pass
def make_ports(self, worker_context):
"""Make ports call back for the nova client.
This is expected to create the management port for the instance
and any required instance ports.
:param worker_context:
:returns: A tuple (managment_port, [instance_ports])
"""
def _make_ports():
pass
return _make_ports
@staticmethod
def pre_populate_hook():
"""called in populate.py durring driver loading loop.
"""
pass
def pre_plug(self, worker_context):
"""pre-plug hook
:param worker_context:
:returs: None
"""
@staticmethod
def get_resource_id_for_tenant(worker_context, tenant_id, message):
"""Find the id of a resource for a given tenant id and message.
For some resources simply searching by tenant_id is enough, for
others some context from the message payload may be necessary.
:param worker_context: A worker context with instantiated clients
:param tenant_id: The tenant uuid to search for
:param message: The message associated with the request
:returns: uuid of the resource owned by the tenant
"""
pass
@staticmethod
def process_notification(tenant_id, event_type, payload):
"""Process an incoming notification event
This gets called from the notifications layer to determine whether
a driver should process an incoming notification event. It is
responsible for translating an incoming notification to an Event
object appropriate for that driver.
:param tenant_id: str The UUID tenant_id for the incoming event
:param event_type: str event type, for example router.create.end
:param payload: The payload body of the incoming event
:returns: A populated Event objet if it should process, or None if not
"""
pass
@property
def ports(self):
"""Lists ports associated with the resource.
:returns: A list of akanda.rug.api.neutron.Port objects or []
"""
def get_interfaces(self, management_address):
"""Lists interfaces attached to the resource.
This lists the interfaces attached to the resource from the POV
of the resource iteslf.
:returns: A list of interfaces
"""
pass
def is_alive(self, management_address):
"""Determines whether the managed resource is alive
:returns: bool True if alive, False if not
"""
def get_state(self, worker_context):
"""Returns the state of the managed resource"""

View File

@ -0,0 +1,346 @@
# Copyright (c) 2015 AKANDA, INC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from neutronclient.common import exceptions as q_exceptions
from akanda.rug.common.i18n import _
from akanda.rug.api import akanda_client
from akanda.rug.api import configuration
from akanda.rug import event
from akanda.rug.api import neutron
from akanda.rug.drivers.base import BaseDriver
from akanda.rug.drivers import states
from akanda.rug.common.i18n import _LW
LOG = logging.getLogger(__name__)
ROUTER_OPTS = [
cfg.StrOpt('image_uuid',
help='image_uuid for router instances.',
deprecated_opts=[
cfg.DeprecatedOpt('router_image_uuid',
group='DEFAULT')]),
cfg.IntOpt('instance_flavor',
help='nova flavor to use for router instances',
deprecated_opts=[
cfg.DeprecatedOpt('router_instance_flavor',
group='DEFAULT')]),
cfg.IntOpt('mgt_service_port', default=5000,
help='The port on which the router API service listens on '
'router appliances',
deprecated_opts=[
cfg.DeprecatedOpt('akanda_mgt_service_port',
group='DEFAULT')]),
]
cfg.CONF.register_group(cfg.OptGroup(name='router'))
cfg.CONF.register_opts(ROUTER_OPTS, 'router')
STATUS_MAP = {
states.DOWN: neutron.STATUS_DOWN,
states.BOOTING: neutron.STATUS_BUILD,
states.UP: neutron.STATUS_BUILD,
states.CONFIGURED: neutron.STATUS_ACTIVE,
states.ERROR: neutron.STATUS_ERROR,
}
_ROUTER_INTERFACE_NOTIFICATIONS = set([
'router.interface.create',
'router.interface.delete',
])
_ROUTER_INTERESTING_NOTIFICATIONS = set([
'subnet.create.end',
'subnet.change.end',
'subnet.delete.end',
'port.create.end',
'port.change.end',
'port.delete.end',
])
DRIVER_NAME = 'router'
class Router(BaseDriver):
RESOURCE_NAME = DRIVER_NAME
_last_synced_status = None
def post_init(self, worker_context):
"""Called at end of __init__ in BaseDriver.
Populates the _router object from neutron and sets image_uuid and
flavor from cfg.
:param worker_context:
"""
self.image_uuid = cfg.CONF.router.image_uuid
self.flavor = cfg.CONF.router.instance_flavor
self.mgt_port = cfg.CONF.router.mgt_service_port
self._ensure_cache(worker_context)
def _ensure_cache(self, worker_context):
try:
self._router = worker_context.neutron.get_router_detail(self.id)
except neutron.RouterGone:
self._router = None
@property
def ports(self):
"""Lists ports associated with the resource.
:returns: A list of akanda.rug.api.neutron.Port objects or []
"""
if self._router:
return [p for p in self._router.ports]
else:
return []
def pre_boot(self, worker_context):
"""pre boot hook
Calls self.pre_plug().
:param worker_context:
:returns: None
"""
self.pre_plug(worker_context)
def post_boot(self, worker_context):
"""post boot hook
:param worker_context:
:returns: None
"""
pass
def build_config(self, worker_context, mgt_port, iface_map):
"""Builds / rebuilds config
:param worker_context:
:param mgt_port:
:param iface_map:
:returns: configuration object
"""
self._ensure_cache(worker_context)
return configuration.build_config(
worker_context.neutron,
self._router,
mgt_port,
iface_map
)
def update_config(self, management_address, config):
"""Updates appliance configuration
This is responsible for pushing configuration to the managed
appliance
"""
self.log.info(_('Updating config for %s'), self.name)
start_time = timeutils.utcnow()
akanda_client.update_config(
management_address, self.mgt_port, config)
delta = timeutils.delta_seconds(start_time, timeutils.utcnow())
self.log.info(_('Config updated for %s after %s seconds'),
self.name, round(delta, 2))
def pre_plug(self, worker_context):
"""pre-plug hook
Sets up the external port.
:param worker_context:
:returs: None
"""
if self._router.external_port is None:
# FIXME: Need to do some work to pick the right external
# network for a tenant.
self.log.debug('Adding external port to router %s')
ext_port = worker_context.neutron.create_router_external_port(
self._router)
self._router.external_port = ext_port
def make_ports(self, worker_context):
"""make ports call back for the nova client.
:param worker_context:
:returns: A tuple (managment_port, [instance_ports])
"""
def _make_ports():
self._ensure_cache(worker_context)
mgt_port = worker_context.neutron.create_management_port(
self.id
)
# FIXME(mark): ideally this should be ordered and de-duped
instance_ports = [
worker_context.neutron.create_vrrp_port(self.id, n)
for n in (p.network_id for p in self._router.ports)
]
return mgt_port, instance_ports
return _make_ports
@staticmethod
def pre_populate_hook():
"""Fetch the existing routers from neutrom then and returns list back
to populate to be distributed to workers.
Wait for neutron to return the list of the existing routers.
Pause up to max_sleep seconds between each attempt and ignore
neutron client exceptions.
"""
nap_time = 1
max_sleep = 15
neutron_client = neutron.Neutron(cfg.CONF)
while True:
try:
neutron_routers = neutron_client.get_routers(detailed=False)
resources = []
for router in neutron_routers:
resources.append(
event.Resource(driver=DRIVER_NAME,
id=router.id,
tenant_id=router.tenant_id)
)
return resources
except (q_exceptions.Unauthorized, q_exceptions.Forbidden) as err:
LOG.warning(_LW('PrePopulateWorkers thread failed: %s'), err)
return
except Exception as err:
LOG.warning(
_LW('Could not fetch routers from neutron: %s'), err)
LOG.warning(_LW(
'sleeping %s seconds before retrying'), nap_time)
time.sleep(nap_time)
# FIXME(rods): should we get max_sleep from the config file?
nap_time = min(nap_time * 2, max_sleep)
@staticmethod
def get_resource_id_for_tenant(worker_context, tenant_id, message):
"""Find the id of the router owned by tenant
:param tenant_id: The tenant uuid to search for
:param message: message associated /w request (unused here)
:returns: uuid of the router owned by the tenant
"""
router = worker_context.neutron.get_router_for_tenant(tenant_id)
if not router:
LOG.debug('Router not found for tenant %s.',
tenant_id)
return None
return router.id
@staticmethod
def process_notification(tenant_id, event_type, payload):
"""Process an incoming notification event
This gets called from the notifications layer to determine whether
this driver should process an incoming notification event. It is
responsible for translating an incoming notificatino to an Event
object appropriate for this driver.
:param tenant_id: str The UUID tenant_id for the incoming event
:param event_type: str event type, for example router.create.end
:param payload: The payload body of the incoming event
:returns: A populated Event objet if it should process, or None if not
"""
router_id = payload.get('router', {}).get('id')
crud = event.UPDATE
if event_type.startswith('routerstatus.update'):
# We generate these events ourself, so ignore them.
return
if event_type == 'router.create.end':
crud = event.CREATE
elif event_type == 'router.delete.end':
crud = event.DELETE
router_id = payload.get('router_id')
elif event_type in _ROUTER_INTERFACE_NOTIFICATIONS:
crud = event.UPDATE
router_id = payload.get('router.interface', {}).get('id')
elif event_type in _ROUTER_INTERESTING_NOTIFICATIONS:
crud = event.UPDATE
elif event_type.endswith('.end'):
crud = event.UPDATE
else:
LOG.debug('Not processing event: %s' % event_type)
return
resource = event.Resource(driver=DRIVER_NAME,
id=router_id,
tenant_id=tenant_id)
e = event.Event(
resource=resource,
crud=crud,
body=payload,
)
return e
def get_state(self, worker_context):
self._ensure_cache(worker_context)
if not self._router:
return states.GONE
else:
# NOTE(adam_g): We probably want to map this status back to
# an internal akanda status
return self._router.status
def synchronize_state(self, worker_context, state):
self._ensure_cache(worker_context)
if not self._router:
LOG.debug('Not synchronizing state with missing router %s',
self.id)
return
new_status = STATUS_MAP.get(state)
old_status = self._last_synced_status
if not old_status or old_status != new_status:
LOG.debug('Synchronizing router %s state %s->%s',
self.id, old_status, new_status)
worker_context.neutron.update_router_status(self.id, new_status)
self._last_synced_status = new_status
def get_interfaces(self, management_address):
"""Lists interfaces attached to the resource.
This lists the interfaces attached to the resource from the POV
of the resource iteslf.
:returns: A list of interfaces
"""
return akanda_client.get_interfaces(management_address,
self.mgt_port)
def is_alive(self, management_address):
"""Determines whether the managed resource is alive
:returns: bool True if alive, False if not
"""
return akanda_client.is_alive(management_address, self.mgt_port)

View File

@ -0,0 +1,29 @@
# Copyright (c) 2015 AKANDA, INC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Reference states.
Each driver maps these to which ever neutron or other
services state.
"""
DOWN = 'down'
BOOTING = 'booting'
UP = 'up'
CONFIGURED = 'configured'
RESTART = 'restart'
REPLUG = 'replug'
GONE = 'gone'
ERROR = 'error'
# base list of ready states, driver can use its own list.
READY_STATES = (UP, CONFIGURED)

View File

@ -14,17 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Common event format for events passed within the RUG
"""
import collections
Event = collections.namedtuple(
'Event',
['tenant_id', 'router_id', 'crud', 'body'],
)
# CRUD operations tracked in Event.crud
CREATE = 'create'
READ = 'read'
UPDATE = 'update'
@ -32,3 +22,75 @@ DELETE = 'delete'
POLL = 'poll'
COMMAND = 'command' # an external command to be processed
REBUILD = 'rebuild'
class Event(object):
"""Rug Event object
Events are constructed from incoming messages accepted by the Rug.
They are responsible for holding the message payload (body), the
correpsonding CRUD operation and the logical resource that the
event affects.
"""
def __init__(self, resource, crud, body):
"""
:param resource: Resource instance holding context about the logical
resource that is affected by the Event.
:param crud: CRUD operation that is to be completed by the
correpsonding state machine when it is delivered.
:param body: The original message payload dict.
"""
self.resource = resource
self.crud = crud
self.body = body
def __eq__(self, other):
if not type(self) == type(other):
return False
for k, v in vars(self).iteritems():
if k not in vars(other):
return False
if vars(other)[k] != v:
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return '<%s (resource=%s, crud=%s, body=%s)>' % (
self.__class__.__name__,
self.resource,
self.crud,
self.body)
class Resource(object):
"""Rug Resource object
A Resource object represents one instance of a logical resource
that is to be managed by the rug (ie, a router).
"""
def __init__(self, driver, id, tenant_id):
"""
:param driver: str name of the driver that corresponds to the resource
type.
:param id: ID of the resource (ie, the Neutron router's UUID).
:param tenant_id: The UUID of the tenant that owns this resource.
"""
self.driver = driver
self.id = id
self.tenant_id = tenant_id
def __eq__(self, other):
return type(self) == type(other) and vars(self) == vars(other)
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return '<%s (driver=%s, id=%s, tenant_id=%s)>' % (
self.__class__.__name__,
self.driver,
self.id,
self.tenant_id)

View File

@ -46,9 +46,13 @@ def _health_inspector(scheduler):
while True:
time.sleep(period)
LOG.debug('waking up')
e = event.Event(
r = event.Resource(
id='*',
tenant_id='*',
router_id='*',
driver='*',
)
e = event.Event(
resource=r,
crud=event.POLL,
body={},
)

View File

@ -14,69 +14,69 @@
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from functools import wraps
import time
from oslo_config import cfg
from akanda.rug.api import configuration
from akanda.rug.api import akanda_client as router_api
from akanda.rug.api import neutron
from akanda.rug.common.i18n import _LE, _LI, _LW
DOWN = 'down'
BOOTING = 'booting'
UP = 'up'
CONFIGURED = 'configured'
RESTART = 'restart'
REPLUG = 'replug'
GONE = 'gone'
ERROR = 'error'
STATUS_MAP = {
DOWN: neutron.STATUS_DOWN,
BOOTING: neutron.STATUS_BUILD,
UP: neutron.STATUS_BUILD,
CONFIGURED: neutron.STATUS_ACTIVE,
ERROR: neutron.STATUS_ERROR,
}
from akanda.rug.drivers import states
from akanda.rug.common.i18n import _LE, _LI
CONF = cfg.CONF
INSTANCE_MANAGER_OPTS = [
cfg.IntOpt(
'hotplug_timeout', default=10,
'hotplug_timeout',
default=10,
help='The amount of time to wait for nova to hotplug/unplug '
'networks from the router instances'),
'networks from the instances.'),
cfg.IntOpt(
'boot_timeout', default=600),
cfg.IntOpt(
'error_state_cooldown',
'error_state_coolstates.DOWN',
default=30,
help=('Number of seconds to ignore new events when a router goes '
'into ERROR state'),
help='Number of seconds to ignore new events when an instance goes '
'into states.ERROR state.',
),
]
CONF.register_opts(INSTANCE_MANAGER_OPTS)
def synchronize_router_status(f):
@wraps(f)
def wrapper(self, worker_context, silent=False):
old_status = self._last_synced_status
val = f(self, worker_context, silent)
if not self.router_obj:
return val
new_status = STATUS_MAP.get(self.state, neutron.STATUS_ERROR)
if not old_status or old_status != new_status:
worker_context.neutron.update_router_status(
self.router_obj.id,
new_status
def synchronize_driver_state(f):
"""Wrapper that triggers a driver's synchronize_state function"""
def wrapper(self, *args, **kw):
state = f(self, *args, **kw)
self.driver.synchronize_state(*args, state=state)
return state
return wrapper
def ensure_cache(f):
"""Decorator to wrap around any function that uses self.instance_info.
Insures that self.instance_info is up to date and catches instances in a
GONE or missing state before wasting cycles trying to do something with it.
NOTE: This replaces the old function called _ensure_cache made a Decorator
rather than calling it explicitly at the start of all those functions.
"""
def wrapper(self, worker_context, *args, **kw):
# insure that self.instance_info is current before doing anything.
if not self.instance_info:
# attempt to populate instance_info
self.instance_info = (
worker_context.nova_client.get_instance_info(self.driver.name)
)
self._last_synced_status = new_status
return val
if self.instance_info:
(
self.instance_info.management_port,
self.instance_info.ports
) = worker_context.neutron.get_ports_for_instance(
self.instance_info.id_
)
return f(self, worker_context, *args, **kw)
return wrapper
@ -97,159 +97,172 @@ class BootAttemptCounter(object):
class InstanceManager(object):
def __init__(self, router_id, tenant_id, log, worker_context):
self.router_id = router_id
self.tenant_id = tenant_id
self.log = log
self.state = DOWN
self.router_obj = None
def __init__(self, driver, resource_id, worker_context):
"""The instance manager is your interface to the running instance.
wether it be virtual, container or physical.
Service specific code lives in the driver which is passed in here.
:param driver: driver object
:param resource_id: UUID of logical resource
:param worker_context:
"""
self.driver = driver
self.id = resource_id
self.log = self.driver.log
self.state = states.DOWN
self.instance_info = None
self.last_error = None
self._boot_counter = BootAttemptCounter()
self._last_synced_status = None
self.update_state(worker_context, silent=True)
self.state = self.update_state(worker_context, silent=True)
@property
def attempts(self):
"""Property which returns the boot count.
:returns Int:
"""
return self._boot_counter.count
def reset_boot_counter(self):
"""Resets the boot counter.
:returns None:
"""
self._boot_counter.reset()
@synchronize_router_status
@synchronize_driver_state
def update_state(self, worker_context, silent=False):
"""Updates state of the instance and, by extension, its logical resource
:param worker_context:
:param silent:
:returns: state
"""
self._ensure_cache(worker_context)
if self.state == GONE:
self.log.debug('not updating state of deleted router')
if self.driver.get_state(worker_context) == states.GONE:
self.log.debug('%s driver reported its state is GONE',
self.driver.RESOURCE_NAME)
self.state = states.GONE
return self.state
if self.instance_info is None:
self.log.debug('no backing instance, marking router as down')
self.state = DOWN
self.log.info(_LI('no backing instance, marking as down'))
self.state = states.DOWN
return self.state
addr = self.instance_info.management_address
for i in xrange(cfg.CONF.max_retries):
if router_api.is_alive(addr, cfg.CONF.akanda_mgt_service_port):
if self.state != CONFIGURED:
self.state = UP
if self.driver.is_alive(self.instance_info.management_address):
if self.state != states.CONFIGURED:
self.state = states.UP
break
if not silent:
self.log.debug(
'Alive check failed. Attempt %d of %d',
i,
cfg.CONF.max_retries,
)
self.log.debug('Alive check failed. Attempt %d of %d',
i,
cfg.CONF.max_retries)
time.sleep(cfg.CONF.retry_delay)
else:
old_state = self.state
self._check_boot_timeout()
# If the router isn't responding, make sure Nova knows about it
instance = worker_context.nova_client.get_instance_for_obj(
self.router_id
)
if instance is None and self.state != ERROR:
self.log.info(_LI('No instance was found; rebooting'))
self.state = DOWN
# If the instance isn't responding, make sure Nova knows about it
instance = worker_context.nova_client.get_instance_for_obj(self.id)
if instance is None and self.state != states.ERROR:
self.log.info('No instance was found; rebooting')
self.state = states.DOWN
self.instance_info = None
# update_state() is called from Alive() to check the
# status of the router. If we can't talk to the API at
# that point, the router should be considered missing and
# we should reboot it, so mark it down if we think it was
# we should reboot it, so mark it states.DOWN if we think it was
# configured before.
if old_state == CONFIGURED and self.state != ERROR:
self.log.debug(
'Did not find router alive, marking it as down',
)
self.state = DOWN
if old_state == states.CONFIGURED and self.state != states.ERROR:
self.log.debug('Instance not alive, marking it as DOWN')
self.state = states.DOWN
# After the router is all the way up, record how long it took
# After the instance is all the way up, record how long it took
# to boot and accept a configuration.
if self.instance_info.booting and self.state == CONFIGURED:
# If we didn't boot the server (because we were restarted
if self.instance_info.booting and self.state == states.CONFIGURED:
# If we didn't boot the instance (because we were restarted
# while it remained running, for example), we won't have a
# duration to log.
self.instance_info.confirm_up()
if self.instance_info.boot_duration:
self.log.info(
_LI('Router booted in %s seconds after %s attempts'),
self.instance_info.boot_duration.total_seconds(),
self._boot_counter.count)
self.log.info('%s booted in %s seconds after %s attempts',
self.driver.RESOURCE_NAME,
self.instance_info.boot_duration.total_seconds(),
self._boot_counter.count)
# Always reset the boot counter, even if we didn't boot
# the server ourself, so we don't accidentally think we
# have an erroring router.
self._boot_counter.reset()
return self.state
def boot(self, worker_context, router_image_uuid):
self._ensure_cache(worker_context)
if self.state == GONE:
self.log.info(_LI('Not booting deleted router'))
return
def boot(self, worker_context):
"""Boots the instance with driver pre/post boot hooks.
self.log.info(_LI('Booting router'))
self.state = DOWN
:returns: None
"""
self._ensure_cache(worker_context)
self.log.info('Booting %s' % self.driver.RESOURCE_NAME)
self.state = states.DOWN
self._boot_counter.start()
def make_vrrp_ports():
mgt_port = worker_context.neutron.create_management_port(
self.router_obj.id
)
# FIXME(mark): ideally this should be ordered and de-duped
instance_ports = [
worker_context.neutron.create_vrrp_port(self.router_obj.id, n)
for n in (p.network_id for p in self.router_obj.ports)
]
return mgt_port, instance_ports
# driver preboot hook
self.driver.pre_boot(worker_context)
# try to boot the instance
try:
# TODO(mark): make this pluggable
self._ensure_provider_ports(self.router_obj, worker_context)
# TODO(mark): make this handle errors more gracefully on cb fail
# TODO(mark): checkout from a pool - boot on demand for now
instance_info = worker_context.nova_client.boot_instance(
self.instance_info,
self.router_obj.id,
router_image_uuid,
make_vrrp_ports
self.driver.name,
self.driver.image_uuid,
self.driver.flavor,
self.driver.make_ports(worker_context)
)
if not instance_info:
self.log.info(_LI('Previous router is deleting'))
# Reset the VM manager, causing the state machine to start
# again with a new VM.
self.log.info(_LI('Previous instance is still deleting'))
# Reset the boot counter, causing the state machine to start
# again with a new Instance.
self.reset_boot_counter()
self.instance_info = None
return
except:
self.log.exception(_LE('Router failed to start boot'))
# TODO(mark): attempt clean-up of failed ports
self.log.exception(_LE('Instance failed to start boot'))
return
else:
# We have successfully started a (re)boot attempt so
# record the timestamp so we can report how long it takes.
self.state = BOOTING
self.state = states.BOOTING
self.instance_info = instance_info
def check_boot(self, worker_context):
ready_states = (UP, CONFIGURED)
if self.update_state(worker_context, silent=True) in ready_states:
self.log.info(_LI('Router has booted, attempting initial config'))
self.configure(worker_context, BOOTING, attempts=1)
if self.state != CONFIGURED:
self._check_boot_timeout()
return self.state == CONFIGURED
# driver post boot hook
self.driver.post_boot(worker_context)
self.log.debug('Router is %s', self.state.upper())
def check_boot(self, worker_context):
"""Checks status of instance, if ready triggers self.configure
"""
state = self.update_state(worker_context, silent=True)
if state in states.READY_STATES:
self.log.info('Instance has booted, attempting initial config')
self.configure(worker_context, states.BOOTING, attempts=1)
if self.state != states.CONFIGURED:
self._check_boot_timeout()
return self.state == states.CONFIGURED
self.log.debug('Instance is %s' % self.state.upper())
return False
@synchronize_router_status
@synchronize_driver_state
def set_error(self, worker_context, silent=False):
"""Set the internal and neutron status for the router to ERROR.
"""Set the internal and neutron status for the router to states.ERROR.
This is called from outside when something notices the router
is "broken". We don't use it internally because this class is
@ -257,14 +270,11 @@ class InstanceManager(object):
whether or not the router is fatally broken.
"""
self._ensure_cache(worker_context)
if self.state == GONE:
self.log.debug('not updating state of deleted router')
return self.state
self.state = ERROR
self.state = states.ERROR
self.last_error = datetime.utcnow()
return self.state
@synchronize_router_status
@synchronize_driver_state
def clear_error(self, worker_context, silent=False):
"""Clear the internal error state.
@ -276,16 +286,14 @@ class InstanceManager(object):
# Clear the boot counter.
self._boot_counter.reset()
self._ensure_cache(worker_context)
if self.state == GONE:
self.log.debug('not updating state of deleted router')
return self.state
self.state = DOWN
self.state = states.DOWN
return self.state
@property
def error_cooldown(self):
# Returns True if the router was recently set to ERROR state.
if self.last_error and self.state == ERROR:
"""Returns True if the instance was recently set to states.ERROR state.
"""
if self.last_error and self.state == states.ERROR:
seconds_since_error = (
datetime.utcnow() - self.last_error
).total_seconds()
@ -293,58 +301,63 @@ class InstanceManager(object):
return True
return False
@synchronize_driver_state
def stop(self, worker_context):
"""Attempts to destroy the instance with configured timeout.
:param worker_context:
:returns:
"""
self._ensure_cache(worker_context)
if self.state == GONE:
self.log.info(_LI('Destroying router neutron has deleted'))
else:
self.log.info(_LI('Destroying router'))
self.log.info(_LI('Destroying instance'))
if not self.instance_info:
self.log.info(_LI('Instance already destroyed.'))
return
try:
nova_client = worker_context.nova_client
nova_client.destroy_instance(self.instance_info)
worker_context.nova_client.destroy_instance(self.instance_info)
except Exception:
self.log.exception(_LE('Error deleting router instance'))
start = time.time()
while time.time() - start < cfg.CONF.boot_timeout:
if not nova_client.get_instance_by_id(self.instance_info.id_):
if self.state != GONE:
self.state = DOWN
return
if not worker_context.nova_client.\
get_instance_by_id(self.instance_info.id_):
if self.state != states.GONE:
self.state = states.DOWN
return self.state
self.log.debug('Router has not finished stopping')
time.sleep(cfg.CONF.retry_delay)
self.log.error(_LE(
'Router failed to stop within %d secs'),
cfg.CONF.boot_timeout)
def configure(self, worker_context, failure_state=RESTART, attempts=None):
self.log.debug('Begin router config')
self.state = UP
def configure(self, worker_context,
failure_state=states.RESTART, attempts=None):
"""Pushes config to instance
:param worker_context:
:param failure_state:
:param attempts:
:returns:
"""
self.log.debug('Begin instance config')
self.state = states.UP
attempts = attempts or cfg.CONF.max_retries
# FIXME: This might raise an error, which doesn't mean the
# *router* is broken, but does mean we can't update it.
# Change the exception to something the caller can catch
# safely.
self._ensure_cache(worker_context)
if self.state == GONE:
if self.driver.get_state(worker_context) == states.GONE:
return
# FIXME: This should raise an explicit exception so the caller
interfaces = self.driver.get_interfaces(
self.instance_info.management_address)
# knows that we could not talk to the router (versus the issue
# above).
interfaces = router_api.get_interfaces(
self.instance_info.management_address,
cfg.CONF.akanda_mgt_service_port
)
if not self._verify_interfaces(self.router_obj, interfaces):
# FIXME: Need a REPLUG state when we support hot-plugging
if not self._verify_interfaces(self.driver.ports, interfaces):
# FIXME: Need a states.REPLUG state when we support hot-plugging
# interfaces.
self.log.debug("Interfaces aren't plugged as expected.")
self.state = REPLUG
self.state = states.REPLUG
return
# TODO(mark): We're in the first phase of VRRP, so we need
@ -357,17 +370,15 @@ class InstanceManager(object):
# Add in the management port
mgt_port = self.instance_info.management_port
port_mac_to_net[mgt_port.mac_address] = mgt_port.network_id
# this is a network to logical interface id
iface_map = {
port_mac_to_net[i['lladdr']]: i['ifname']
for i in interfaces if i['lladdr'] in port_mac_to_net
}
# FIXME: Need to catch errors talking to neutron here.
config = configuration.build_config(
worker_context.neutron,
self.router_obj,
# sending all the standard config over to the driver for final updates
config = self.driver.build_config(
worker_context,
mgt_port,
iface_map
)
@ -375,15 +386,13 @@ class InstanceManager(object):
for i in xrange(attempts):
try:
router_api.update_config(
self.driver.update_config(
self.instance_info.management_address,
cfg.CONF.akanda_mgt_service_port,
config
)
config)
except Exception:
if i == attempts - 1:
# Only log the traceback if we encounter it many times.
self.log.exception(_LE('Failed to update config'))
self.log.exception(_LE('failed to update config'))
else:
self.log.debug(
'failed to update config, attempt %d',
@ -391,22 +400,25 @@ class InstanceManager(object):
)
time.sleep(cfg.CONF.retry_delay)
else:
self.state = CONFIGURED
self.log.info(_LI('Router config updated'))
self.state = states.CONFIGURED
self.log.info('Instance config updated')
return
else:
# FIXME: We failed to configure the router too many times,
# so restart it.
self.state = failure_state
def replug(self, worker_context):
self.log.debug('Attempting to replug...')
self._ensure_provider_ports(self.router_obj, worker_context)
"""Attempts to replug the network ports for an instance.
:param worker_context:
:returns:
"""
self.log.debug('Attempting to replug...')
self.driver.pre_plug(worker_context)
interfaces = self.driver.get_interfaces(
self.instance_info.management_address)
interfaces = router_api.get_interfaces(
self.instance_info.management_address,
cfg.CONF.akanda_mgt_service_port
)
actual_macs = set((iface['lladdr'] for iface in interfaces))
instance_macs = set(p.mac_address for p in self.instance_info.ports)
instance_macs.add(self.instance_info.management_port.mac_address)
@ -414,17 +426,17 @@ class InstanceManager(object):
if instance_macs != actual_macs:
# our cached copy of the ports is wrong reboot and clean up
self.log.warning(
_LW('Instance macs(%s) do not match actual macs (%s). '
'Instance cache appears out-of-sync'),
('Instance macs(%s) do not match actual macs (%s). Instance '
'cache appears out-of-sync'),
instance_macs, actual_macs
)
self.state = RESTART
self.state = states.RESTART
return
instance_ports = {p.network_id: p for p in self.instance_info.ports}
instance_networks = set(instance_ports.keys())
logical_networks = set(p.network_id for p in self.router_obj.ports)
logical_networks = set(p.network_id for p in self.driver.ports)
if logical_networks != instance_networks:
instance = worker_context.nova_client.get_instance_by_id(
@ -434,7 +446,7 @@ class InstanceManager(object):
# For each port that doesn't have a mac address on the instance...
for network_id in logical_networks - instance_networks:
port = worker_context.neutron.create_vrrp_port(
self.router_obj.id,
self.driver.id,
network_id
)
self.log.debug(
@ -445,8 +457,8 @@ class InstanceManager(object):
try:
instance.interface_attach(port.id, None, None)
except:
self.log.exception(_LE('Interface attach failed'))
self.state = RESTART
self.log.exception('Interface attach failed')
self.state = states.RESTART
return
self.instance_info.ports.append(port)
@ -460,8 +472,8 @@ class InstanceManager(object):
try:
instance.interface_detach(port.id)
except:
self.log.exception(_LE('Interface detach failed'))
self.state = RESTART
self.log.exception('Interface detach failed')
self.state = states.RESTART
return
self.instance_info.ports.remove(port)
@ -475,36 +487,24 @@ class InstanceManager(object):
self.log.debug(
"Waiting for interface attachments to take effect..."
)
interfaces = router_api.get_interfaces(
self.instance_info.management_address,
cfg.CONF.akanda_mgt_service_port
)
if self._verify_interfaces(self.router_obj, interfaces):
interfaces = self.driver.get_interfaces(
self.instance_info.management_address)
if self._verify_interfaces(self.driver.ports, interfaces):
# replugging was successful
# TODO(mark) update port states
return
time.sleep(1)
replug_seconds -= 1
self.log.debug("Interfaces aren't plugged as expected, rebooting.")
self.state = RESTART
self.state = states.RESTART
def _ensure_cache(self, worker_context):
try:
self.router_obj = worker_context.neutron.get_router_detail(
self.router_id
)
except neutron.RouterGone:
# The router has been deleted, set our state accordingly
# and return without doing any more work.
self.state = GONE
self.router_obj = None
if not self.instance_info:
self.instance_info = (
worker_context.nova_client.get_instance_info_for_obj(
self.router_id
)
worker_context.nova_client.get_instance_info(self.driver.name)
)
if self.instance_info:
@ -516,6 +516,9 @@ class InstanceManager(object):
)
def _check_boot_timeout(self):
"""If the instance was created more than `boot_timeout` seconds
ago, log an error and set the state set to states.DOWN
"""
time_since_boot = self.instance_info.time_since_boot
if time_since_boot:
@ -524,31 +527,32 @@ class InstanceManager(object):
# condition already. The state will be reset when
# the router starts responding again, or when the
# error is cleared from a forced rebuild.
if self.state != ERROR:
self.state = BOOTING
if self.state != states.ERROR:
self.state = states.BOOTING
else:
# If the instance was created more than `boot_timeout` seconds
# ago, log an error and set the state set to DOWN
# ago, log an error and set the state set to states.DOWN
self.log.info(
_LI('Router is DOWN. Created over %d secs ago.'),
'Router is states.DOWN. Created over %d secs ago.',
cfg.CONF.boot_timeout)
# Do not reset the state if we have an error condition
# already. The state will be reset when the router starts
# responding again, or when the error is cleared from a
# forced rebuild.
if self.state != ERROR:
self.state = DOWN
def _verify_interfaces(self, logical_config, interfaces):
router_macs = set((iface['lladdr'] for iface in interfaces))
self.log.debug('MACs found: %s', ', '.join(sorted(router_macs)))
if self.state != states.ERROR:
self.state = states.DOWN
def _verify_interfaces(self, ports, interfaces):
"""Verifies the network interfaces are what they should be.
"""
actual_macs = set((iface['lladdr'] for iface in interfaces))
self.log.debug('MACs found: %s', ', '.join(sorted(actual_macs)))
if not all(
getattr(p, 'mac_address', None) for p in logical_config.ports
getattr(p, 'mac_address', None) for p in ports
):
return False
num_logical_ports = len(list(logical_config.ports))
num_logical_ports = len(list(ports))
num_instance_ports = len(list(self.instance_info.ports))
if num_logical_ports != num_instance_ports:
return False
@ -558,15 +562,4 @@ class InstanceManager(object):
expected_macs.add(self.instance_info.management_port.mac_address)
self.log.debug('MACs expected: %s', ', '.join(sorted(expected_macs)))
return router_macs == expected_macs
def _ensure_provider_ports(self, router, worker_context):
if router.external_port is None:
# FIXME: Need to do some work to pick the right external
# network for a tenant.
self.log.debug('Adding external port to router')
ext_port = worker_context.neutron.create_router_external_port(
router
)
router.external_port = ext_port
return router
return actual_macs == expected_macs

View File

@ -105,7 +105,6 @@ def main(argv=sys.argv[1:]):
# description
# Change the process and thread name so the logs are cleaner.
p = multiprocessing.current_process()
p.name = 'pmain'
t = threading.current_thread()

View File

@ -22,6 +22,7 @@ import Queue
import threading
from akanda.rug import commands
from akanda.rug import drivers
from akanda.rug import event
from akanda.rug.common import rpc
@ -29,7 +30,7 @@ from oslo_config import cfg
from oslo_context import context
from oslo_log import log as logging
from akanda.rug.common.i18n import _LE, _LW
from akanda.rug.common.i18n import _LE
cfg.CONF.register_group(cfg.OptGroup(name='rabbit',
@ -93,12 +94,12 @@ def _get_tenant_id_for_message(context, payload=None):
return None
_INTERFACE_NOTIFICATIONS = set([
_ROUTER_INTERFACE_NOTIFICATIONS = set([
'router.interface.create',
'router.interface.delete',
])
_INTERESTING_NOTIFICATIONS = set([
_ROUTER_INTERESTING_NOTIFICATIONS = set([
'subnet.create.end',
'subnet.change.end',
'subnet.delete.end',
@ -111,21 +112,6 @@ _INTERESTING_NOTIFICATIONS = set([
L3_AGENT_TOPIC = 'l3_agent'
def _handle_connection_error(exception, interval):
""" Log connection retry attempts."""
LOG.warning(_LW("Error establishing connection: %s"), exception)
LOG.warning(_LW("Retrying in %d seconds"), interval)
def _kombu_configuration(conf):
"""Return a dict of kombu connection parameters from oslo.config."""
cfg_keys = ('max_retries',
'interval_start',
'interval_step',
'interval_max')
return {k: getattr(conf.CONF.rabbit, k) for k in cfg_keys}
class L3RPCEndpoint(object):
"""A RPC endpoint for servicing L3 Agent RPC requests"""
def __init__(self, notification_queue):
@ -133,9 +119,12 @@ class L3RPCEndpoint(object):
def router_deleted(self, ctxt, router_id):
tenant_id = _get_tenant_id_for_message(ctxt)
resource = event.Resource('router', router_id, tenant_id)
crud = event.DELETE
e = event.Event(tenant_id, router_id, crud, None)
self.notification_queue.put((e.tenant_id, e))
e = event.Event(resource, crud, None)
self.notification_queue.put((e.resource.tenant_id, e))
class NotificationsEndpoint(object):
@ -144,45 +133,46 @@ class NotificationsEndpoint(object):
self.notification_queue = notification_queue
def info(self, ctxt, publisher_id, event_type, payload, metadata):
# Router id is not always present, but look for it as though
# it is to avoid duplicating this line a few times.
router_id = payload.get('router', {}).get('id')
tenant_id = _get_tenant_id_for_message(ctxt, payload)
crud = event.UPDATE
if event_type.startswith('routerstatus.update'):
# We generate these events ourself, so ignore them.
return None
if event_type == 'router.create.end':
crud = event.CREATE
elif event_type == 'router.delete.end':
crud = event.DELETE
router_id = payload.get('router_id')
elif event_type in _INTERFACE_NOTIFICATIONS:
crud = event.UPDATE
router_id = payload.get('router.interface', {}).get('id')
elif event_type in _INTERESTING_NOTIFICATIONS:
crud = event.UPDATE
elif event_type.endswith('.end'):
crud = event.UPDATE
elif event_type.startswith('akanda.rug.command'):
e = None
events = []
if event_type.startswith('akanda.rug.command'):
LOG.debug('received a command: %r', payload)
# If the message does not specify a tenant, send it to everyone
tenant_id = payload.get('tenant_id', '*')
router_id = payload.get('router_id')
crud = event.COMMAND
if payload.get('command') == commands.POLL:
e = event.Event(
tenant_id='*',
router_id='*',
resource='*',
crud=event.POLL,
body={})
self.notification_queue.put((e.tenant_id, e))
self.notification_queue.put(('*', e))
return
else:
# If the message does not specify a tenant, send it to everyone
tenant_id = payload.get('tenant_id', '*')
router_id = payload.get('router_id')
resource = event.Resource(
driver='*',
id=router_id,
tenant_id=tenant_id)
events.append(event.Event(resource, crud, payload))
else:
for driver in drivers.enabled_drivers():
driver_event = driver.process_notification(
tenant_id, event_type, payload)
if driver_event:
events.append(driver_event)
if not events:
LOG.debug('Could not construct any events from %s /w payload: %s',
event_type, payload)
return
e = event.Event(tenant_id, router_id, crud, payload)
self.notification_queue.put((e.tenant_id, e))
LOG.debug('Generated %s events from %s /w payload: %s',
len(events), event_type, payload)
for e in events:
self.notification_queue.put((e.resource.tenant_id, e))
def listen(notification_queue):

View File

@ -19,61 +19,40 @@
"""
import threading
import time
from oslo_config import cfg
from oslo_log import log as logging
from neutronclient.common import exceptions as q_exceptions
from akanda.rug import event
from akanda.rug.api import neutron
from akanda.rug.common.i18n import _LW
from akanda.rug import drivers
LOG = logging.getLogger(__name__)
def _pre_populate_workers(scheduler):
"""Fetch the existing routers from neutron.
Wait for neutron to return the list of the existing routers.
Pause up to max_sleep seconds between each attempt and ignore
neutron client exceptions.
"""Loops through enabled drivers triggering each drivers pre_populate_hook
which is a static method for each driver.
"""
nap_time = 1
max_sleep = 15
for driver in drivers.enabled_drivers():
resources = driver.pre_populate_hook()
neutron_client = neutron.Neutron(cfg.CONF)
if not resources:
# just skip to the next one the drivers pre_populate_hook already
# handled the exception or error and outputs to logs
LOG.debug('No %s resources found to pre-populate', driver)
continue
while True:
try:
neutron_routers = neutron_client.get_routers(detailed=False)
break
except (q_exceptions.Unauthorized, q_exceptions.Forbidden) as err:
LOG.warning(_LW('PrePopulateWorkers thread failed: %s'), err)
return
except Exception as err:
LOG.warning(
_LW('Could not fetch routers from neutron: %s'), err)
LOG.warning(_LW('sleeping %s seconds before retrying'), nap_time)
time.sleep(nap_time)
# FIXME(rods): should we get max_sleep from the config file?
nap_time = min(nap_time * 2, max_sleep)
LOG.debug('Start pre-populating %d workers for the %s driver',
len(resources),
driver.RESOURCE_NAME)
LOG.debug('Start pre-populating the workers with %d fetched routers',
len(neutron_routers))
for router in neutron_routers:
message = event.Event(
tenant_id=router.tenant_id,
router_id=router.id,
crud=event.POLL,
body={}
)
scheduler.handle_message(router.tenant_id, message)
for resource in resources:
message = event.Event(
resource=resource,
crud=event.POLL,
body={}
)
scheduler.handle_message(resource.tenant_id, message)
def pre_populate_workers(scheduler):

View File

@ -97,7 +97,7 @@ class Dispatcher(object):
class Scheduler(object):
"""Managers a worker pool and redistributes messages.
"""Manages a worker pool and redistributes messages.
"""
def __init__(self, worker_factory):

View File

@ -20,28 +20,27 @@
"""
# See state machine diagram and description:
# https://docs.google.com/a/dreamhost.com/document/d/1Ed5wDqCHW-CUt67ufjOUq4uYj0ECS5PweHxoueUoYUI/edit # noqa
# http://akanda.readthedocs.org/en/latest/rug.html#state-machine-workers-and-router-lifecycle
import collections
import itertools
from oslo_config import cfg
from oslo_log import log as logging
from akanda.rug.common.i18n import _LE, _LI, _LW
from akanda.rug.event import POLL, CREATE, READ, UPDATE, DELETE, REBUILD
from akanda.rug import instance_manager
from akanda.rug.drivers import states
class StateParams(object):
def __init__(self, instance, log, queue, bandwidth_callback,
reboot_error_threshold, router_image_uuid):
def __init__(self, driver, instance, queue, bandwidth_callback,
reboot_error_threshold):
self.driver = driver
self.instance = instance
self.log = log
self.log = driver.log
self.queue = queue
self.bandwidth_callback = bandwidth_callback
self.reboot_error_threshold = reboot_error_threshold
self.router_image_uuid = router_image_uuid
self.image_uuid = driver.image_uuid
class State(object):
@ -62,8 +61,8 @@ class State(object):
return self.params.instance
@property
def router_image_uuid(self):
return self.params.router_image_uuid
def image_uuid(self):
return self.params.image_uuid
@property
def name(self):
@ -83,11 +82,11 @@ class CalcAction(State):
def execute(self, action, worker_context):
queue = self.queue
if DELETE in queue:
self.log.debug('shortcutting to delete')
self.params.driver.log.debug('shortcutting to delete')
return DELETE
while queue:
self.log.debug(
self.params.driver.log.debug(
'action = %s, len(queue) = %s, queue = %s',
action,
len(queue),
@ -97,21 +96,22 @@ class CalcAction(State):
if action == UPDATE and queue[0] == CREATE:
# upgrade to CREATE from UPDATE by taking the next
# item from the queue
self.log.debug('upgrading from update to create')
self.params.driver.log.debug('upgrading from update to create')
action = queue.popleft()
continue
elif action in (CREATE, UPDATE) and queue[0] == REBUILD:
# upgrade to REBUILD from CREATE/UPDATE by taking the next
# item from the queue
self.log.debug('upgrading from %s to rebuild', action)
self.params.driver.log.debug('upgrading from %s to rebuild',
action)
action = queue.popleft()
continue
elif action == CREATE and queue[0] == UPDATE:
# CREATE implies an UPDATE so eat the update event
# without changing the action
self.log.debug('merging create and update')
self.params.driver.log.debug('merging create and update')
queue.popleft()
continue
@ -119,8 +119,9 @@ class CalcAction(State):
# Throw away a poll following any other valid action,
# because a create or update will automatically handle
# the poll and repeated polls are not needed.
self.log.debug('discarding poll event following action %s',
action)
self.params.driver.log.debug('discarding poll event following '
'action %s',
action)
queue.popleft()
continue
@ -128,28 +129,28 @@ class CalcAction(State):
# We are not polling and the next action is something
# different from what we are doing, so just do the
# current action.
self.log.debug('done collapsing events')
self.params.driver.log.debug('done collapsing events')
break
self.log.debug('popping action from queue')
self.params.driver.log.debug('popping action from queue')
action = queue.popleft()
return action
def transition(self, action, worker_context):
if self.instance.state == instance_manager.GONE:
if self.instance.state == states.GONE:
next_action = StopInstance(self.params)
elif action == DELETE:
next_action = StopInstance(self.params)
elif action == REBUILD:
next_action = RebuildInstance(self.params)
elif self.instance.state == instance_manager.BOOTING:
elif self.instance.state == states.BOOTING:
next_action = CheckBoot(self.params)
elif self.instance.state == instance_manager.DOWN:
elif self.instance.state == states.DOWN:
next_action = CreateInstance(self.params)
else:
next_action = Alive(self.params)
if self.instance.state == instance_manager.ERROR:
if self.instance.state == states.ERROR:
if action == POLL:
# If the selected action is to poll, and we are in an
# error state, then an event slipped through the
@ -157,8 +158,10 @@ class CalcAction(State):
# here.
next_action = self
elif self.instance.error_cooldown:
self.log.debug('Router is in ERROR cooldown, ignoring '
'event.')
self.params.driver.log.debug(
'Resource is in ERROR cooldown, '
'ignoring event.'
)
next_action = self
else:
# If this isn't a POLL, and the configured `error_cooldown`
@ -206,15 +209,15 @@ class Alive(State):
return action
def transition(self, action, worker_context):
if self.instance.state == instance_manager.GONE:
if self.instance.state == states.GONE:
return StopInstance(self.params)
elif self.instance.state == instance_manager.DOWN:
elif self.instance.state == states.DOWN:
return CreateInstance(self.params)
elif action == POLL and \
self.instance.state == instance_manager.CONFIGURED:
self.instance.state == states.CONFIGURED:
return CalcAction(self.params)
elif action == READ and \
self.instance.state == instance_manager.CONFIGURED:
self.instance.state == states.CONFIGURED:
return ReadStats(self.params)
else:
return ConfigureInstance(self.params)
@ -222,25 +225,26 @@ class Alive(State):
class CreateInstance(State):
def execute(self, action, worker_context):
# Check for a loop where the router keeps failing to boot or
# Check for a loop where the resource keeps failing to boot or
# accept the configuration.
if self.instance.attempts >= self.params.reboot_error_threshold:
self.log.info(_LI('Dropping out of boot loop after %s trials'),
self.instance.attempts)
self.params.driver.log.info(_LI('Dropping out of boot loop after '
' %s trials'),
self.instance.attempts)
self.instance.set_error(worker_context)
return action
self.instance.boot(worker_context, self.router_image_uuid)
self.log.debug('CreateInstance attempt %s/%s',
self.instance.attempts,
self.params.reboot_error_threshold)
self.instance.boot(worker_context)
self.params.driver.log.debug('CreateInstance attempt %s/%s',
self.instance.attempts,
self.params.reboot_error_threshold)
return action
def transition(self, action, worker_context):
if self.instance.state == instance_manager.GONE:
if self.instance.state == states.GONE:
return StopInstance(self.params)
elif self.instance.state == instance_manager.ERROR:
elif self.instance.state == states.ERROR:
return CalcAction(self.params)
elif self.instance.state == instance_manager.DOWN:
elif self.instance.state == states.DOWN:
return CreateInstance(self.params)
return CheckBoot(self.params)
@ -251,18 +255,18 @@ class CheckBoot(State):
# Put the action back on the front of the queue so that we can yield
# and handle it in another state machine traversal (which will proceed
# from CalcAction directly to CheckBoot).
if self.instance.state not in (instance_manager.DOWN,
instance_manager.GONE):
if self.instance.state not in (states.DOWN,
states.GONE):
self.queue.appendleft(action)
return action
def transition(self, action, worker_context):
if self.instance.state == instance_manager.REPLUG:
if self.instance.state == states.REPLUG:
return ReplugInstance(self.params)
if self.instance.state in (instance_manager.DOWN,
instance_manager.GONE):
if self.instance.state in (states.DOWN,
states.GONE):
return StopInstance(self.params)
if self.instance.state == instance_manager.UP:
if self.instance.state == states.UP:
return ConfigureInstance(self.params)
return CalcAction(self.params)
@ -273,7 +277,7 @@ class ReplugInstance(State):
return action
def transition(self, action, worker_context):
if self.instance.state == instance_manager.RESTART:
if self.instance.state == states.RESTART:
return StopInstance(self.params)
return ConfigureInstance(self.params)
@ -281,17 +285,17 @@ class ReplugInstance(State):
class StopInstance(State):
def execute(self, action, worker_context):
self.instance.stop(worker_context)
if self.instance.state == instance_manager.GONE:
if self.instance.state == states.GONE:
# Force the action to delete since the router isn't there
# any more.
return DELETE
return action
def transition(self, action, worker_context):
if self.instance.state not in (instance_manager.DOWN,
instance_manager.GONE):
if self.instance.state not in (states.DOWN,
states.GONE):
return self
if self.instance.state == instance_manager.GONE:
if self.instance.state == states.GONE:
return Exit(self.params)
if action == DELETE:
return Exit(self.params)
@ -301,7 +305,7 @@ class StopInstance(State):
class RebuildInstance(State):
def execute(self, action, worker_context):
self.instance.stop(worker_context)
if self.instance.state == instance_manager.GONE:
if self.instance.state == states.GONE:
# Force the action to delete since the router isn't there
# any more.
return DELETE
@ -310,10 +314,10 @@ class RebuildInstance(State):
return CREATE
def transition(self, action, worker_context):
if self.instance.state not in (instance_manager.DOWN,
instance_manager.GONE):
if self.instance.state not in (states.DOWN,
states.GONE):
return self
if self.instance.state == instance_manager.GONE:
if self.instance.state == states.GONE:
return Exit(self.params)
return CreateInstance(self.params)
@ -325,7 +329,7 @@ class Exit(State):
class ConfigureInstance(State):
def execute(self, action, worker_context):
self.instance.configure(worker_context)
if self.instance.state == instance_manager.CONFIGURED:
if self.instance.state == states.CONFIGURED:
if action == READ:
return READ
else:
@ -334,15 +338,15 @@ class ConfigureInstance(State):
return action
def transition(self, action, worker_context):
if self.instance.state == instance_manager.REPLUG:
if self.instance.state == states.REPLUG:
return ReplugInstance(self.params)
if self.instance.state in (instance_manager.RESTART,
instance_manager.DOWN,
instance_manager.GONE):
if self.instance.state in (states.RESTART,
states.DOWN,
states.GONE):
return StopInstance(self.params)
if self.instance.state == instance_manager.UP:
if self.instance.state == states.UP:
return PushUpdate(self.params)
# Below here, assume instance.state == instance_manager.CONFIGURED
# Below here, assume instance.state == states.CONFIGURED
if action == READ:
return ReadStats(self.params)
return CalcAction(self.params)
@ -359,13 +363,14 @@ class ReadStats(State):
class Automaton(object):
def __init__(self, router_id, tenant_id,
def __init__(self, driver, resource_id, tenant_id,
delete_callback, bandwidth_callback,
worker_context, queue_warning_threshold,
reboot_error_threshold):
"""
:param router_id: UUID of the router being managed
:type router_id: str
:param driver: An instantiated driver object for the managed resource
:param resource_id: UUID of the resource being managed
:type resource_id: str
:param tenant_id: UUID of the tenant being managed
:type tenant_id: str
:param delete_callback: Invoked when the Automaton decides
@ -384,7 +389,8 @@ class Automaton(object):
the router puts it into an error state.
:type reboot_error_threshold: int
"""
self.router_id = router_id
self.driver = driver
self.resource_id = resource_id
self.tenant_id = tenant_id
self._delete_callback = delete_callback
self._queue_warning_threshold = queue_warning_threshold
@ -392,20 +398,17 @@ class Automaton(object):
self.deleted = False
self.bandwidth_callback = bandwidth_callback
self._queue = collections.deque()
self.log = logging.getLogger(__name__ + '.' + router_id)
self.action = POLL
self.instance = instance_manager.InstanceManager(router_id,
tenant_id,
self.log,
self.instance = instance_manager.InstanceManager(self.driver,
self.resource_id,
worker_context)
self._state_params = StateParams(
self.driver,
self.instance,
self.log,
self._queue,
self.bandwidth_callback,
self._reboot_error_threshold,
cfg.CONF.router_image_uuid
)
self.state = CalcAction(self._state_params)
@ -414,7 +417,7 @@ class Automaton(object):
def _do_delete(self):
if self._delete_callback is not None:
self.log.debug('calling delete callback')
self.driver.log.debug('calling delete callback')
self._delete_callback()
# Avoid calling the delete callback more than once.
self._delete_callback = None
@ -426,26 +429,26 @@ class Automaton(object):
while self._queue:
while True:
if self.deleted:
self.log.debug(
self.driver.log.debug(
'skipping update because the router is being deleted'
)
return
try:
self.log.debug('%s.execute(%s) instance.state=%s',
self.state,
self.action,
self.instance.state)
self.driver.log.debug('%s.execute(%s) instance.state=%s',
self.state,
self.action,
self.instance.state)
self.action = self.state.execute(
self.action,
worker_context,
)
self.log.debug('%s.execute -> %s instance.state=%s',
self.state,
self.action,
self.instance.state)
self.driver.log.debug('%s.execute -> %s instance.state=%s',
self.state,
self.action,
self.instance.state)
except:
self.log.exception(
self.driver.log.exception(
_LE('%s.execute() failed for action: %s'),
self.state,
self.action
@ -456,9 +459,13 @@ class Automaton(object):
self.action,
worker_context,
)
self.log.debug('%s.transition(%s) -> %s instance.state=%s',
old_state, self.action, self.state,
self.instance.state)
self.driver.log.debug(
'%s.transition(%s) -> %s instance.state=%s',
old_state,
self.action,
self.state,
self.instance.state
)
# Yield control each time we stop to figure out what
# to do next.
@ -475,7 +482,7 @@ class Automaton(object):
"Called when the worker put a message in the state machine queue"
if self.deleted:
# Ignore any more incoming messages
self.log.debug(
self.driver.log.debug(
'deleted state machine, ignoring incoming message %s',
message)
return False
@ -487,43 +494,43 @@ class Automaton(object):
# process something on a router that isn't going to actually
# do any work.
if message.crud == POLL and \
self.instance.state == instance_manager.ERROR:
self.log.info(_LI(
'Router status is ERROR, ignoring POLL message: %s'),
self.instance.state == states.ERROR:
self.driver.log.info(_LI(
'Resource status is ERROR, ignoring POLL message: %s'),
message,
)
return False
if message.crud == REBUILD:
if message.body.get('router_image_uuid'):
self.log.info(_LI(
'Router is being REBUILT with custom image %s'),
message.body['router_image_uuid']
if message.body.get('image_uuid'):
self.driver.log.info(_LI(
'Resource is being REBUILT with custom image %s'),
message.body['image_uuid']
)
self.router_image_uuid = message.body['router_image_uuid']
self.image_uuid = message.body['image_uuid']
else:
self.router_image_uuid = cfg.CONF.router_image_uuid
self.image_uuid = self.driver.image_uuid
self._queue.append(message.crud)
queue_len = len(self._queue)
if queue_len > self._queue_warning_threshold:
logger = self.log.warning
logger = self.driver.log.warning
else:
logger = self.log.debug
logger = self.driver.log.debug
logger(_LW('incoming message brings queue length to %s'), queue_len)
return True
@property
def router_image_uuid(self):
return self.state.params.router_image_uuid
def image_uuid(self):
return self.state.params.image_uuid
@router_image_uuid.setter
def router_image_uuid(self, value):
self.state.params.router_image_uuid = value
@image_uuid.setter
def image_uuid(self, value):
self.state.params.image_uuid = value
def has_more_work(self):
"Called to check if there are more messages in the state machine queue"
return (not self.deleted) and bool(self._queue)
def has_error(self):
return self.instance.state == instance_manager.ERROR
return self.instance.state == states.ERROR

View File

@ -15,7 +15,7 @@
# under the License.
"""Manage the routers for a given tenant.
"""Manage the resources for a given tenant.
"""
import collections
@ -25,6 +25,7 @@ from oslo_log import log as logging
from akanda.rug.common.i18n import _LE
from akanda.rug import state
from akanda.rug import drivers
from akanda.rug.openstack.common import timeutils
@ -35,7 +36,7 @@ class InvalidIncomingMessage(Exception):
pass
class RouterContainer(object):
class ResourceContainer(object):
def __init__(self):
self.state_machines = {}
@ -58,14 +59,14 @@ class RouterContainer(object):
with self.lock:
return list(self.state_machines.values())
def has_been_deleted(self, router_id):
"""Check if a router has been deleted.
def has_been_deleted(self, resource_id):
"""Check if a resource has been deleted.
:param router_id: The router's id to check against the deleted list
:returns: Returns True if the router_id has been deleted.
:param resource_id: The resource's id to check against the deleted list
:returns: Returns True if the resource_id has been deleted.
"""
with self.lock:
return router_id in self.deleted
return resource_id in self.deleted
def __getitem__(self, item):
with self.lock:
@ -80,8 +81,9 @@ class RouterContainer(object):
return item in self.state_machines
class TenantRouterManager(object):
"""Keep track of the state machines for the routers for a given tenant.
class TenantResourceManager(object):
"""Keep track of the state machines for the logical resources for a given
tenant.
"""
def __init__(self, tenant_id, notify_callback,
@ -91,62 +93,63 @@ class TenantRouterManager(object):
self.notify = notify_callback
self._queue_warning_threshold = queue_warning_threshold
self._reboot_error_threshold = reboot_error_threshold
self.state_machines = RouterContainer()
self._default_router_id = None
self.state_machines = ResourceContainer()
self._default_resource_id = None
def _delete_router(self, router_id):
"Called when the Automaton decides the router can be deleted"
if router_id in self.state_machines:
LOG.debug('deleting state machine for %s', router_id)
del self.state_machines[router_id]
if self._default_router_id == router_id:
self._default_router_id = None
def _delete_resource(self, resource_id):
"Called when the Automaton decides the resource can be deleted"
if resource_id in self.state_machines:
LOG.debug('deleting state machine for %s', resource_id)
del self.state_machines[resource_id]
if self._default_resource_id == resource_id:
self._default_resource_id = None
def shutdown(self):
LOG.info('shutting down')
for rid, sm in self.state_machines.items():
for resource_id, sm in self.state_machines.items():
try:
sm.service_shutdown()
except Exception:
LOG.exception(_LE(
'Failed to shutdown state machine for %s'), rid
'Failed to shutdown state machine for %s'), resource_id
)
def _report_bandwidth(self, router_id, bandwidth):
LOG.debug('reporting bandwidth for %s', router_id)
def _report_bandwidth(self, resource_id, bandwidth):
LOG.debug('reporting bandwidth for %s', resource_id)
msg = {
'tenant_id': self.tenant_id,
'timestamp': timeutils.isotime(),
'event_type': 'akanda.bandwidth.used',
'payload': dict((b.pop('name'), b) for b in bandwidth),
'router_id': router_id,
'uuid': resource_id,
}
self.notify(msg)
def get_state_machines(self, message, worker_context):
"""Return the state machines and the queue for sending it messages for
the router being addressed by the message.
the logical resource being addressed by the message.
"""
router_id = message.router_id
if not router_id:
LOG.error(_LE('Cannot get state machine for message with '
'no router_id'))
raise InvalidIncomingMessage()
# Ignore messages to deleted routers.
if self.state_machines.has_been_deleted(router_id):
LOG.debug('dropping message for deleted router')
return []
if (not message.resource or
(message.resource and not message.resource.id)):
LOG.error(_LE(
'Cannot get state machine for message with '
'no message.resource'))
raise InvalidIncomingMessage()
state_machines = []
# Send to all of our routers.
if router_id == '*':
# Send to all of our resources.
if message.resource.id == '*':
LOG.debug('routing to all state machines')
state_machines = self.state_machines.values()
# Send to routers that have an ERROR status
elif router_id == 'error':
# Ignore messages to deleted resources.
elif self.state_machines.has_been_deleted(message.resource.id):
LOG.debug('dropping message for deleted resource')
return []
# Send to resources that have an ERROR status
elif message.resource.id == 'error':
state_machines = [
sm for sm in self.state_machines.values()
if sm.has_error()
@ -155,14 +158,31 @@ class TenantRouterManager(object):
len(state_machines))
# Create a new state machine for this router.
elif router_id not in self.state_machines:
LOG.debug('creating state machine for %s', router_id)
elif message.resource.id not in self.state_machines:
LOG.debug('creating state machine for %s', message.resource.id)
# load the driver
if not message.resource.driver:
LOG.error(_LE('cannot create state machine without specifying'
'a driver.'))
return []
driver_obj = \
drivers.get(message.resource.driver)(worker_context,
message.resource.id)
if not driver_obj:
# this means the driver didn't load for some reason..
# this might not be needed at all.
LOG.debug('for some reason loading the driver failed')
return []
def deleter():
self._delete_router(router_id)
self._delete_resource(message.resource.id)
sm = state.Automaton(
router_id=router_id,
new_state_machine = state.Automaton(
driver=driver_obj,
resource_id=message.resource.id,
tenant_id=self.tenant_id,
delete_callback=deleter,
bandwidth_callback=self._report_bandwidth,
@ -170,18 +190,24 @@ class TenantRouterManager(object):
queue_warning_threshold=self._queue_warning_threshold,
reboot_error_threshold=self._reboot_error_threshold,
)
self.state_machines[router_id] = sm
state_machines = [sm]
self.state_machines[message.resource.id] = new_state_machine
state_machines = [new_state_machine]
# Send directly to an existing router.
elif router_id:
sm = self.state_machines[router_id]
state_machines = [sm]
elif message.resource.id:
state_machines = [self.state_machines[message.resource.id]]
# Filter out any deleted state machines.
return [
machine
for machine in state_machines
if (not machine.deleted and
not self.state_machines.has_been_deleted(machine.router_id))
not self.state_machines.has_been_deleted(machine.resource_id))
]
def get_state_machine_by_resource_id(self, resource_id):
try:
return self.state_machines[resource_id]
except KeyError:
return None
return self.state_machines.get_by_resource_id(resource_id)

View File

@ -19,12 +19,13 @@ import copy
import mock
import netaddr
import unittest2 as unittest
from akanda.rug.test.unit import base
from akanda.rug.api import neutron
class TestuNeutronModels(unittest.TestCase):
class TestuNeutronModels(base.RugTestBase):
def test_router(self):
r = neutron.Router(
'1', 'tenant_id', 'name', True, 'ACTIVE', 'ext', ['int'], ['fip'])
@ -243,7 +244,7 @@ class FakeConf:
auth_region = 'RegionOne'
class TestNeutronWrapper(unittest.TestCase):
class TestNeutronWrapper(base.RugTestBase):
@mock.patch('akanda.rug.api.neutron.cfg')
@mock.patch('akanda.rug.api.neutron.AkandaExtClientWrapper')
@ -268,14 +269,14 @@ class TestNeutronWrapper(unittest.TestCase):
@mock.patch('akanda.rug.api.neutron.AkandaExtClientWrapper')
def test_neutron_router_status_update_error(self, client_wrapper):
urs = client_wrapper.return_value.update_router_status
urs = client_wrapper.return_value.update_status
urs.side_effect = RuntimeError('should be caught')
conf = mock.Mock()
neutron_wrapper = neutron.Neutron(conf)
neutron_wrapper.update_router_status('router-id', 'new-status')
class TestExternalPort(unittest.TestCase):
class TestExternalPort(base.RugTestBase):
EXTERNAL_NET_ID = 'a0c63b93-2c42-4346-909e-39c690f53ba0'
EXTERNAL_PORT_ID = '089ae859-10ec-453c-b264-6c452fc355e5'
@ -361,6 +362,7 @@ class TestExternalPort(unittest.TestCase):
]
def setUp(self):
super(TestExternalPort, self).setUp()
self.conf = mock.Mock()
self.conf.external_network_id = 'ext'
self.conf.max_retries = 3

View File

@ -136,7 +136,7 @@ class TestNovaWrapper(unittest.TestCase):
mock_userdata.return_value = 'fake_userdata'
expected = [
mock.call.servers.create(
'ak-router_id',
'ak-instance-name',
nics=[{'port-id': '2',
'net-id': 'mgt-net',
'v4-fixed-ip': ''},
@ -154,7 +154,8 @@ class TestNovaWrapper(unittest.TestCase):
]
self.nova.create_instance(
'router_id', 'GLANCE-IMAGE-123', fake_make_ports_callback)
'ak-instance-name', 'GLANCE-IMAGE-123',
1, fake_make_ports_callback)
self.client.assert_has_calls(expected)
def test_get_instance_for_obj(self):
@ -162,10 +163,10 @@ class TestNovaWrapper(unittest.TestCase):
self.client.servers.list.return_value = [instance]
expected = [
mock.call.servers.list(search_opts={'name': 'ak-router_id'})
mock.call.servers.list(search_opts={'name': 'foo_instance_name'})
]
result = self.nova.get_instance_for_obj('router_id')
result = self.nova.get_instance_for_obj('foo_instance_name')
self.client.assert_has_calls(expected)
self.assertEqual(result, instance)
@ -173,10 +174,10 @@ class TestNovaWrapper(unittest.TestCase):
self.client.servers.list.return_value = []
expected = [
mock.call.servers.list(search_opts={'name': 'ak-router_id'})
mock.call.servers.list(search_opts={'name': 'foo_instance_name'})
]
result = self.nova.get_instance_for_obj('router_id')
result = self.nova.get_instance_for_obj('foo_instance_name')
self.client.assert_has_calls(expected)
self.assertIsNone(result)
@ -195,30 +196,30 @@ class TestNovaWrapper(unittest.TestCase):
result = self.nova.get_instance_by_id('instance_id')
self.assertEqual(result, None)
def test_destroy_router_instance(self):
def test_destroy_instance(self):
self.nova.destroy_instance(self.INSTANCE_INFO)
self.client.servers.delete.assert_called_with(self.INSTANCE_INFO.id_)
@mock.patch.object(nova, '_router_ssh_key')
@mock.patch.object(nova, '_ssh_key')
def test_format_userdata(self, fake_ssh_key):
fake_ssh_key.return_value = 'fake_key'
result = nova._format_userdata(fake_int_port)
self.assertEqual(result.strip(), EXPECTED_USERDATA.strip())
@mock.patch.object(__builtins__, 'open', autospec=True)
def test_router_ssh_key(self, fake_open):
def test_ssh_key(self, fake_open):
mock_key_file = mock.MagicMock(spec=file)
mock_key_file.read.return_value = 'fake-key'
mock_key_file.__enter__.return_value = mock_key_file
fake_open.return_value = mock_key_file
result = nova._router_ssh_key()
result = nova._ssh_key()
self.assertEqual(result, 'fake-key')
@mock.patch.object(nova, 'LOG', autospec=True)
@mock.patch.object(__builtins__, 'open', autospec=True)
def test_router_ssh_key_not_found(self, fake_open, fake_log):
def test_ssh_key_not_found(self, fake_open, fake_log):
fake_open.side_effect = IOError
result = nova._router_ssh_key()
result = nova._ssh_key()
self.assertEqual(result, '')
self.assertTrue(fake_log.warning.called)
@ -228,16 +229,18 @@ class TestNovaWrapper(unittest.TestCase):
fake_create_instance.return_value = 'fake_new_instance_info'
res = self.nova.boot_instance(
prev_instance_info=None,
router_id='foo_router_id',
router_image_uuid='foo_image',
name='foo_instance_name',
image_uuid='foo_image',
flavor='foo_flavor',
make_ports_callback='foo_callback',
)
fake_create_instance.assert_called_with(
'foo_router_id',
'foo_instance_name',
'foo_image',
'foo_flavor',
'foo_callback',
)
fake_get.assert_called_with('foo_router_id')
fake_get.assert_called_with('foo_instance_name')
self.assertEqual(res, 'fake_new_instance_info')
@mock.patch.object(nova.Nova, 'create_instance')
@ -250,11 +253,12 @@ class TestNovaWrapper(unittest.TestCase):
fake_create_instance.return_value = 'fake_new_instance_info'
res = self.nova.boot_instance(
prev_instance_info=None,
router_id='foo_router_id',
router_image_uuid='foo_image',
name='foo_instance_name',
image_uuid='foo_image',
flavor='foo_flavor',
make_ports_callback='foo_callback',
)
fake_get.assert_called_with('foo_router_id')
fake_get.assert_called_with('foo_instance_name')
self.client.servers.delete.assert_called_with('existing_instance_id')
self.assertIsNone(res)
@ -268,11 +272,12 @@ class TestNovaWrapper(unittest.TestCase):
fake_create_instance.return_value = 'fake_new_instance_info'
res = self.nova.boot_instance(
prev_instance_info=None,
router_id='foo_router_id',
router_image_uuid='foo_image',
name='foo_instance_name',
image_uuid='foo_image',
flavor='foo_flavor',
make_ports_callback='foo_callback',
)
fake_get.assert_called_with('foo_router_id')
fake_get.assert_called_with('foo_instance_name')
self.assertIsInstance(res, nova.InstanceInfo)
self.assertEqual(res.id_, 'existing_instance_id')
self.assertEqual(res.name, 'ak-appliance')
@ -284,14 +289,16 @@ class TestNovaWrapper(unittest.TestCase):
fake_create_instance.return_value = 'fake_new_instance_info'
res = self.nova.boot_instance(
prev_instance_info=self.INSTANCE_INFO,
router_id='foo_router_id',
router_image_uuid='foo_image',
name='foo_instance_name',
image_uuid='foo_image',
flavor='foo_flavor',
make_ports_callback='foo_callback',
)
fake_get.assert_called_with(self.INSTANCE_INFO.id_)
fake_create_instance.assert_called_with(
'foo_router_id',
'foo_instance_name',
'foo_image',
'foo_flavor',
'foo_callback',
)
self.assertEqual(res, 'fake_new_instance_info')
@ -307,8 +314,9 @@ class TestNovaWrapper(unittest.TestCase):
fake_create_instance.return_value = 'fake_new_instance_info'
res = self.nova.boot_instance(
prev_instance_info=self.INSTANCE_INFO,
router_id='foo_router_id',
router_image_uuid='foo_image',
name='foo_instance_name',
image_uuid='foo_image',
flavor='foo_flavor',
make_ports_callback='foo_callback',
)
fake_get.assert_called_with(self.INSTANCE_INFO.id_)
@ -316,7 +324,7 @@ class TestNovaWrapper(unittest.TestCase):
self.assertIsNone(res)
@mock.patch.object(nova.Nova, 'create_instance')
@mock.patch.object(nova.Nova, 'get_instance_by_id')
@mock.patch.object(nova.Nova, 'get_instance_for_obj')
def test_boot_instance_exists_build_prev_inst(self, fake_get,
fake_create_instance):
fake_instance = fake_nova_instance
@ -325,12 +333,13 @@ class TestNovaWrapper(unittest.TestCase):
fake_get.return_value = fake_instance
fake_create_instance.return_value = 'fake_new_instance_info'
res = self.nova.boot_instance(
prev_instance_info=self.INSTANCE_INFO,
router_id='foo_router_id',
router_image_uuid='foo_image',
prev_instance_info=None,
name='foo_instance_name',
image_uuid='foo_image',
flavor='foo_flavor',
make_ports_callback='foo_callback',
)
# assert we get back the same instance_info but with updated status
self.assertEqual(res.nova_status, 'BUILD')
self.assertEqual(res.id_, self.INSTANCE_INFO.id_)
self.assertEqual(res.id_, fake_instance.id)
self.assertIsInstance(res, nova.InstanceInfo)

View File

@ -42,33 +42,33 @@ class TestDBDebugModes(base.DbTestCase):
self.assertFalse(enabled)
self.assertIsNone(reason)
def test_router_debug(self):
def test_resource_debug(self):
r_id = uuid.uuid4().hex
self.dbapi.enable_router_debug(
router_uuid=r_id)
enabled, reason = self.dbapi.router_in_debug(
router_uuid=r_id)
self.dbapi.enable_resource_debug(
resource_uuid=r_id)
enabled, reason = self.dbapi.resource_in_debug(
resource_uuid=r_id)
self.assertTrue(enabled)
self.assertIsNone(reason)
self.dbapi.router_in_debug('foo_router')
self.dbapi.resource_in_debug('foo_resource')
def test_router_debug_with_reason(self):
def test_resource_debug_with_reason(self):
r_id = uuid.uuid4().hex
self.dbapi.enable_router_debug(
router_uuid=r_id, reason='foo')
enabled, reason = self.dbapi.router_in_debug(
router_uuid=r_id)
self.dbapi.enable_resource_debug(
resource_uuid=r_id, reason='foo')
enabled, reason = self.dbapi.resource_in_debug(
resource_uuid=r_id)
self.assertTrue(enabled)
self.assertEqual(reason, 'foo')
def test_routers_in_debug(self):
def test_resources_in_debug(self):
r_ids = [uuid.uuid4().hex for i in range(1, 3)]
for r_id in r_ids:
self.dbapi.enable_router_debug(
router_uuid=r_id, reason='router %s is broken' % r_id)
for debug_r_id, reason in self.dbapi.routers_in_debug():
self.dbapi.enable_resource_debug(
resource_uuid=r_id, reason='resource %s is broken' % r_id)
for debug_r_id, reason in self.dbapi.resources_in_debug():
self.assertIn(debug_r_id, r_ids)
self.assertEqual(reason, 'router %s is broken' % debug_r_id)
self.assertEqual(reason, 'resource %s is broken' % debug_r_id)
def test_tenant_debug(self):
t_id = uuid.uuid4().hex

View File

View File

@ -0,0 +1,43 @@
# Copyright (c) 2015 Akanda, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from akanda.rug.test.unit import base
from akanda.rug import drivers
class DriverFactoryTest(base.RugTestBase):
def test_get_driver(self):
for k, v in drivers.AVAILABLE_DRIVERS.iteritems():
self.assertEqual(drivers.get(k), v)
def test_get_bad_driver(self):
self.assertRaises(
drivers.InvalidDriverException,
drivers.get, 'foodriver'
)
def test_enabled_drivers(self):
all_driver_cfg = drivers.AVAILABLE_DRIVERS.keys()
all_driver_obj = drivers.AVAILABLE_DRIVERS.values()
self.config(enabled_drivers=all_driver_cfg)
enabled_drivers = [d for d in drivers.enabled_drivers()]
self.assertEqual(set(all_driver_obj), set(enabled_drivers))
def test_enabled_drivers_nonexistent_left_out(self):
all_driver_cfg = drivers.AVAILABLE_DRIVERS.keys() + ['foodriver']
all_driver_obj = drivers.AVAILABLE_DRIVERS.values()
self.config(enabled_drivers=all_driver_cfg)
enabled_drivers = [d for d in drivers.enabled_drivers()]
self.assertEqual(set(all_driver_obj), set(enabled_drivers))

View File

@ -0,0 +1,407 @@
# Copyright (c) 2015 Akanda, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutronclient.common import exceptions as neutron_exceptions
from akanda.rug import event
from akanda.rug.api import neutron
from akanda.rug.drivers import router, states
from akanda.rug.test.unit import base, fakes
class RouterDriverTest(base.RugTestBase):
def setUp(self):
super(RouterDriverTest, self).setUp()
self.router_id = 'fake_router_id'
self.image_uuid = 'fake_router_image_uuid'
self.flavor = 'fake_router_flavor'
self.mgt_port = '5555'
self.ctx = mock.Mock()
self.config(group='router', image_uuid=self.image_uuid)
self.config(group='router', instance_flavor=self.flavor)
self.config(group='router', mgt_service_port=self.mgt_port)
self.ctx = fakes.fake_worker_context()
self.addCleanup(mock.patch.stopall)
def _init_driver(self):
return router.Router(
worker_context=self.ctx,
id=self.router_id,
)
@mock.patch('akanda.rug.drivers.router.Router.post_init')
def test_init(self, mock_post_init):
rtr = self._init_driver()
rtr.post_init = mock.Mock()
self.assertEqual(
rtr.name,
'ak-%s-%s' % (rtr.RESOURCE_NAME, self.router_id))
mock_post_init.assert_called_with(self.ctx)
@mock.patch('akanda.rug.drivers.router.Router._ensure_cache')
def test_post_init(self, mock_ensure_cache):
rtr = self._init_driver()
rtr.post_init(self.ctx)
self.assertEqual(rtr.image_uuid, self.image_uuid)
self.assertEqual(rtr.flavor, self.flavor)
self.assertEqual(rtr.mgt_port, self.mgt_port)
mock_ensure_cache.assert_called_with(self.ctx)
def test__ensure_cache_no_router(self):
self.ctx.neutron.get_router_detail.return_value = None
rtr = self._init_driver()
self.assertEqual(rtr._router, None)
def test__ensure_cache_with_router(self):
rtr = self._init_driver()
self.ctx.neutron.get_router_detail.return_value = 'fake_router'
rtr._ensure_cache(self.ctx)
self.assertEqual(rtr._router, 'fake_router')
def test_ports_no_router(self):
rtr = self._init_driver()
self.assertEqual(rtr.ports, [])
def test_ports_with_router(self):
rtr = self._init_driver()
fake_router_obj = fakes.fake_router()
rtr._router = fake_router_obj
self.assertEqual(set(rtr.ports), set(fake_router_obj.ports))
@mock.patch('akanda.rug.drivers.router.Router.pre_plug')
def test_pre_boot(self, mock_pre_plug):
rtr = self._init_driver()
rtr.pre_boot(self.ctx)
mock_pre_plug.assert_called_with(self.ctx)
@mock.patch('akanda.rug.api.configuration.build_config')
@mock.patch('akanda.rug.drivers.router.Router._ensure_cache')
def test_build_config(self, mock_ensure_cache, mock_build_config):
rtr = self._init_driver()
fake_router_obj = fakes.fake_router()
fake_mgt_port = mock.Mock()
fake_iface_map = mock.Mock()
rtr._router = fake_router_obj
mock_build_config.return_value = 'fake_config'
res = rtr.build_config(self.ctx, fake_mgt_port, fake_iface_map)
self.assertTrue(mock_ensure_cache.called)
mock_build_config.return_value = 'fake_config'
mock_build_config.assert_called_with(
self.ctx.neutron, rtr._router, fake_mgt_port, fake_iface_map)
self.assertEqual(res, 'fake_config')
@mock.patch('akanda.rug.api.akanda_client.update_config')
def test_update_config(self, mock_update_config):
rtr = self._init_driver()
rtr.update_config(management_address='10.0.0.1', config='fake_config')
mock_update_config.assert_called_with(
'10.0.0.1',
rtr.mgt_port,
'fake_config',)
def test_pre_plug_no_external_port(self):
rtr = self._init_driver()
fake_router_obj = fakes.fake_router()
fake_router_obj.external_port = None
rtr._router = fake_router_obj
self.ctx.neutron.create_router_external_port.return_value = 'fake_port'
rtr.pre_plug(self.ctx)
self.ctx.neutron.create_router_external_port.assert_called_with(
fake_router_obj,
)
self.assertEqual(rtr._router.external_port, 'fake_port')
def test_pre_plug_with_external_port(self):
rtr = self._init_driver()
fake_router_obj = fakes.fake_router()
fake_router_obj.external_port = 'fake_port'
rtr.pre_plug(self.ctx)
self.assertFalse(self.ctx.neutron.create_router_external_port.called)
@mock.patch('akanda.rug.drivers.router.Router._ensure_cache')
def test_make_ports(self, mock_ensure_cache):
rtr = self._init_driver()
fake_router_obj = fakes.fake_router()
rtr._router = fake_router_obj
self.ctx.neutron.create_management_port.return_value = 'fake_mgt_port'
self.ctx.neutron.create_vrrp_port.side_effect = [
'fake_port_%s' % p.network_id for p in fake_router_obj.ports
]
callback = rtr.make_ports(self.ctx)
res = callback()
expected_instance_ports = [
'fake_port_%s' % p.network_id for p in fake_router_obj.ports
]
self.assertEqual(res, ('fake_mgt_port', expected_instance_ports))
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_pre_populate_retry_loop(self, mocked_neutron_api):
neutron_client = mock.Mock()
returned_value = [Exception, []]
neutron_client.get_routers.side_effect = returned_value
mocked_neutron_api.return_value = neutron_client
rtr = self._init_driver()
with mock.patch('time.sleep'):
rtr.pre_populate_hook()
self.assertEqual(
neutron_client.get_routers.call_args_list,
[
mock.call(detailed=False)
for value in xrange(len(returned_value))
]
)
self.assertEqual(
neutron_client.get_routers.call_count,
len(returned_value)
)
def _exit_loop_bad_auth(self, mocked_neutron_api, log, exc):
neutron_client = mock.Mock()
neutron_client.get_routers.side_effect = exc
mocked_neutron_api.return_value = neutron_client
rtr = self._init_driver()
rtr.pre_populate_hook()
log.warning.assert_called_once_with(
'PrePopulateWorkers thread failed: %s',
mock.ANY
)
@mock.patch('akanda.rug.drivers.router.LOG')
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_pre_populate_unauthorized(self, mocked_neutron_api, log):
exc = neutron_exceptions.Unauthorized
self._exit_loop_bad_auth(mocked_neutron_api, log, exc)
@mock.patch('akanda.rug.drivers.router.LOG')
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_pre_populate_forbidden(self, mocked_neutron_api, log):
exc = neutron_exceptions.Forbidden
self._exit_loop_bad_auth(mocked_neutron_api, log, exc)
@mock.patch('akanda.rug.drivers.router.LOG.warning')
@mock.patch('akanda.rug.drivers.router.LOG.debug')
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_pre_populate_retry_loop_logging(
self, mocked_neutron_api, log_debug, log_warning):
neutron_client = mock.Mock()
message = mock.Mock(tenant_id='1', id='2')
returned_value = [
neutron_exceptions.NeutronClientException,
[message]
]
neutron_client.get_routers.side_effect = returned_value
mocked_neutron_api.return_value = neutron_client
rtr = self._init_driver()
with mock.patch('time.sleep'):
res = rtr.pre_populate_hook()
self.assertEqual(2, log_warning.call_count)
expected_resource = event.Resource(
driver=rtr.RESOURCE_NAME,
id='2',
tenant_id='1',
)
self.assertEqual(res, [expected_resource])
def test_get_resource_id_for_tenant(self):
fake_router = fakes.fake_router()
self.ctx.neutron.get_router_for_tenant.return_value = fake_router
res = router.Router.get_resource_id_for_tenant(
self.ctx, 'fake_tenant_id', 'fake_message')
self.assertEqual(res, fake_router.id)
self.ctx.neutron.get_router_for_tenant.assert_called_with(
'fake_tenant_id')
def test_get_resource_id_for_tenant_no_router(self):
self.ctx.neutron.get_router_for_tenant.return_value = None
res = router.Router.get_resource_id_for_tenant(
self.ctx, 'fake_tenant_id', 'fake_message')
self.assertEqual(res, None)
self.ctx.neutron.get_router_for_tenant.assert_called_with(
'fake_tenant_id')
def _test_notification(self, event_type, payload, expected):
tenant_id = 'fake_tenant_id'
res = router.Router.process_notification(
tenant_id, event_type, payload)
self.assertEqual(res, expected)
def test_process_notification_routerstatus(self):
self._test_notification('routerstatus.update', {}, None)
def test_process_notification_router_create(self):
payload = {'router': {'id': 'fake_router_id'}}
r = event.Resource(
driver=router.Router.RESOURCE_NAME,
id='fake_router_id',
tenant_id='fake_tenant_id')
e = event.Event(
resource=r,
crud=event.CREATE,
body=payload,
)
self._test_notification('router.create.end', payload, e)
def test_process_notification_router_delete(self):
payload = {'router_id': 'fake_router_id'}
r = event.Resource(
driver=router.Router.RESOURCE_NAME,
id='fake_router_id',
tenant_id='fake_tenant_id')
e = event.Event(
resource=r,
crud=event.DELETE,
body=payload,
)
self._test_notification('router.delete.end', payload, e)
def test_process_notification_interface_notifications(self):
for notification in router._ROUTER_INTERFACE_NOTIFICATIONS:
payload = {'router.interface': {'id': 'fake_router_id'}}
r = event.Resource(
driver=router.Router.RESOURCE_NAME,
id='fake_router_id',
tenant_id='fake_tenant_id')
e = event.Event(
resource=r,
crud=event.UPDATE,
body=payload,
)
self._test_notification(notification, payload, e)
def test_process_notification_interesting_notifications(self):
for notification in router._ROUTER_INTERESTING_NOTIFICATIONS:
payload = {'router': {'id': 'fake_router_id'}}
r = event.Resource(
driver=router.Router.RESOURCE_NAME,
id='fake_router_id',
tenant_id='fake_tenant_id')
e = event.Event(
resource=r,
crud=event.UPDATE,
body=payload,
)
self._test_notification(notification, payload, e)
def test_process_notification_arbitrary_end_event(self):
payload = {'router': {'id': 'fake_router_id'}}
r = event.Resource(
driver=router.Router.RESOURCE_NAME,
id='fake_router_id',
tenant_id='fake_tenant_id')
e = event.Event(
resource=r,
crud=event.UPDATE,
body=payload,
)
self._test_notification('foo.bar.end', payload, e)
def test_process_notification_not_subscribed(self):
payload = {'router': {'id': 'fake_router_id'}}
self._test_notification('whocares.about.this', payload, None)
@mock.patch('akanda.rug.drivers.router.Router._ensure_cache')
def test_get_state_no_router(self, mock_ensure_cache):
rtr = self._init_driver()
rtr._router = None
self.assertEqual(
rtr.get_state(self.ctx),
states.GONE,
)
mock_ensure_cache.assert_called_with(self.ctx)
@mock.patch('akanda.rug.drivers.router.Router._ensure_cache')
def test_get_state(self, mock_ensure_cache):
rtr = self._init_driver()
fake_router = fakes.fake_router()
rtr._router = fake_router
self.assertEqual(
rtr.get_state(self.ctx),
fake_router.status,
)
mock_ensure_cache.assert_called_with(self.ctx)
@mock.patch('akanda.rug.drivers.router.Router._ensure_cache')
def test_synchronize_state_no_router(self, mock_ensure_cache):
rtr = self._init_driver()
rtr._router = None
rtr.synchronize_state(self.ctx, states.DOWN)
mock_ensure_cache.assert_called_with(self.ctx)
self.assertFalse(self.ctx.neutron.update_router_status.called)
@mock.patch('akanda.rug.drivers.router.Router._ensure_cache')
def test_synchronize_state(self, mock_ensure_cache):
rtr = self._init_driver()
fake_router_obj = fakes.fake_router()
rtr._router = fake_router_obj
rtr.synchronize_state(self.ctx, states.CONFIGURED)
mock_ensure_cache.assert_called_with(self.ctx)
self.ctx.neutron.update_router_status.assert_called_with(
rtr.id,
'ACTIVE',
)
self.assertEquals(rtr._last_synced_status, 'ACTIVE')
@mock.patch('akanda.rug.drivers.router.Router._ensure_cache')
def test_synchronize_state_no_change(self, mock_ensure_cache):
rtr = self._init_driver()
fake_router_obj = fakes.fake_router()
rtr._router = fake_router_obj
rtr._last_synced_status = 'ACTIVE'
rtr.synchronize_state(self.ctx, states.CONFIGURED)
mock_ensure_cache.assert_called_with(self.ctx)
self.assertFalse(self.ctx.neutron.update_router_status.called)
@mock.patch('akanda.rug.api.akanda_client.get_interfaces')
def test_get_interfaces(self, mock_get_interfaces):
mock_get_interfaces.return_value = ['fake_interface']
rtr = self._init_driver()
self.assertEquals(
rtr.get_interfaces('fake_mgt_addr'), ['fake_interface'])
mock_get_interfaces.assert_called_with(
'fake_mgt_addr', self.mgt_port)
@mock.patch('akanda.rug.api.akanda_client.is_alive')
def test_is_alive(self, mock_is_alive):
mock_is_alive.return_value = False
rtr = self._init_driver()
self.assertFalse(rtr.is_alive('fake_mgt_addr'))
mock_is_alive.assert_called_with(
'fake_mgt_addr', self.mgt_port)
def test_post_boot(self):
self._init_driver().post_boot(self.ctx)
def test__ensure_cache(self):
rtr = self._init_driver()
self.ctx.neutron.get_router_detail.return_value = 'fake_router'
rtr._ensure_cache(self.ctx)
self.assertEqual(rtr._router, 'fake_router')
self.ctx.neutron.get_router_detail.assert_called_with(rtr.id)
def test__ensure_cache_not_found(self):
rtr = self._init_driver()
self.ctx.neutron.get_router_detail.side_effect = [neutron.RouterGone]
rtr._ensure_cache(self.ctx)
self.assertEqual(rtr._router, None)
self.ctx.neutron.get_router_detail.assert_called_with(rtr.id)

View File

@ -0,0 +1,86 @@
# Copyright (c) 2015 Akanda, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from akanda.rug.drivers import base
from akanda.rug.api import neutron, nova
from akanda.rug import worker
def fake_router():
router_gateway_port = {
'id': 'ext',
'name': 'router_gateway_port',
'device_id': 'device_id',
'fixed_ips': [],
'mac_address': 'aa:bb:cc:dd:ee:ff',
'network_id': 'net_id',
'device_owner': 'network:router_gateway'
}
router_internal_port = {
'id': 'ext',
'name': 'router_internal_port',
'device_id': 'device_id',
'fixed_ips': [],
'mac_address': 'aa:bb:cc:dd:ee:ff',
'network_id': 'net_id',
'device_owner': 'network:router_interface'
}
router_fip = {
'id': 'fip',
'floating_ip_address': '9.9.9.9',
'fixed_ip_address': '192.168.1.1'
}
router_dict = {
'id': '1',
'tenant_id': 'tenant_id',
'name': 'name',
'admin_state_up': True,
'status': 'ACTIVE',
'gw_port': router_gateway_port,
'_interfaces': [router_internal_port],
'_floatingips': [router_fip]
}
return neutron.Router.from_dict(router_dict)
def fake_driver(resource_id=None):
"""A factory for generating fake driver instances suitable for testing"""
fake_driver = mock.Mock(base.BaseDriver, autospec=True)
fake_driver.RESOURCE_NAME = 'FakeDriver'
fake_driver.id = resource_id or 'fake_resource_id'
fake_driver.log = mock.Mock()
fake_driver.flavor = 'fake_flavor'
fake_driver.name = 'ak-FakeDriver-fake_resource_id'
fake_driver.image_uuid = 'fake_image_uuid'
fake_driver.make_ports.return_value = 'fake_ports_callback'
return fake_driver
def fake_worker_context():
"""Patches client API libs in the worker context.
Caller should addCleanup(mock.patch.stopall).
"""
fake_neutron_obj = mock.patch.object(
neutron, 'Neutron', autospec=True).start()
mock.patch.object(
neutron, 'Neutron', return_value=fake_neutron_obj).start()
fake_nova_obj = mock.patch.object(
nova, 'Nova', autospec=True).start()
mock.patch.object(
nova, 'Nova', return_value=fake_nova_obj).start()
return worker.WorkerContext()

View File

@ -0,0 +1,50 @@
# Copyright (c) 2015 Akanda, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from akanda.rug import event
from akanda.rug import health
from akanda.rug.test.unit import base
class BreakLoop(Exception):
pass
class HealthTest(base.RugTestBase):
@mock.patch('time.sleep')
def test_health_inspector(self, fake_sleep):
fake_scheduler = mock.Mock(
handle_message=mock.Mock()
)
# raise the exception to break out of the while loop.
fake_scheduler.handle_message.side_effect = BreakLoop()
try:
health._health_inspector(fake_scheduler)
except BreakLoop:
pass
exp_res = event.Resource(
id='*',
tenant_id='*',
driver='*',
)
exp_event = event.Event(
resource=exp_res,
crud=event.POLL,
body={},
)
fake_scheduler.handle_message.assert_called_with('*', exp_event)

View File

@ -1,4 +1,5 @@
# Copyright 2014 DreamHost, LLC
#
# Author: DreamHost, LLC
#
@ -21,10 +22,12 @@ import unittest2 as unittest
from datetime import datetime, timedelta
from akanda.rug import instance_manager
from akanda.rug.api import neutron, nova
from akanda.rug.api import nova
from akanda.rug.drivers import states
from akanda.rug.test.unit import fakes
instance_manager.RETRY_DELAY = 0.4
instance_manager.BOOT_WAIT = 1
states.RETRY_DELAY = 0.4
states.BOOT_WAIT = 1
LOG = logging.getLogger(__name__)
@ -63,6 +66,7 @@ fake_add_port = FakeModel(
class TestInstanceManager(unittest.TestCase):
def setUp(self):
self.fake_driver = fakes.fake_driver()
self.ctx = mock.Mock()
self.neutron = self.ctx.neutron
self.conf = mock.patch.object(instance_manager.cfg, 'CONF').start()
@ -77,21 +81,31 @@ class TestInstanceManager(unittest.TestCase):
'update_state'
)
ports = [fake_int_port, fake_ext_port]
self.fake_driver.get_interfaces.return_value = [
{'ifname': 'ge0', 'lladdr': fake_mgt_port.mac_address},
{'ifname': 'ge1', 'lladdr': fake_ext_port.mac_address},
{'ifname': 'ge2', 'lladdr': fake_int_port.mac_address},
]
self.fake_driver.ports = ports
self.INSTANCE_INFO = nova.InstanceInfo(
instance_id='fake_instance_id',
name='fake_name',
image_uuid='fake_image_id',
name='ak-router-83f16d4c-66d8-11e5-938a-525400cfc326',
management_port=fake_mgt_port,
ports=[fake_int_port, fake_ext_port, fake_mgt_port],
image_uuid='9f3dbe8e-66d8-11e5-9952-525400cfc326',
booting=False,
last_boot=(datetime.utcnow() - timedelta(minutes=15)),
ports=[fake_int_port, fake_ext_port, fake_mgt_port],
management_port=fake_mgt_port,
)
self.mock_update_state = self.update_state_p.start()
self.instance_mgr = instance_manager.InstanceManager('the_id',
'tenant_id',
self.log,
self.ctx)
self.instance_mgr = instance_manager.InstanceManager(
self.fake_driver,
'fake_resource_id',
self.ctx
)
self.instance_mgr.instance_info = self.INSTANCE_INFO
mock.patch.object(self.instance_mgr, '_ensure_cache', mock.Mock)
@ -103,40 +117,36 @@ class TestInstanceManager(unittest.TestCase):
return self.instance_mgr.state
self.mock_update_state.side_effect = next_state
@mock.patch('akanda.rug.instance_manager.router_api')
def test_update_state_is_alive(self, router_api):
def test_update_state_is_alive(self):
self.update_state_p.stop()
router_api.is_alive.return_value = True
self.fake_driver.is_alive.return_value = True
self.assertEqual(self.instance_mgr.update_state(self.ctx),
instance_manager.UP)
router_api.is_alive.assert_called_once_with(
self.INSTANCE_INFO.management_address,
self.conf.akanda_mgt_service_port)
states.UP)
self.fake_driver.is_alive.assert_called_once_with(
self.INSTANCE_INFO.management_address)
@mock.patch('time.sleep', lambda *a: None)
@mock.patch('akanda.rug.instance_manager.router_api')
@mock.patch('akanda.rug.api.configuration.build_config')
def test_router_status_sync(self, config, router_api):
def test_router_status_sync(self):
self.update_state_p.stop()
router_api.is_alive.return_value = False
rtr = mock.sentinel.router
rtr.id = 'R1'
rtr.management_port = mock.Mock()
rtr.external_port = mock.Mock()
self.ctx.neutron.get_router_detail.return_value = rtr
n = self.neutron
self.fake_driver.is_alive.return_value = False
# Router state should start down
self.instance_mgr.update_state(self.ctx)
n.update_router_status.assert_called_once_with('R1', 'DOWN')
n.update_router_status.reset_mock()
self.fake_driver.synchronize_state.assert_called_with(
self.ctx,
state='down',
)
self.fake_driver.synchronize_state.reset_mock()
# Bring the router to UP with `is_alive = True`
router_api.is_alive.return_value = True
self.fake_driver.is_alive.return_value = True
self.instance_mgr.update_state(self.ctx)
n.update_router_status.assert_called_once_with('R1', 'BUILD')
n.update_router_status.reset_mock()
self.fake_driver.synchronize_state.assert_called_with(
self.ctx,
state='up',
)
self.fake_driver.synchronize_state.reset_mock()
# Configure the router and make sure state is synchronized as ACTIVE
with mock.patch.object(self.instance_mgr,
@ -145,99 +155,86 @@ class TestInstanceManager(unittest.TestCase):
self.instance_mgr.last_boot = datetime.utcnow()
self.instance_mgr.configure(self.ctx)
self.instance_mgr.update_state(self.ctx)
n.update_router_status.assert_called_once_with('R1', 'ACTIVE')
n.update_router_status.reset_mock()
self.fake_driver.synchronize_state.assert_called_with(
self.ctx,
state='configured',
)
self.fake_driver.synchronize_state.reset_mock()
@mock.patch('time.sleep', lambda *a: None)
@mock.patch('akanda.rug.instance_manager.router_api')
@mock.patch('akanda.rug.api.configuration.build_config')
def test_router_status_caching(self, config, router_api):
def test_router_status_caching(self):
self.update_state_p.stop()
router_api.is_alive.return_value = False
rtr = mock.sentinel.router
rtr.id = 'R1'
rtr.management_port = mock.Mock()
rtr.external_port = mock.Mock()
self.ctx.neutron.get_router_detail.return_value = rtr
n = self.neutron
self.fake_driver.is_alive.return_value = False
# Router state should start down
self.instance_mgr.update_state(self.ctx)
n.update_router_status.assert_called_once_with('R1', 'DOWN')
n.update_router_status.reset_mock()
# Router state should not be updated in neutron if it didn't change
self.instance_mgr.update_state(self.ctx)
self.assertEqual(n.update_router_status.call_count, 0)
self.fake_driver.synchronize_state.assert_called_once_with(
self.ctx, state='down')
@mock.patch('time.sleep')
@mock.patch('akanda.rug.instance_manager.router_api')
def test_boot_timeout_still_booting(self, router_api, sleep):
def test_boot_timeout_still_booting(self, sleep):
now = datetime.utcnow()
self.INSTANCE_INFO.last_boot = now
self.instance_mgr.last_boot = now
self.update_state_p.stop()
router_api.is_alive.return_value = False
self.fake_driver.is_alive.return_value = False
self.assertEqual(
self.instance_mgr.update_state(self.ctx),
instance_manager.BOOTING
states.BOOTING
)
router_api.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
self.fake_driver.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
])
@mock.patch('time.sleep')
@mock.patch('akanda.rug.instance_manager.router_api')
def test_boot_timeout_error(self, router_api, sleep):
self.instance_mgr.state = instance_manager.ERROR
def test_boot_timeout_error(self, sleep):
self.instance_mgr.state = states.ERROR
self.instance_mgr.last_boot = datetime.utcnow()
self.update_state_p.stop()
router_api.is_alive.return_value = False
self.fake_driver.is_alive.return_value = False
self.assertEqual(
self.instance_mgr.update_state(self.ctx),
instance_manager.ERROR,
states.ERROR,
)
router_api.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
self.fake_driver.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
])
@mock.patch('time.sleep')
@mock.patch('akanda.rug.instance_manager.router_api')
def test_boot_timeout_error_no_last_boot(self, router_api, sleep):
self.instance_mgr.state = instance_manager.ERROR
def test_boot_timeout_error_no_last_boot(self, sleep):
self.instance_mgr.state = states.ERROR
self.instance_mgr.last_boot = None
self.update_state_p.stop()
router_api.is_alive.return_value = False
self.fake_driver.is_alive.return_value = False
self.assertEqual(
self.instance_mgr.update_state(self.ctx),
instance_manager.ERROR,
states.ERROR,
)
router_api.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
self.fake_driver.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
])
@mock.patch('time.sleep')
@mock.patch('akanda.rug.instance_manager.router_api')
def test_boot_timeout(self, router_api, sleep):
def test_boot_timeout(self, sleep):
self.instance_mgr.last_boot = datetime.utcnow() - timedelta(minutes=5)
self.update_state_p.stop()
router_api.is_alive.return_value = False
self.fake_driver.is_alive.return_value = False
self.assertEqual(self.instance_mgr.update_state(self.ctx),
instance_manager.DOWN)
router_api.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
states.DOWN)
self.fake_driver.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
])
self.instance_mgr.log.info.assert_called_once_with(
mock.ANY,
@ -245,53 +242,45 @@ class TestInstanceManager(unittest.TestCase):
)
@mock.patch('time.sleep')
@mock.patch('akanda.rug.instance_manager.router_api')
def test_update_state_is_down(self, router_api, sleep):
def test_update_state_is_down(self, sleep):
self.update_state_p.stop()
router_api.is_alive.return_value = False
self.fake_driver.is_alive.return_value = False
self.assertEqual(self.instance_mgr.update_state(self.ctx),
instance_manager.DOWN)
router_api.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
mock.call(self.INSTANCE_INFO.management_address, 5000),
states.DOWN)
self.fake_driver.is_alive.assert_has_calls([
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
mock.call(self.INSTANCE_INFO.management_address),
])
@mock.patch('time.sleep')
@mock.patch('akanda.rug.instance_manager.router_api')
def test_update_state_retry_delay(self, router_api, sleep):
def test_update_state_retry_delay(self, sleep):
self.update_state_p.stop()
router_api.is_alive.side_effect = [False, False, True]
self.fake_driver.is_alive.side_effect = [False, False, True]
max_retries = 5
self.conf.max_retries = max_retries
self.instance_mgr.update_state(self.ctx, silent=False)
self.assertEqual(sleep.call_count, 2)
self.log.debug.assert_has_calls([
mock.call('Alive check failed. Attempt %d of %d', 0, max_retries),
mock.call('Alive check failed. Attempt %d of %d', 1, max_retries)
])
@mock.patch('time.sleep')
def test_boot_success(self, sleep):
self.next_state = instance_manager.UP
rtr = mock.sentinel.router
self.ctx.neutron.get_router_detail.return_value = rtr
rtr.id = 'ROUTER1'
rtr.management_port = None
rtr.external_port = None
rtr.ports = mock.MagicMock()
rtr.ports.__iter__.return_value = []
self.instance_mgr.boot(self.ctx, 'GLANCE-IMAGE-123')
self.assertEqual(self.instance_mgr.state, instance_manager.BOOTING)
self.next_state = states.UP
self.instance_mgr.boot(self.ctx)
self.assertEqual(self.instance_mgr.state, states.BOOTING)
self.ctx.nova_client.boot_instance.assert_called_once_with(
self.INSTANCE_INFO, rtr.id, 'GLANCE-IMAGE-123', mock.ANY)
self.INSTANCE_INFO,
self.fake_driver.name,
self.fake_driver.image_uuid,
self.fake_driver.flavor,
'fake_ports_callback')
self.assertEqual(1, self.instance_mgr.attempts)
@mock.patch('time.sleep')
def test_boot_instance_deleted(self, sleep):
self.ctx.nova_client.boot_instance.return_value = None
self.instance_mgr.boot(self.ctx, 'GLANCE-IMAGE-123')
self.instance_mgr.boot(self.ctx)
# a deleted VM should reset the vm mgr state and not as a failed
# attempt
self.assertEqual(self.instance_mgr.attempts, 0)
@ -299,40 +288,33 @@ class TestInstanceManager(unittest.TestCase):
@mock.patch('time.sleep')
def test_boot_fail(self, sleep):
self.next_state = instance_manager.DOWN
rtr = mock.sentinel.router
self.ctx.neutron.get_router_detail.return_value = rtr
rtr.id = 'ROUTER1'
rtr.management_port = None
rtr.external_port = None
rtr.ports = mock.MagicMock()
rtr.ports.__iter__.return_value = []
self.instance_mgr.boot(self.ctx, 'GLANCE-IMAGE-123')
self.assertEqual(self.instance_mgr.state, instance_manager.BOOTING)
self.next_state = states.DOWN
self.instance_mgr.boot(self.ctx)
self.assertEqual(self.instance_mgr.state, states.BOOTING)
self.ctx.nova_client.boot_instance.assert_called_once_with(
self.INSTANCE_INFO, rtr.id, 'GLANCE-IMAGE-123', mock.ANY)
self.INSTANCE_INFO,
self.fake_driver.name,
self.fake_driver.image_uuid,
self.fake_driver.flavor,
'fake_ports_callback')
self.assertEqual(1, self.instance_mgr.attempts)
@mock.patch('time.sleep')
def test_boot_exception(self, sleep):
rtr = mock.sentinel.router
self.ctx.neutron.get_router_detail.return_value = rtr
rtr.id = 'ROUTER1'
rtr.management_port = None
rtr.external_port = None
rtr.ports = mock.MagicMock()
rtr.ports.__iter__.return_value = []
self.ctx.nova_client.boot_instance.side_effect = RuntimeError
self.instance_mgr.boot(self.ctx, 'GLANCE-IMAGE-123')
self.assertEqual(self.instance_mgr.state, instance_manager.DOWN)
self.instance_mgr.boot(self.ctx)
self.assertEqual(self.instance_mgr.state, states.DOWN)
self.ctx.nova_client.boot_instance.assert_called_once_with(
self.INSTANCE_INFO, rtr.id, 'GLANCE-IMAGE-123', mock.ANY)
self.INSTANCE_INFO,
self.fake_driver.name,
self.fake_driver.image_uuid,
self.fake_driver.flavor,
'fake_ports_callback')
self.assertEqual(1, self.instance_mgr.attempts)
@mock.patch('time.sleep')
def test_boot_with_port_cleanup(self, sleep):
self.next_state = instance_manager.UP
self.next_state = states.UP
management_port = mock.Mock(id='mgmt', device_id='INSTANCE1')
external_port = mock.Mock(id='ext', device_id='INSTANCE1')
@ -349,14 +331,14 @@ class TestInstanceManager(unittest.TestCase):
rtr.ports = mock.MagicMock()
rtr.ports.__iter__.return_value = [management_port, external_port,
internal_port]
self.instance_mgr.boot(self.ctx, 'GLANCE-IMAGE-123')
self.assertEqual(self.instance_mgr.state, instance_manager.BOOTING)
self.instance_mgr.boot(self.ctx)
self.assertEqual(self.instance_mgr.state, states.BOOTING)
self.ctx.nova_client.boot_instance.assert_called_once_with(
self.INSTANCE_INFO,
rtr.id,
'GLANCE-IMAGE-123',
mock.ANY, # TODO(adam_g): actually test make_vrrp_ports()
)
self.fake_driver.name,
self.fake_driver.image_uuid,
self.fake_driver.flavor,
'fake_ports_callback')
def test_boot_check_up(self):
with mock.patch.object(
@ -367,17 +349,17 @@ class TestInstanceManager(unittest.TestCase):
instance_manager.InstanceManager,
'configure'
) as configure:
update_state.return_value = instance_manager.UP
update_state.return_value = states.UP
configure.side_effect = lambda *a, **kw: setattr(
self.instance_mgr,
'state',
instance_manager.CONFIGURED
states.CONFIGURED
)
assert self.instance_mgr.check_boot(self.ctx) is True
update_state.assert_called_once_with(self.ctx, silent=True)
configure.assert_called_once_with(
self.ctx,
instance_manager.BOOTING,
states.BOOTING,
attempts=1
)
@ -390,17 +372,17 @@ class TestInstanceManager(unittest.TestCase):
instance_manager.InstanceManager,
'configure'
) as configure:
update_state.return_value = instance_manager.CONFIGURED
update_state.return_value = states.CONFIGURED
configure.side_effect = lambda *a, **kw: setattr(
self.instance_mgr,
'state',
instance_manager.CONFIGURED
states.CONFIGURED
)
assert self.instance_mgr.check_boot(self.ctx) is True
update_state.assert_called_once_with(self.ctx, silent=True)
configure.assert_called_once_with(
self.ctx,
instance_manager.BOOTING,
states.BOOTING,
attempts=1
)
@ -409,7 +391,7 @@ class TestInstanceManager(unittest.TestCase):
instance_manager.InstanceManager,
'update_state'
) as update_state:
update_state.return_value = instance_manager.BOOTING
update_state.return_value = states.BOOTING
assert self.instance_mgr.check_boot(self.ctx) is False
update_state.assert_called_once_with(self.ctx, silent=True)
@ -422,130 +404,109 @@ class TestInstanceManager(unittest.TestCase):
instance_manager.InstanceManager,
'configure'
) as configure:
update_state.return_value = instance_manager.CONFIGURED
update_state.return_value = states.CONFIGURED
configure.side_effect = lambda *a, **kw: setattr(
self.instance_mgr,
'state',
instance_manager.BOOTING
states.BOOTING
)
assert self.instance_mgr.check_boot(self.ctx) is False
update_state.assert_called_once_with(self.ctx, silent=True)
configure.assert_called_once_with(
self.ctx,
instance_manager.BOOTING,
states.BOOTING,
attempts=1
)
@mock.patch('time.sleep')
def test_stop_success(self, sleep):
self.instance_mgr.state = instance_manager.UP
self.instance_mgr.state = states.UP
self.ctx.nova_client.get_instance_by_id.return_value = None
self.instance_mgr.stop(self.ctx)
self.ctx.nova_client.destroy_instance.assert_called_once_with(
self.INSTANCE_INFO
)
self.assertEqual(self.instance_mgr.state, instance_manager.DOWN)
self.assertEqual(self.instance_mgr.state, states.DOWN)
@mock.patch('time.sleep')
def test_stop_fail(self, sleep):
self.instance_mgr.state = instance_manager.UP
self.instance_mgr.state = states.UP
self.ctx.nova_client.get_router_instance_status.return_value = 'UP'
self.instance_mgr.stop(self.ctx)
self.assertEqual(self.instance_mgr.state, instance_manager.UP)
self.assertEqual(self.instance_mgr.state, states.UP)
self.ctx.nova_client.destroy_instance.assert_called_once_with(
self.INSTANCE_INFO
)
self.log.error.assert_called_once_with(mock.ANY, 1)
@mock.patch('time.sleep')
def test_stop_router_already_deleted_from_neutron(self, sleep):
self.instance_mgr.state = instance_manager.GONE
self.instance_mgr.state = states.GONE
self.instance_mgr.stop(self.ctx)
self.ctx.nova_client.destroy_instance.assert_called_once_with(
self.INSTANCE_INFO)
self.assertEqual(self.instance_mgr.state, instance_manager.GONE)
@mock.patch('akanda.rug.instance_manager.router_api')
@mock.patch('akanda.rug.api.configuration.build_config')
def test_configure_success(self, config, router_api):
rtr = mock.sentinel.router
self.ctx.neutron.get_router_detail.return_value = rtr
config.return_value = 'fake_config'
router_api.get_interfaces.return_value = []
self.assertEqual(self.instance_mgr.state, states.GONE)
def test_configure_success(self):
self.fake_driver.build_config.return_value = 'fake_config'
with mock.patch.object(self.instance_mgr,
'_verify_interfaces') as verify:
verify.return_value = True
self.instance_mgr.configure(self.ctx)
verify.assert_called_once_with(rtr, [])
config.assert_called_once_with(
self.ctx.neutron, rtr, fake_mgt_port, {})
router_api.update_config.assert_called_once_with(
self.INSTANCE_INFO.management_address, 5000, 'fake_config',
verify.assert_called_once_with(
self.fake_driver.ports,
self.fake_driver.get_interfaces.return_value)
self.fake_driver.build_config.assert_called_once_with(
self.ctx,
self.INSTANCE_INFO.management_port,
{'ext-net': 'ge1', 'int-net': 'ge2', 'mgt-net': 'ge0'})
self.fake_driver.update_config.assert_called_once_with(
self.INSTANCE_INFO.management_address, 'fake_config',
)
self.assertEqual(self.instance_mgr.state,
instance_manager.CONFIGURED)
@mock.patch('akanda.rug.instance_manager.router_api')
def test_configure_mismatched_interfaces(self, router_api):
rtr = mock.sentinel.router
self.neutron.get_router_detail.return_value = rtr
states.CONFIGURED)
def test_configure_mismatched_interfaces(self):
with mock.patch.object(self.instance_mgr,
'_verify_interfaces') as verify:
verify.return_value = False
self.instance_mgr.configure(self.ctx)
interfaces = router_api.get_interfaces.return_value
verify.assert_called_once_with(
self.fake_driver.ports,
self.fake_driver.get_interfaces.return_value)
verify.assert_called_once_with(rtr, interfaces)
self.assertFalse(router_api.update_config.called)
self.assertEqual(self.instance_mgr.state, instance_manager.REPLUG)
self.assertFalse(self.fake_driver.update_config.called)
self.assertEqual(self.instance_mgr.state, states.REPLUG)
@mock.patch('time.sleep')
@mock.patch('akanda.rug.instance_manager.router_api')
@mock.patch('akanda.rug.api.configuration.build_config')
def test_configure_failure(self, config, router_api, sleep):
rtr = {'id': 'the_id'}
def test_configure_failure(self, sleep):
self.neutron.get_router_detail.return_value = rtr
router_api.update_config.side_effect = Exception
config.return_value = 'fake_config'
self.fake_driver.update_config.side_effect = Exception
self.fake_driver.build_config.return_value = 'fake_config'
with mock.patch.object(self.instance_mgr,
'_verify_interfaces') as verify:
verify.return_value = True
self.instance_mgr.configure(self.ctx)
interfaces = router_api.get_interfaces.return_value
verify.assert_called_once_with(rtr, interfaces)
interfaces = self.fake_driver.get_interfaces.return_value
verify.assert_called_once_with(
self.fake_driver.ports, interfaces)
config.assert_called_once_with(
self.neutron, rtr, fake_mgt_port, {})
expected_calls = [
mock.call(self.INSTANCE_INFO.management_address, 5000,
mock.call(self.INSTANCE_INFO.management_address,
'fake_config')
for i in range(0, 2)]
router_api.update_config.assert_has_calls(expected_calls)
self.assertEqual(self.instance_mgr.state, instance_manager.RESTART)
self.fake_driver.update_config.assert_has_calls(expected_calls)
self.assertEqual(self.instance_mgr.state, states.RESTART)
@mock.patch('time.sleep', lambda *a: None)
@mock.patch('akanda.rug.instance_manager.router_api')
def test_replug_add_new_port_success(self, router_api):
self.instance_mgr.state = instance_manager.REPLUG
def test_replug_add_new_port_success(self):
self.instance_mgr.state = states.REPLUG
fake_router = mock.Mock()
fake_router.id = 'fake_router_id'
fake_router.ports = [fake_ext_port, fake_int_port, fake_add_port]
self.neutron.get_router_detail.return_value = fake_router
self.instance_mgr.router_obj = fake_router
router_api.get_interfaces.return_value = [
self.fake_driver.get_interfaces.return_value = [
{'lladdr': fake_mgt_port.mac_address},
{'lladdr': fake_ext_port.mac_address},
{'lladdr': fake_int_port.mac_address}
@ -555,7 +516,8 @@ class TestInstanceManager(unittest.TestCase):
fake_instance = mock.MagicMock()
self.ctx.nova_client.get_instance_by_id = mock.Mock(
return_value=fake_instance)
fake_new_port = mock.Mock(id='fake_new_port_id')
fake_new_port = fake_add_port
self.fake_driver.ports.append(fake_new_port)
self.ctx.neutron.create_vrrp_port.return_value = fake_new_port
with mock.patch.object(self.instance_mgr,
@ -564,26 +526,19 @@ class TestInstanceManager(unittest.TestCase):
self.instance_mgr.replug(self.ctx)
self.ctx.neutron.create_vrrp_port.assert_called_with(
fake_router.id, 'additional-net'
self.fake_driver.id, 'additional-net'
)
self.assertEqual(self.instance_mgr.state, instance_manager.REPLUG)
self.assertEqual(self.instance_mgr.state, states.REPLUG)
fake_instance.interface_attach.assert_called_once_with(
fake_new_port.id, None, None
)
self.assertIn(fake_new_port, self.INSTANCE_INFO.ports)
@mock.patch('time.sleep', lambda *a: None)
@mock.patch('akanda.rug.instance_manager.router_api')
def test_replug_add_new_port_failure(self, router_api):
self.instance_mgr.state = instance_manager.REPLUG
def test_replug_add_new_port_failure(self):
self.instance_mgr.state = states.REPLUG
fake_router = mock.Mock()
fake_router.id = 'fake_router_id'
fake_router.ports = [fake_ext_port, fake_int_port, fake_add_port]
self.neutron.get_router_detail.return_value = fake_router
self.instance_mgr.router_obj = fake_router
router_api.get_interfaces.return_value = [
self.fake_driver.get_interfaces.return_value = [
{'lladdr': fake_mgt_port.mac_address},
{'lladdr': fake_ext_port.mac_address},
{'lladdr': fake_int_port.mac_address}
@ -594,36 +549,30 @@ class TestInstanceManager(unittest.TestCase):
self.ctx.nova_client.get_instance_by_id = mock.Mock(
return_value=fake_instance)
fake_new_port = mock.Mock(id='fake_new_port_id')
fake_new_port = fake_add_port
self.fake_driver.ports.append(fake_new_port)
self.ctx.neutron.create_vrrp_port.return_value = fake_new_port
with mock.patch.object(self.instance_mgr,
'_verify_interfaces') as verify:
verify.return_value = False # The hotplug didn't work!
self.instance_mgr.replug(self.ctx)
self.assertEqual(self.instance_mgr.state, instance_manager.RESTART)
self.assertEqual(self.instance_mgr.state, states.RESTART)
fake_instance.interface_attach.assert_called_once_with(
fake_new_port.id, None, None
)
@mock.patch('time.sleep', lambda *a: None)
@mock.patch('akanda.rug.instance_manager.router_api')
def test_replug_remove_port_success(self, router_api):
self.instance_mgr.state = instance_manager.REPLUG
def test_replug_remove_port_success(self):
self.instance_mgr.state = states.REPLUG
fake_router = mock.Mock()
fake_router.id = 'fake_router_id'
# Router lacks the fake_ext_port, it will be unplugged
fake_router.ports = [fake_mgt_port, fake_int_port]
self.neutron.get_router_detail.return_value = fake_router
self.instance_mgr.router_obj = fake_router
router_api.get_interfaces.return_value = [
# Resource lacks the fake_ext_port, it will be unplugged
self.fake_driver.ports = [fake_mgt_port, fake_int_port]
self.fake_driver.get_interfaces.return_value = [
{'lladdr': fake_mgt_port.mac_address},
{'lladdr': fake_int_port.mac_address},
{'lladdr': fake_ext_port.mac_address},
{'lladdr': fake_int_port.mac_address}
]
self.conf.hotplug_timeout = 5
@ -635,26 +584,19 @@ class TestInstanceManager(unittest.TestCase):
'_verify_interfaces') as verify:
verify.return_value = True # the unplug worked!
self.instance_mgr.replug(self.ctx)
self.assertEqual(self.instance_mgr.state, instance_manager.REPLUG)
self.assertEqual(self.instance_mgr.state, states.REPLUG)
fake_instance.interface_detach.assert_called_once_with(
fake_ext_port.id
)
self.assertNotIn(fake_ext_port, self.INSTANCE_INFO.ports)
@mock.patch('time.sleep', lambda *a: None)
@mock.patch('akanda.rug.instance_manager.router_api')
def test_replug_remove_port_failure(self, router_api):
self.instance_mgr.state = instance_manager.REPLUG
fake_router = mock.Mock()
fake_router.id = 'fake_router_id'
def test_replug_remove_port_failure(self):
self.instance_mgr.state = states.REPLUG
# Router lacks the fake_ext_port, it will be unplugged
fake_router.ports = [fake_mgt_port, fake_int_port]
self.neutron.get_router_detail.return_value = fake_router
self.instance_mgr.router_obj = fake_router
router_api.get_interfaces.return_value = [
self.fake_driver.ports = [fake_mgt_port, fake_int_port]
self.fake_driver.get_interfaces.return_value = [
{'lladdr': fake_mgt_port.mac_address},
{'lladdr': fake_ext_port.mac_address},
{'lladdr': fake_int_port.mac_address}
@ -670,36 +612,24 @@ class TestInstanceManager(unittest.TestCase):
verify.return_value = False # the unplug failed!
self.instance_mgr.replug(self.ctx)
self.assertEquals(self.instance_mgr.state,
instance_manager.RESTART)
states.RESTART)
fake_instance.interface_detach.assert_called_once_with(
fake_ext_port.id
)
def test_verify_interfaces(self):
rtr = mock.Mock()
rtr.management_port.mac_address = fake_mgt_port.mac_address
rtr.external_port.mac_address = fake_ext_port.mac_address
p = mock.Mock()
p.mac_address = fake_int_port.mac_address
rtr.internal_ports = [p]
rtr.ports = [p, rtr.management_port, rtr.external_port]
self.fake_driver.ports = [fake_mgt_port, fake_ext_port, fake_int_port]
interfaces = [
{'lladdr': fake_mgt_port.mac_address},
{'lladdr': fake_ext_port.mac_address},
{'lladdr': fake_int_port.mac_address}
]
self.assertTrue(self.instance_mgr._verify_interfaces(rtr, interfaces))
self.assertTrue(self.instance_mgr._verify_interfaces(
self.fake_driver.ports, interfaces))
def test_verify_interfaces_with_cleared_gateway(self):
rtr = mock.Mock()
rtr.management_port = mock.MagicMock(spec=[])
rtr.external_port.mac_address = 'd:c:b:a'
p = mock.Mock()
p.mac_address = 'a:a:a:a'
rtr.internal_ports = [p]
rtr.ports = [p, rtr.management_port, rtr.external_port]
self.fake_driver.ports = [fake_mgt_port, fake_ext_port, fake_int_port]
interfaces = [
{'lladdr': 'a:b:c:d'},
@ -707,59 +637,25 @@ class TestInstanceManager(unittest.TestCase):
{'lladdr': 'a:a:a:a'}
]
self.assertFalse(self.instance_mgr._verify_interfaces(rtr, interfaces))
def test_ensure_provider_ports(self):
rtr = mock.Mock()
rtr.external_port = None
self.assertEqual(self.instance_mgr._ensure_provider_ports(rtr,
self.ctx),
rtr)
self.neutron.create_router_external_port.assert_called_once_with(rtr)
def test_set_error_when_gone(self):
self.instance_mgr.state = instance_manager.GONE
rtr = mock.sentinel.router
rtr.id = 'R1'
self.ctx.neutron.get_router_detail.return_value = rtr
self.instance_mgr.set_error(self.ctx)
self.neutron.update_router_status.assert_called_once_with('R1',
'ERROR')
self.assertEqual(instance_manager.GONE, self.instance_mgr.state)
self.assertFalse(self.instance_mgr._verify_interfaces(
self.fake_driver.ports, interfaces))
def test_set_error_when_booting(self):
self.instance_mgr.state = instance_manager.BOOTING
rtr = mock.sentinel.router
rtr.id = 'R1'
self.ctx.neutron.get_router_detail.return_value = rtr
self.instance_mgr.state = states.BOOTING
self.instance_mgr.set_error(self.ctx)
self.neutron.update_router_status.assert_called_once_with('R1',
'ERROR')
self.assertEqual(instance_manager.ERROR, self.instance_mgr.state)
self.fake_driver.synchronize_state.assert_called_once_with(
self.ctx, state='error')
self.assertEqual(states.ERROR, self.instance_mgr.state)
def test_clear_error_when_gone(self):
self.instance_mgr.state = instance_manager.GONE
rtr = mock.sentinel.router
rtr.id = 'R1'
self.ctx.neutron.get_router_detail.return_value = rtr
self.instance_mgr.state = states.GONE
self.instance_mgr.clear_error(self.ctx)
self.neutron.update_router_status.assert_called_once_with('R1',
'ERROR')
self.assertEqual(instance_manager.GONE, self.instance_mgr.state)
def test_set_error_when_error(self):
self.instance_mgr.state = instance_manager.ERROR
rtr = mock.sentinel.router
rtr.id = 'R1'
self.ctx.neutron.get_router_detail.return_value = rtr
self.instance_mgr.clear_error(self.ctx)
self.neutron.update_router_status.assert_called_once_with('R1',
'DOWN')
self.assertEqual(instance_manager.DOWN, self.instance_mgr.state)
self.fake_driver.synchronize_state(self.ctx, 'error')
self.assertEqual(states.DOWN, self.instance_mgr.state)
@mock.patch('time.sleep')
def test_boot_success_after_error(self, sleep):
self.next_state = instance_manager.UP
self.next_state = states.UP
rtr = mock.sentinel.router
self.ctx.neutron.get_router_detail.return_value = rtr
rtr.id = 'ROUTER1'
@ -768,17 +664,22 @@ class TestInstanceManager(unittest.TestCase):
rtr.ports = mock.MagicMock()
rtr.ports.__iter__.return_value = []
self.instance_mgr.set_error(self.ctx)
self.instance_mgr.boot(self.ctx, 'GLANCE-IMAGE-123')
self.assertEqual(self.instance_mgr.state, instance_manager.BOOTING)
self.instance_mgr.boot(self.ctx)
self.assertEqual(self.instance_mgr.state, states.BOOTING)
self.ctx.nova_client.boot_instance.assert_called_once_with(
self.INSTANCE_INFO, rtr.id, 'GLANCE-IMAGE-123', mock.ANY)
self.INSTANCE_INFO,
self.fake_driver.name,
self.fake_driver.image_uuid,
self.fake_driver.flavor,
'fake_ports_callback')
def test_error_cooldown(self):
self.conf.error_state_cooldown = 30
self.assertIsNone(self.instance_mgr.last_error)
self.assertFalse(self.instance_mgr.error_cooldown)
self.instance_mgr.state = instance_manager.ERROR
self.instance_mgr.state = states.ERROR
self.instance_mgr.last_error = datetime.utcnow() - timedelta(seconds=1)
self.assertTrue(self.instance_mgr.error_cooldown)
@ -801,44 +702,3 @@ class TestBootAttemptCounter(unittest.TestCase):
self.c._attempts = 2
self.c.reset()
self.assertEqual(0, self.c._attempts)
class TestSynchronizeRouterStatus(unittest.TestCase):
def setUp(self):
self.test_instance_manager = mock.Mock(spec=('router_obj',
'_last_synced_status',
'state'))
self.test_context = mock.Mock()
def test_router_is_deleted(self):
self.test_instance_manager.router_obj = None
v = instance_manager.synchronize_router_status(
lambda instance_manager_inst, ctx, silent: 1)
self.assertEqual(v(self.test_instance_manager, {}), 1)
def test_router_status_changed(self):
self.test_instance_manager.router_obj = mock.Mock(id='ABC123')
self.test_instance_manager._last_synced_status = neutron.STATUS_ACTIVE
self.test_instance_manager.state = instance_manager.DOWN
v = instance_manager.synchronize_router_status(
lambda instance_manager_inst, ctx, silent: 1)
self.assertEqual(v(self.test_instance_manager, self.test_context), 1)
self.test_context.neutron.update_router_status.\
assert_called_once_with(
'ABC123',
neutron.STATUS_DOWN)
self.assertEqual(self.test_instance_manager._last_synced_status,
neutron.STATUS_DOWN)
def test_router_status_same(self):
self.test_instance_manager.router_obj = mock.Mock(id='ABC123')
self.test_instance_manager._last_synced_status = neutron.STATUS_ACTIVE
self.test_instance_manager.state = instance_manager.CONFIGURED
v = instance_manager.synchronize_router_status(
lambda instance_manager_inst, ctx, silent: 1)
self.assertEqual(v(self.test_instance_manager, self.test_context), 1)
self.assertEqual(
self.test_context.neutron.update_router_status.call_count, 0)
self.assertEqual(self.test_instance_manager._last_synced_status,
neutron.STATUS_ACTIVE)

View File

@ -152,7 +152,7 @@ class TestGetCRUD(unittest.TestCase):
method='router_deleted',
router_id='fake_router_id')
self.assertEqual(event.DELETE, e.crud)
self.assertEqual(e.router_id, 'fake_router_id')
self.assertEqual(e.resource.id, 'fake_router_id')
def test_notification_port(self):
e = self._get_event_notification('port.create.start')
@ -209,7 +209,8 @@ class TestGetCRUD(unittest.TestCase):
}
}
e = self._get_event_notification('router.create.end', payload)
self.assertEqual(e.router_id, u'f95fb32d-0072-4675-b4bd-61d829a46aca')
self.assertEqual(e.resource.id,
u'f95fb32d-0072-4675-b4bd-61d829a46aca')
def test_interface_create_and_delete(self):
for action in ('create', 'delete'):
@ -227,7 +228,7 @@ class TestGetCRUD(unittest.TestCase):
self.assertEqual(event.UPDATE, e.crud)
self.assertEqual(
u'58868681-4a58-4f69-8dc0-b20955e7923f',
e.router_id
e.resource.id
)
def test_notification_akanda(self):

View File

@ -16,116 +16,52 @@
import mock
import unittest2 as unittest
from neutronclient.common import exceptions as q_exceptions
from akanda.rug.test.unit import base
from akanda.rug.test.unit import fakes
from akanda.rug import populate
from akanda.rug import event
from akanda.rug.event import Resource
class TestPrePopulateWorkers(unittest.TestCase):
class FakePopulateDriver(object):
pre_populate_hook = mock.Mock()
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_retry_loop(self, mocked_neutron_api):
neutron_client = mock.Mock()
returned_value = [Exception, []]
neutron_client.get_routers.side_effect = returned_value
mocked_neutron_api.return_value = neutron_client
class TestPrePopulateWorkers(base.RugTestBase):
def setUp(self):
super(TestPrePopulateWorkers, self).setUp()
sched = mock.Mock()
populate._pre_populate_workers(sched)
self.assertEqual(
neutron_client.get_routers.call_args_list,
[
mock.call(detailed=False)
for value in xrange(len(returned_value))
]
)
self.assertEqual(
neutron_client.get_routers.call_count,
len(returned_value)
)
def _exit_loop_bad_auth(self, mocked_neutron_api, log, exc):
neutron_client = mock.Mock()
neutron_client.get_routers.side_effect = exc
mocked_neutron_api.return_value = neutron_client
sched = mock.Mock()
populate._pre_populate_workers(sched)
log.warning.assert_called_once_with(
'PrePopulateWorkers thread failed: %s',
mock.ANY
)
@mock.patch('akanda.rug.populate.LOG')
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_exit_loop_unauthorized(self, mocked_neutron_api, log):
exc = q_exceptions.Unauthorized
self._exit_loop_bad_auth(mocked_neutron_api, log, exc)
@mock.patch('akanda.rug.populate.LOG')
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_exit_loop_forbidden(self, mocked_neutron_api, log):
exc = q_exceptions.Forbidden
self._exit_loop_bad_auth(mocked_neutron_api, log, exc)
@mock.patch('akanda.rug.populate.LOG.warning')
@mock.patch('akanda.rug.populate.LOG.debug')
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_retry_loop_logging(
self, mocked_neutron_api, log_debug, log_warning):
neutron_client = mock.Mock()
message = mock.Mock(tenant_id='1', router_id='2')
returned_value = [
q_exceptions.NeutronClientException,
[message]
@mock.patch('akanda.rug.drivers.enabled_drivers')
def test_pre_populate_with_resources(self, enabled_drivers):
fake_scheduler = mock.Mock()
fake_scheduler.handle_message = mock.Mock()
fake_driver = fakes.fake_driver()
fake_resources = [
Resource(
id='fake_resource_%s' % i,
tenant_id='fake_tenant_%s' % i,
driver=fake_driver.RESOURCE_NAME,
) for i in range(2)
]
neutron_client.get_routers.side_effect = returned_value
fake_driver.pre_populate_hook.return_value = fake_resources
enabled_drivers.return_value = [fake_driver]
populate._pre_populate_workers(fake_scheduler)
for res in fake_resources:
e = event.Event(resource=res, crud=event.POLL, body={})
call = mock.call(res.tenant_id, e)
self.assertIn(call, fake_scheduler.handle_message.call_args_list)
mocked_neutron_api.return_value = neutron_client
sched = mock.Mock()
populate._pre_populate_workers(sched)
self.assertEqual(2, log_warning.call_count)
self.assertEqual(1, log_debug.call_count)
@mock.patch('akanda.rug.event.Event')
@mock.patch('akanda.rug.api.neutron.Neutron')
def test_scheduler_handle_message(self, mocked_neutron_api, event):
def message_to_router_args(message):
tmp = message.copy()
tmp['id'] = tmp.pop('router_id')
return tmp
neutron_client = mock.Mock()
message1 = {'tenant_id': '1', 'router_id': '2',
'body': {}, 'crud': 'poll'}
message2 = {'tenant_id': '3', 'router_id': '4',
'body': {}, 'crud': 'poll'}
return_value = [
mock.Mock(**message_to_router_args(message1)),
mock.Mock(**message_to_router_args(message2))
]
neutron_client.get_routers.return_value = return_value
sched = mock.Mock()
mocked_neutron_api.return_value = neutron_client
populate._pre_populate_workers(sched)
self.assertEqual(sched.handle_message.call_count, len(return_value))
expected = [
mock.call(message1['tenant_id'], mock.ANY),
mock.call(message2['tenant_id'], mock.ANY)
]
self.assertEqual(sched.handle_message.call_args_list, expected)
self.assertEqual(event.call_count, 2)
expected = [mock.call(**message1), mock.call(**message2)]
self.assertEqual(event.call_args_list, expected)
@mock.patch('akanda.rug.drivers.enabled_drivers')
def test_pre_populate_with_no_resources(self, enabled_drivers):
fake_scheduler = mock.Mock()
fake_scheduler.handle_message = mock.Mock()
fake_driver = fakes.fake_driver()
fake_driver.pre_populate_hook.return_value = []
enabled_drivers.return_value = [fake_driver]
populate._pre_populate_workers(fake_scheduler)
self.assertFalse(fake_scheduler.handle_message.called)
@mock.patch('threading.Thread')
def test_pre_populate_workers(self, thread):

View File

@ -15,7 +15,6 @@
# under the License.
import logging
from collections import deque
import mock
@ -24,31 +23,33 @@ import unittest2 as unittest
from akanda.rug import event
from akanda.rug import state
from akanda.rug import instance_manager
from akanda.rug.drivers import states
from akanda.rug.api.neutron import RouterGone
from akanda.rug.test.unit import fakes
class BaseTestStateCase(unittest.TestCase):
state_cls = state.State
def setUp(self):
self.ctx = mock.Mock() # worker context
log = logging.getLogger(__name__)
self.fake_driver = fakes.fake_driver()
instance_mgr_cls = \
mock.patch('akanda.rug.instance_manager.InstanceManager').start()
self.addCleanup(mock.patch.stopall)
self.instance = instance_mgr_cls.return_value
self.params = state.StateParams(
driver=self.fake_driver,
instance=self.instance,
log=log,
queue=deque(),
bandwidth_callback=mock.Mock(),
reboot_error_threshold=3,
router_image_uuid='GLANCE-IMAGE-123'
)
self.state = self.state_cls(self.params)
def _test_transition_hlpr(self, action, expected_class,
instance_state=state.instance_manager.UP):
instance_state=state.states.UP):
self.instance.state = instance_state
result = self.state.transition(action, self.ctx)
self.assertIsInstance(result, expected_class)
@ -135,7 +136,7 @@ class TestCalcActionState(BaseTestStateCase):
self._test_transition_hlpr(
event.UPDATE,
state.CheckBoot,
instance_manager.BOOTING
states.BOOTING
)
def test_transition_update_missing_router_not_down(self):
@ -144,7 +145,7 @@ class TestCalcActionState(BaseTestStateCase):
self._test_transition_hlpr(
event.UPDATE,
state.CheckBoot,
instance_manager.BOOTING
states.BOOTING
)
def test_transition_delete_missing_router_down(self):
@ -153,7 +154,7 @@ class TestCalcActionState(BaseTestStateCase):
self._test_transition_hlpr(
event.DELETE,
state.StopInstance,
instance_manager.DOWN
states.DOWN
)
def test_transition_delete_missing_router_not_down(self):
@ -162,13 +163,13 @@ class TestCalcActionState(BaseTestStateCase):
self._test_transition_hlpr(
event.DELETE,
state.StopInstance,
instance_manager.BOOTING
states.BOOTING
)
def test_transition_delete_down_instance(self):
self._test_transition_hlpr(event.DELETE,
state.StopInstance,
instance_manager.DOWN)
states.DOWN)
def test_transition_delete_up_instance(self):
self._test_transition_hlpr(event.DELETE, state.StopInstance)
@ -177,18 +178,18 @@ class TestCalcActionState(BaseTestStateCase):
for evt in [event.POLL, event.READ, event.UPDATE, event.CREATE]:
self._test_transition_hlpr(evt,
state.CreateInstance,
instance_manager.DOWN)
states.DOWN)
def test_transition_poll_up_instance(self):
self._test_transition_hlpr(event.POLL,
state.Alive,
instance_manager.UP)
states.UP)
def test_transition_poll_configured_instance(self):
self._test_transition_hlpr(
event.POLL,
state.Alive,
instance_manager.CONFIGURED
states.CONFIGURED
)
def test_transition_other_up_instance(self):
@ -200,7 +201,7 @@ class TestCalcActionState(BaseTestStateCase):
result = self._test_transition_hlpr(
event.UPDATE,
state.ClearError,
instance_manager.ERROR,
states.ERROR,
)
self.assertIsInstance(result._next_state, state.Alive)
@ -209,14 +210,14 @@ class TestCalcActionState(BaseTestStateCase):
self._test_transition_hlpr(
event.UPDATE,
state.CalcAction,
instance_manager.ERROR,
states.ERROR,
)
def test_transition_poll_error_instance(self):
self._test_transition_hlpr(
event.POLL,
state.CalcAction,
instance_manager.ERROR,
states.ERROR,
)
@ -234,34 +235,34 @@ class TestAliveState(BaseTestStateCase):
for evt in [event.POLL, event.READ, event.UPDATE, event.CREATE]:
self._test_transition_hlpr(evt,
state.CreateInstance,
instance_manager.DOWN)
states.DOWN)
def test_transition_poll_instance_configured(self):
self._test_transition_hlpr(
event.POLL,
state.CalcAction,
instance_manager.CONFIGURED
states.CONFIGURED
)
def test_transition_read_instance_configured(self):
self._test_transition_hlpr(
event.READ,
state.ReadStats,
instance_manager.CONFIGURED
states.CONFIGURED
)
def test_transition_up_to_configured(self):
self._test_transition_hlpr(
event.CREATE,
state.ConfigureInstance,
instance_manager.UP
states.UP
)
def test_transition_configured_instance_configured(self):
self._test_transition_hlpr(
event.CREATE,
state.ConfigureInstance,
instance_manager.CONFIGURED
states.CONFIGURED
)
@ -274,8 +275,7 @@ class TestCreateInstanceState(BaseTestStateCase):
self.state.execute('passthrough', self.ctx),
'passthrough'
)
self.instance.boot.assert_called_once_with(self.ctx,
'GLANCE-IMAGE-123')
self.instance.boot.assert_called_once_with(self.ctx)
def test_execute_too_many_attempts(self):
self.instance.attempts = self.params.reboot_error_threshold
@ -290,26 +290,26 @@ class TestCreateInstanceState(BaseTestStateCase):
self._test_transition_hlpr(
event.READ,
state.CheckBoot,
instance_manager.BOOTING
states.BOOTING
)
def test_transition_instance_up(self):
self._test_transition_hlpr(
event.READ,
state.CheckBoot,
instance_state=state.instance_manager.BOOTING
instance_state=state.states.BOOTING
)
def test_transition_instance_missing(self):
self._test_transition_hlpr(
event.READ,
state.CreateInstance,
instance_state=state.instance_manager.DOWN
instance_state=state.states.DOWN
)
def test_transition_instance_error(self):
self._test_transition_hlpr(event.READ, state.CalcAction,
instance_state=state.instance_manager.ERROR)
instance_state=state.states.ERROR)
class TestRebuildInstanceState(BaseTestStateCase):
@ -323,7 +323,7 @@ class TestRebuildInstanceState(BaseTestStateCase):
self.instance.stop.assert_called_once_with(self.ctx)
def test_execute_gone(self):
self.instance.state = instance_manager.GONE
self.instance.state = states.GONE
self.assertEqual(
self.state.execute('ignored', self.ctx),
event.DELETE,
@ -342,7 +342,7 @@ class TestClearErrorState(BaseTestStateCase):
self.instance.clear_error.assert_called_once_with(self.ctx)
def test_execute_after_error(self):
self.instance.state = instance_manager.ERROR
self.instance.state = states.ERROR
self.assertEqual(
self.state.execute('passthrough', self.ctx),
'passthrough',
@ -379,21 +379,21 @@ class TestCheckBootState(BaseTestStateCase):
self._test_transition_hlpr(
event.UPDATE,
state.ConfigureInstance,
instance_manager.UP
states.UP
)
def test_transition_hotplug(self):
self._test_transition_hlpr(
event.UPDATE,
state.ReplugInstance,
instance_manager.REPLUG
states.REPLUG
)
def test_transition_instance_booting(self):
self._test_transition_hlpr(
event.UPDATE,
state.CalcAction,
instance_manager.BOOTING
states.BOOTING
)
@ -413,12 +413,12 @@ class TestStopInstanceState(BaseTestStateCase):
def test_transition_delete_instance_down(self):
self._test_transition_hlpr(event.DELETE,
state.Exit,
instance_manager.DOWN)
states.DOWN)
def test_transition_restart_instance_down(self):
self._test_transition_hlpr(event.READ,
state.CreateInstance,
instance_manager.DOWN)
states.DOWN)
class TestReplugState(BaseTestStateCase):
@ -435,14 +435,14 @@ class TestReplugState(BaseTestStateCase):
self._test_transition_hlpr(
event.UPDATE,
state.ConfigureInstance,
instance_manager.REPLUG
states.REPLUG
)
def test_transition_hotplug_failed(self):
self._test_transition_hlpr(
event.UPDATE,
state.StopInstance,
instance_manager.RESTART
states.RESTART
)
@ -454,13 +454,13 @@ class TestConfigureInstanceState(BaseTestStateCase):
state_cls = state.ConfigureInstance
def test_execute_read_configure_success(self):
self.instance.state = instance_manager.CONFIGURED
self.instance.state = states.CONFIGURED
self.assertEqual(self.state.execute(event.READ, self.ctx),
event.READ)
self.instance.configure.assert_called_once_with(self.ctx)
def test_execute_update_configure_success(self):
self.instance.state = instance_manager.CONFIGURED
self.instance.state = states.CONFIGURED
self.assertEqual(self.state.execute(event.UPDATE, self.ctx),
event.POLL)
self.instance.configure.assert_called_once_with(self.ctx)
@ -475,30 +475,30 @@ class TestConfigureInstanceState(BaseTestStateCase):
def test_transition_not_configured_down(self):
self._test_transition_hlpr(event.READ,
state.StopInstance,
instance_manager.DOWN)
states.DOWN)
def test_transition_not_configured_restart(self):
self._test_transition_hlpr(event.READ,
state.StopInstance,
instance_manager.RESTART)
states.RESTART)
def test_transition_not_configured_up(self):
self._test_transition_hlpr(event.READ,
state.PushUpdate,
instance_manager.UP)
states.UP)
def test_transition_read_configured(self):
self._test_transition_hlpr(
event.READ,
state.ReadStats,
instance_manager.CONFIGURED
states.CONFIGURED
)
def test_transition_other_configured(self):
self._test_transition_hlpr(
event.POLL,
state.CalcAction,
instance_manager.CONFIGURED
states.CONFIGURED
)
@ -524,6 +524,7 @@ class TestAutomaton(unittest.TestCase):
super(TestAutomaton, self).setUp()
self.ctx = mock.Mock() # worker context
self.fake_driver = fakes.fake_driver()
self.instance_mgr_cls = \
mock.patch('akanda.rug.instance_manager.InstanceManager').start()
@ -533,7 +534,8 @@ class TestAutomaton(unittest.TestCase):
self.bandwidth_callback = mock.Mock()
self.sm = state.Automaton(
router_id='9306bbd8-f3cc-11e2-bd68-080027e60b25',
driver=self.fake_driver,
resource_id=self.fake_driver.id,
tenant_id='tenant-id',
delete_callback=self.delete_callback,
bandwidth_callback=self.bandwidth_callback,
@ -545,7 +547,7 @@ class TestAutomaton(unittest.TestCase):
def test_send_message(self):
message = mock.Mock()
message.crud = 'update'
with mock.patch.object(self.sm, 'log') as logger:
with mock.patch.object(self.sm.driver, 'log') as logger:
self.sm.send_message(message)
self.assertEqual(len(self.sm._queue), 1)
logger.debug.assert_called_with(
@ -558,7 +560,7 @@ class TestAutomaton(unittest.TestCase):
message.crud = 'update'
for i in range(3):
self.sm.send_message(message)
with mock.patch.object(self.sm, 'log') as logger:
with mock.patch.object(self.sm.driver, 'log') as logger:
self.sm.send_message(message)
logger.warning.assert_called_with(
'incoming message brings queue length to %s',
@ -575,7 +577,7 @@ class TestAutomaton(unittest.TestCase):
def test_send_message_in_error(self):
instance = self.instance_mgr_cls.return_value
instance.state = state.instance_manager.ERROR
instance.state = state.states.ERROR
message = mock.Mock()
message.crud = 'poll'
self.sm.send_message(message)
@ -584,7 +586,7 @@ class TestAutomaton(unittest.TestCase):
# Non-POLL events should *not* be ignored for routers in ERROR state
message.crud = 'create'
with mock.patch.object(self.sm, 'log') as logger:
with mock.patch.object(self.sm.driver, 'log') as logger:
self.sm.send_message(message)
self.assertEqual(len(self.sm._queue), 1)
logger.debug.assert_called_with(
@ -594,31 +596,29 @@ class TestAutomaton(unittest.TestCase):
def test_send_rebuild_message_with_custom_image(self):
instance = self.instance_mgr_cls.return_value
instance.state = state.instance_manager.DOWN
with mock.patch.object(instance_manager.cfg, 'CONF') as conf:
conf.router_image_uuid = 'DEFAULT'
self.sm.state.params.router_image_uuid = conf.router_image_uuid
instance.state = state.states.DOWN
with mock.patch.object(instance_manager.cfg, 'CONF'):
# rebuilds with custom
message = mock.Mock()
message.crud = 'rebuild'
message.body = {'router_image_uuid': 'ABC123'}
self.assertEqual(self.sm.router_image_uuid, conf.router_image_uuid)
message.body = {'image_uuid': 'ABC123'}
self.sm.send_message(message)
self.assertEqual(self.sm.router_image_uuid, 'ABC123')
self.assertEqual(self.sm.image_uuid, 'ABC123')
# rebuilds with image default.
message = mock.Mock()
message.crud = 'rebuild'
message.body = {}
self.sm.send_message(message)
self.assertEqual(self.sm.router_image_uuid, 'DEFAULT')
self.assertEqual(self.sm.image_uuid, self.fake_driver.image_uuid)
def test_has_more_work(self):
with mock.patch.object(self.sm, '_queue') as queue: # noqa
with mock.patch.object(self.sm, '_queue'):
self.assertTrue(self.sm.has_more_work())
def test_has_more_work_deleting(self):
self.sm.deleted = True
with mock.patch.object(self.sm, '_queue') as queue: # noqa
with mock.patch.object(self.sm, '_queue'):
self.assertFalse(self.sm.has_more_work())
def test_update_no_work(self):
@ -645,7 +645,7 @@ class TestAutomaton(unittest.TestCase):
self.sm.action = 'fake'
self.sm.state = fake_state
with mock.patch.object(self.sm, 'log') as log:
with mock.patch.object(self.sm.driver, 'log') as log:
self.sm.update(self.ctx)
log.exception.assert_called_once_with(mock.ANY, fake_state, 'fake')
@ -696,10 +696,10 @@ class TestAutomaton(unittest.TestCase):
def test_has_error(self):
with mock.patch.object(self.sm, 'instance') as instance:
instance.state = instance_manager.ERROR
instance.state = states.ERROR
self.assertTrue(self.sm.has_error())
def test_has_no_error(self):
with mock.patch.object(self.sm, 'instance') as instance:
instance.state = instance_manager.UP
instance.state = states.UP
self.assertFalse(self.sm.has_error())

View File

@ -17,23 +17,28 @@
import mock
import unittest2 as unittest
import uuid
from akanda.rug import event
from akanda.rug import tenant
from akanda.rug.drivers import router
from akanda.rug import state
from akanda.rug import instance_manager
from akanda.rug.drivers import states
from akanda.rug.test.unit import fakes
class TestTenantRouterManager(unittest.TestCase):
class TestTenantResourceManager(unittest.TestCase):
def setUp(self):
super(TestTenantRouterManager, self).setUp()
super(TestTenantResourceManager, self).setUp()
self.fake_driver = fakes.fake_driver()
self.tenant_id = 'cfb48b9c-66f6-11e5-a7be-525400cfc326'
self.instance_mgr = \
mock.patch('akanda.rug.instance_manager.InstanceManager').start()
self.addCleanup(mock.patch.stopall)
self.notifier = mock.Mock()
self.trm = tenant.TenantRouterManager(
self.trm = tenant.TenantResourceManager(
'1234',
notify_callback=self.notifier,
queue_warning_threshold=10,
@ -41,36 +46,56 @@ class TestTenantRouterManager(unittest.TestCase):
)
self.ctx = mock.Mock()
def test_new_router(self):
def test_new_resource(self):
r = event.Resource(
tenant_id=self.tenant_id,
id='5678',
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
tenant_id='1234',
router_id='5678',
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
sm = self.trm.get_state_machines(msg, self.ctx)[0]
self.assertEqual(sm.router_id, '5678')
self.assertEqual(sm.resource_id, '5678')
self.assertIn('5678', self.trm.state_machines)
def test_get_state_machine_no_router_id(self):
def test_get_state_machine_no_resoruce_id(self):
r = event.Resource(
tenant_id=self.tenant_id,
id=None,
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
tenant_id='1234',
router_id=None,
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
self.assertRaises(tenant.InvalidIncomingMessage,
self.trm.get_state_machines, msg, self.ctx)
def test_all_routers(self):
self.trm.state_machines.state_machines = {
str(i): state.Automaton(str(i), '1234',
None, None, None, 5, 5)
for i in range(5)
}
def test_all_resources(self):
for i in range(5):
rid = str(uuid.uuid4())
driver = fakes.fake_driver(rid)
sm = state.Automaton(
driver=driver,
worker_context=self.ctx,
resource_id=driver.id,
tenant_id=self.tenant_id,
delete_callback=None,
bandwidth_callback=None,
queue_warning_threshold=5,
reboot_error_threshold=5)
self.trm.state_machines[rid] = sm
r = event.Resource(
tenant_id=self.tenant_id,
id='*',
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
tenant_id='1234',
router_id='*',
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
@ -80,30 +105,51 @@ class TestTenantRouterManager(unittest.TestCase):
def test_errored_routers(self):
self.trm.state_machines.state_machines = {}
for i in range(5):
sm = state.Automaton(str(i), '1234',
None, None, None, 5, 5)
rid = str(uuid.uuid4())
driver = fakes.fake_driver(rid)
sm = state.Automaton(
driver=driver,
worker_context=self.ctx,
resource_id=i,
tenant_id=self.tenant_id,
delete_callback=None,
bandwidth_callback=None,
queue_warning_threshold=5,
reboot_error_threshold=5)
self.trm.state_machines[rid] = sm
# Replace the default mock with one that has 'state' set.
if i == 2:
status = instance_manager.ERROR
status = states.ERROR
else:
status = instance_manager.UP
status = states.UP
sm.instance = mock.Mock(state=status)
self.trm.state_machines.state_machines[str(i)] = sm
r = event.Resource(
tenant_id=self.tenant_id,
id='2',
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
tenant_id='1234',
router_id='error',
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
sms = self.trm.get_state_machines(msg, self.ctx)
self.assertEqual(1, len(sms))
self.assertEqual('2', sms[0].router_id)
self.assertEqual(2, sms[0].resource_id)
self.assertIs(self.trm.state_machines.state_machines['2'], sms[0])
def test_existing_router(self):
def test_existing_resource(self):
r = event.Resource(
tenant_id=self.tenant_id,
id='5678',
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
tenant_id='1234',
router_id='5678',
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
@ -114,54 +160,67 @@ class TestTenantRouterManager(unittest.TestCase):
self.assertIs(sm1, sm2)
self.assertIs(sm1._queue, sm2._queue)
def test_existing_router_of_many(self):
def test_existing_resource_of_many(self):
sms = {}
for router_id in ['5678', 'ABCD', 'EFGH']:
for resource_id in ['5678', 'ABCD', 'EFGH']:
r = event.Resource(
tenant_id=self.tenant_id,
id=resource_id,
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
tenant_id='1234',
router_id=router_id,
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
# First time creates...
sm1 = self.trm.get_state_machines(msg, self.ctx)[0]
sms[router_id] = sm1
sms[resource_id] = sm1
# Second time should return the same objects...
r = event.Resource(
id='5678',
tenant_id=self.tenant_id,
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
tenant_id='1234',
router_id='5678',
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
sm2 = self.trm.get_state_machines(msg, self.ctx)[0]
self.assertIs(sm2, sms['5678'])
def test_delete_router(self):
def test_delete_resource(self):
self.trm.state_machines['1234'] = mock.Mock()
self.trm._delete_router('1234')
self.trm._delete_resource('1234')
self.assertNotIn('1234', self.trm.state_machines)
def test_delete_default_router(self):
self.trm._default_router_id = '1234'
def test_delete_default_resource(self):
self.trm._default_resource_id = '1234'
self.trm.state_machines['1234'] = mock.Mock()
self.trm._delete_router('1234')
self.trm._delete_resource('1234')
self.assertNotIn('1234', self.trm.state_machines)
self.assertIs(None, self.trm._default_router_id)
self.assertIs(None, self.trm._default_resource_id)
def test_delete_not_default_router(self):
self.trm._default_router_id = 'abcd'
def test_delete_not_default_resource(self):
self.trm._default_resource_id = 'abcd'
self.trm.state_machines['1234'] = mock.Mock()
self.trm._delete_router('1234')
self.assertEqual('abcd', self.trm._default_router_id)
self.trm._delete_resource('1234')
self.assertEqual('abcd', self.trm._default_resource_id)
def test_no_update_deleted_router(self):
self.trm._default_router_id = 'abcd'
def test_no_update_deleted_resource(self):
self.trm._default_resource_id = 'abcd'
self.trm.state_machines['5678'] = mock.Mock()
self.trm._delete_router('5678')
self.trm._delete_resource('5678')
self.assertEqual(self.trm.state_machines.values(), [])
msg = event.Event(
r = event.Resource(
tenant_id='1234',
router_id='5678',
id='5678',
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
@ -170,9 +229,13 @@ class TestTenantRouterManager(unittest.TestCase):
self.assertIn('5678', self.trm.state_machines.deleted)
def test_deleter_callback(self):
msg = event.Event(
r = event.Resource(
tenant_id='1234',
router_id='5678',
id='5678',
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
@ -195,8 +258,16 @@ class TestTenantRouterManager(unittest.TestCase):
)
n = notifications[0]
self.assertEqual('1234', n['tenant_id'])
self.assertIn('5678', n['router_id'])
self.assertIn('5678', n['uuid'])
self.assertIn('timestamp', n)
self.assertEqual('akanda.bandwidth.used', n['event_type'])
self.assertIn('a', n['payload'])
self.assertIn('b', n['payload'])
def test_get_state_machine_by_resource_id(self):
fake_sm = mock.Mock()
self.trm.state_machines['fake_resource_id'] = fake_sm
self.assertEqual(
self.trm.get_state_machine_by_resource_id('fake_resource_id'),
fake_sm
)

View File

@ -26,19 +26,22 @@ from oslo_config import cfg
from akanda.rug import commands
from akanda.rug import event
from akanda.rug import notifications
from akanda.rug.drivers import router
from akanda.rug import worker
from akanda.rug.api import neutron
from akanda.rug.test.unit.db import base
class FakeFetchedRouter(object):
id = 'fake_fetched_router_id'
class FakeFetchedResource(object):
id = 'fake_fetched_resource_id'
class WorkerTestBase(base.DbTestCase):
tenant_id = '1040f478-3c74-11e5-a72a-173606e0a6d0'
router_id = '18ffa532-3c74-11e5-a0e7-eb9f90a17ffb'
def setUp(self):
super(WorkerTestBase, self).setUp()
cfg.CONF.boot_timeout = 1
@ -53,54 +56,72 @@ class WorkerTestBase(base.DbTestCase):
fake_neutron_obj.get_ports_for_instance.return_value = (
'mgt_port', ['ext_port', 'int_port'])
fake_neutron_obj.get_router_for_tenant.return_value = (
FakeFetchedRouter())
FakeFetchedResource())
self.fake_neutron = mock.patch.object(
neutron, 'Neutron', return_value=fake_neutron_obj).start()
self.w = worker.Worker(mock.Mock())
self.addCleanup(mock.patch.stopall)
self.target = self.tenant_id
r = event.Resource(
tenant_id=self.tenant_id,
id=self.router_id,
driver=router.Router.RESOURCE_NAME,
)
self.msg = event.Event(
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
def tearDown(self):
self.w._shutdown()
super(WorkerTestBase, self).tearDown()
def enable_debug(self, router_uuid=None, tenant_uuid=None):
if router_uuid:
self.dbapi.enable_router_debug(router_uuid=router_uuid)
is_debug, _ = self.dbapi.router_in_debug(router_uuid)
if tenant_uuid:
self.dbapi.enable_tenant_debug(tenant_uuid=tenant_uuid)
is_debug, _ = self.dbapi.tenant_in_debug(tenant_uuid)
def enable_debug(self, resource_id=None, tenant_id=None):
if resource_id:
self.dbapi.enable_resource_debug(resource_uuid=resource_id)
is_debug, _ = self.dbapi.resource_in_debug(resource_id)
if tenant_id:
self.dbapi.enable_tenant_debug(tenant_uuid=tenant_id)
is_debug, _ = self.dbapi.tenant_in_debug(tenant_id)
self.assertTrue(is_debug)
def assert_not_in_debug(self, router_uuid=None, tenant_uuid=None):
if router_uuid:
is_debug, _ = self.dbapi.router_in_debug(router_uuid)
in_debug = self.dbapi.routers_in_debug()
uuid = router_uuid
if tenant_uuid:
is_debug, _ = self.dbapi.tenant_in_debug(tenant_uuid)
def assert_not_in_debug(self, resource_id=None, tenant_id=None):
if resource_id:
is_debug, _ = self.dbapi.resource_in_debug(resource_id)
in_debug = self.dbapi.resources_in_debug()
uuid = resource_id
if tenant_id:
is_debug, _ = self.dbapi.tenant_in_debug(tenant_id)
in_debug = self.dbapi.tenants_in_debug()
uuid = tenant_uuid
uuid = tenant_id
self.assertFalse(is_debug)
self.assertNotIn(uuid, in_debug)
class TestWorker(WorkerTestBase):
tenant_id = '1040f478-3c74-11e5-a72a-173606e0a6d0'
router_id = '18ffa532-3c74-11e5-a0e7-eb9f90a17ffb'
resource_id = '18ffa532-3c74-11e5-a0e7-eb9f90a17ffb'
driver = router.Router.RESOURCE_NAME
resource = None
def setUp(self):
super(TestWorker, self).setUp()
self.target = self.tenant_id
self.resource = event.Resource(
self.driver,
self.resource_id,
self.tenant_id)
self.msg = event.Event(
tenant_id=self.tenant_id,
router_id=self.router_id,
resource=self.resource,
crud=event.CREATE,
body={'key': 'value'},
)
self.fake_router_cache = worker.TenantRouterCache()
self.fake_router_cache.get_by_tenant = mock.MagicMock()
self.w.router_cache = self.fake_router_cache
self.fake_cache = worker.TenantResourceCache()
self.fake_cache.get_by_tenant = mock.MagicMock()
self.w.resource_cache = self.fake_cache
def test__should_process_true(self):
self.assertEqual(
@ -118,71 +139,44 @@ class TestWorker(WorkerTestBase):
self.w._should_process(self.msg))
def test__should_process_no_router_id(self):
self.fake_router_cache.get_by_tenant.return_value = (
self.fake_cache.get_by_tenant.return_value = (
'9846d012-3c75-11e5-b476-8321b3ff1a1d')
r = event.Resource(
driver=router.Router.RESOURCE_NAME,
id=None,
tenant_id='fake_tenant_id',
)
expected_r = event.Resource(
driver=router.Router.RESOURCE_NAME,
id='9846d012-3c75-11e5-b476-8321b3ff1a1d',
tenant_id='fake_tenant_id',
)
msg = event.Event(
tenant_id=self.tenant_id,
router_id=None,
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
expected = event.Event(
tenant_id=self.tenant_id,
router_id='9846d012-3c75-11e5-b476-8321b3ff1a1d',
resource=expected_r,
crud=event.CREATE,
body={'key': 'value'},
)
self.assertEquals(expected, self.w._should_process(msg))
def test__should_process_no_router_id_no_router_found(self):
self.fake_router_cache.get_by_tenant.return_value = None
self.fake_cache.get_by_tenant.return_value = None
r = event.Resource(
driver=router.Router.RESOURCE_NAME,
id=None,
tenant_id='fake_tenant_id',
)
msg = event.Event(
tenant_id=self.tenant_id,
router_id=None,
resource=r,
crud=event.CREATE,
body={'key': 'value'},
)
self.assertFalse(self.w._should_process(msg))
def test__populate_router_id_not_needed(self):
self.assertEqual(
self.w._populate_router_id(self.msg),
self.msg,
)
def test__populate_router_id(self):
self.msg = event.Event(
tenant_id=self.tenant_id,
router_id=None,
crud=event.CREATE,
body={'key': 'value'},
)
self.fake_router_cache.get_by_tenant.return_value = (
'ccd7048c-3c75-11e5-b6a7-53233f5cf591')
expected_msg = event.Event(
tenant_id=self.tenant_id,
router_id='ccd7048c-3c75-11e5-b6a7-53233f5cf591',
crud=event.CREATE,
body={'key': 'value'},
)
res = self.w._populate_router_id(self.msg)
self.assertEqual(res, expected_msg)
self.fake_router_cache.get_by_tenant.assert_called_with(
self.tenant_id, self.w._context)
def test__populate_router_id_not_found(self):
self.msg = event.Event(
tenant_id=self.tenant_id,
router_id=None,
crud=event.CREATE,
body={'key': 'value'},
)
self.fake_router_cache.get_by_tenant.return_value = None
res = self.w._populate_router_id(self.msg)
self.assertEqual(res, self.msg)
self.fake_router_cache.get_by_tenant.assert_called_with(
self.tenant_id, self.w._context)
@mock.patch('akanda.rug.worker.Worker._deliver_message')
@mock.patch('akanda.rug.worker.Worker._should_process')
def test_handle_message_should_process(self, fake_should_process,
@ -191,8 +185,7 @@ class TestWorker(WorkerTestBase):
# deliver_message, in case some processing has been done on
# it
new_msg = event.Event(
tenant_id='0e50fe44-3c77-11e5-9b4c-d75b3be53047',
router_id='11df3102-3c77-11e5-9c6f-2fd3b676e508',
resource=self.resource,
crud=event.CREATE,
body={'key': 'value'},
)
@ -212,49 +205,75 @@ class TestWorker(WorkerTestBase):
fake_should_process.assert_called_with(self.msg)
class TestRouterCache(WorkerTestBase):
class TestResourceCache(WorkerTestBase):
def setUp(self):
super(TestRouterCache, self).setUp()
self.router_cache = worker.TenantRouterCache()
super(TestResourceCache, self).setUp()
self.resource_cache = worker.TenantResourceCache()
self.worker_context = worker.WorkerContext()
def test_router_cache_hit(self):
self.router_cache._tenant_routers = {
'fake_tenant_id': 'fake_cached_router_id',
def test_resource_cache_hit(self):
self.resource_cache._tenant_resources = {
router.Router.RESOURCE_NAME: {
'fake_tenant_id': 'fake_cached_resource_id',
}
}
res = self.router_cache.get_by_tenant(
tenant_uuid='fake_tenant_id', worker_context=self.worker_context)
self.assertEqual(res, 'fake_cached_router_id')
r = event.Resource(
tenant_id='fake_tenant_id',
id='fake_resource_id',
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(resource=r, crud=event.UPDATE, body={})
res = self.resource_cache.get_by_tenant(
resource=r, worker_context=self.worker_context, message=msg)
self.assertEqual(res, 'fake_cached_resource_id')
self.assertFalse(self.w._context.neutron.get_router_for_tenant.called)
def test_router_cache_miss(self):
res = self.router_cache.get_by_tenant(
tenant_uuid='fake_tenant_id', worker_context=self.worker_context)
self.assertEqual(res, 'fake_fetched_router_id')
def test_resource_cache_miss(self):
r = event.Resource(
tenant_id='fake_tenant_id',
id='fake_fetched_resource_id',
driver=router.Router.RESOURCE_NAME,
)
msg = event.Event(
resource=r,
crud=event.UPDATE,
body={},
)
res = self.resource_cache.get_by_tenant(
resource=r,
worker_context=self.worker_context,
message=msg)
self.assertEqual(res, 'fake_fetched_resource_id')
self.w._context.neutron.get_router_for_tenant.assert_called_with(
'fake_tenant_id')
class TestCreatingRouter(WorkerTestBase):
class TestCreatingResource(WorkerTestBase):
def setUp(self):
super(TestCreatingRouter, self).setUp()
super(TestCreatingResource, self).setUp()
self.tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
self.router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
self.hostname = 'akanda'
self.resource = event.Resource(router.Router.RESOURCE_NAME,
self.router_id,
self.tenant_id)
self.msg = event.Event(
tenant_id=self.tenant_id,
router_id=self.router_id,
resource=self.resource,
crud=event.CREATE,
body={'key': 'value'},
)
self.w.handle_message(self.tenant_id, self.msg)
self.w._should_process_message = mock.MagicMock(return_value=self.msg)
def test_in_tenant_managers(self):
self.w.handle_message(self.tenant_id, self.msg)
self.assertIn(self.tenant_id, self.w.tenant_managers)
trm = self.w.tenant_managers[self.tenant_id]
self.assertEqual(self.tenant_id, trm.tenant_id)
def test_message_enqueued(self):
self.w.handle_message(self.tenant_id, self.msg)
trm = self.w.tenant_managers[self.tenant_id]
sm = trm.get_state_machines(self.msg, worker.WorkerContext())[0]
self.assertEqual(len(sm._queue), 1)
@ -264,35 +283,40 @@ class TestWildcardMessages(WorkerTestBase):
def setUp(self):
super(TestWildcardMessages, self).setUp()
self.tenant_id_1 = 'a8f964d4-6631-11e5-a79f-525400cfc32a'
self.tenant_id_2 = 'ef1a6e90-6631-11e5-83cb-525400cfc326'
# Create some tenants
for msg in [
event.Event(
tenant_id='98dd9c41-d3ac-4fd6-8927-567afa0b8fc3',
router_id='ABCD',
resource=event.Resource(
driver=router.Router.RESOURCE_NAME,
id='ABCD',
tenant_id=self.tenant_id_1,
),
crud=event.CREATE,
body={'key': 'value'},
),
event.Event(
tenant_id='ac194fc5-f317-412e-8611-fb290629f624',
router_id='EFGH',
resource=event.Resource(
driver=router.Router.RESOURCE_NAME,
id='EFGH',
tenant_id=self.tenant_id_2),
crud=event.CREATE,
body={'key': 'value'},
)]:
self.w.handle_message(msg.tenant_id, msg)
self.w.handle_message(msg.resource.tenant_id, msg)
def test_wildcard_to_all(self):
trms = self.w._get_trms('*')
ids = sorted(trm.tenant_id for trm in trms)
self.assertEqual(ids,
['98dd9c41-d3ac-4fd6-8927-567afa0b8fc3',
'ac194fc5-f317-412e-8611-fb290629f624'])
self.assertEqual(ids, [self.tenant_id_1, self.tenant_id_2])
def test_wildcard_to_error(self):
trms = self.w._get_trms('error')
ids = sorted(trm.tenant_id for trm in trms)
self.assertEqual(ids,
['98dd9c41-d3ac-4fd6-8927-567afa0b8fc3',
'ac194fc5-f317-412e-8611-fb290629f624'])
self.assertEqual(ids, [self.tenant_id_1, self.tenant_id_2])
class TestShutdown(WorkerTestBase):
@ -326,20 +350,12 @@ class TestUpdateStateMachine(WorkerTestBase):
self.worker_context = worker.WorkerContext()
def test(self):
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
crud=event.CREATE,
body={'key': 'value'},
)
# Create the router manager and state machine so we can
# replace the update() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, self.worker_context)[0]
trm = self.w._get_trms(self.tenant_id)[0]
sm = trm.get_state_machines(self.msg, self.worker_context)[0]
with mock.patch.object(sm, 'update') as meth:
self.w.handle_message(tenant_id, msg)
self.w.handle_message(self.tenant_id, self.msg)
# Add a null message so the worker loop will exit. We have
# to do this directly, because if we do it through
# handle_message() that triggers shutdown logic that keeps
@ -357,7 +373,7 @@ class TestReportStatus(WorkerTestBase):
with mock.patch.object(self.w, 'report_status') as meth:
self.w.handle_message(
'debug',
event.Event('*', '', event.COMMAND,
event.Event('*', event.COMMAND,
{'command': commands.WORKERS_DEBUG})
)
meth.assert_called_once_with()
@ -366,69 +382,82 @@ class TestReportStatus(WorkerTestBase):
with mock.patch('akanda.rug.worker.cfg.CONF') as conf:
self.w.handle_message(
'debug',
event.Event('*', '', event.COMMAND,
event.Event('*', event.COMMAND,
{'command': commands.WORKERS_DEBUG})
)
self.assertTrue(conf.log_opt_values.called)
class TestDebugRouters(WorkerTestBase):
def setUp(self):
super(TestDebugRouters, self).setUp()
self.w._should_process_command = mock.MagicMock(return_value=self.msg)
def testNoDebugs(self):
self.assertEqual(self.dbapi.routers_in_debug(), set())
self.assertEqual(self.dbapi.resources_in_debug(), set())
def testWithDebugs(self):
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.ROUTER_DEBUG,
'router_id': 'this-router-id',
event.Event('*', event.COMMAND,
{'command': commands.RESOURCE_DEBUG,
'resource_id': 'this-resource-id',
'reason': 'foo'}),
)
self.enable_debug(router_uuid='this-router-id')
self.assertIn(('this-router-id', 'foo'), self.dbapi.routers_in_debug())
self.enable_debug(resource_id='this-resource-id')
self.assertIn(('this-resource-id', 'foo'),
self.dbapi.resources_in_debug())
def testManage(self):
self.enable_debug(router_uuid='this-router-id')
self.enable_debug(resource_id='this-resource-id')
lock = mock.Mock()
self.w._router_locks['this-router-id'] = lock
self.w._resource_locks['this-resource-id'] = lock
r = event.Resource(
tenant_id='*',
id='*',
driver=None,
)
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.ROUTER_MANAGE,
'router_id': 'this-router-id'}),
event.Event(
resource=r,
crud=event.COMMAND,
body={'command': commands.RESOURCE_MANAGE,
'resource_id': 'this-resource-id'}),
)
self.assert_not_in_debug(router_uuid='this-router-id')
self.assert_not_in_debug(resource_id='this-resource-id')
self.assertEqual(lock.release.call_count, 1)
def testManageNoLock(self):
self.enable_debug(router_uuid='this-router-id')
self.enable_debug(resource_id='this-resource-id')
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.ROUTER_MANAGE,
'router_id': 'this-router-id'}),
event.Event('*', event.COMMAND,
{'command': commands.RESOURCE_MANAGE,
'resource_id': 'this-resource-id'}),
)
self.assert_not_in_debug(router_uuid='this-router-id')
self.assert_not_in_debug(resource_id='this-resource-id')
def testManageUnlocked(self):
self.enable_debug(router_uuid='this-router-id')
self.enable_debug(resource_id='this-resource-id')
lock = threading.Lock()
self.w._router_locks['this-router-id'] = lock
self.w._resource_locks['this-resource-id'] = lock
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.ROUTER_MANAGE,
'router_id': 'this-router-id'}),
event.Event('*', event.COMMAND,
{'command': commands.RESOURCE_MANAGE,
'resource_id': 'this-resource-id'}),
)
self.assert_not_in_debug(router_uuid='this-router-id')
self.assert_not_in_debug(resource_id='this-resource-id')
def testDebugging(self):
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
self.enable_debug(router_uuid=router_id)
resource_id = 'ac194fc5-f317-412e-8611-fb290629f624'
self.enable_debug(resource_id=resource_id)
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
resource=event.Resource(router.Router.RESOURCE_NAME,
resource_id,
tenant_id),
crud=event.CREATE,
body={'key': 'value'},
)
@ -444,14 +473,18 @@ class TestDebugRouters(WorkerTestBase):
class TestDebugTenants(WorkerTestBase):
def setUp(self):
super(TestDebugTenants, self).setUp()
self.w._should_process_command = mock.MagicMock(return_value=self.msg)
def testNoDebugs(self):
self.assertEqual(self.dbapi.tenants_in_debug(), set())
def testWithDebugs(self):
self.enable_debug(tenant_uuid='this-tenant-id')
self.enable_debug(tenant_id='this-tenant-id')
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
event.Event('*', event.COMMAND,
{'command': commands.TENANT_DEBUG,
'tenant_id': 'this-tenant-id'}),
)
@ -459,22 +492,24 @@ class TestDebugTenants(WorkerTestBase):
self.assertTrue(is_debug)
def testManage(self):
self.enable_debug(tenant_uuid='this-tenant-id')
self.enable_debug(tenant_id='this-tenant-id')
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
event.Event('*', event.COMMAND,
{'command': commands.TENANT_MANAGE,
'tenant_id': 'this-tenant-id'}),
)
self.assert_not_in_debug(tenant_uuid='this-tenant-id')
self.assert_not_in_debug(tenant_id='this-tenant-id')
def testDebugging(self):
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
self.enable_debug(tenant_uuid=tenant_id)
resource_id = 'ac194fc5-f317-412e-8611-fb290629f624'
self.enable_debug(tenant_id=tenant_id)
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
resource=event.Resource(
driver=router.Router.RESOURCE_NAME,
id=resource_id,
tenant_id=tenant_id),
crud=event.CREATE,
body={'key': 'value'},
)
@ -495,10 +530,9 @@ class TestConfigReload(WorkerTestBase):
mock_cfg.CONF = mock.MagicMock(
log_opt_values=mock.MagicMock())
tenant_id = '*'
router_id = '*'
resource_id = '*'
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
resource=resource_id,
crud=event.COMMAND,
body={'command': commands.CONFIG_RELOAD}
)
@ -525,10 +559,11 @@ class TestGlobalDebug(WorkerTestBase):
def test_global_debug_no_message_sent(self):
self.dbapi.enable_global_debug()
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
resource_id = 'ac194fc5-f317-412e-8611-fb290629f624'
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
resource=event.Resource(router.Router.RESOURCE_NAME,
resource_id,
tenant_id),
crud=event.CREATE,
body={'key': 'value'},
)

View File

@ -29,6 +29,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from akanda.rug import commands
from akanda.rug import drivers
from akanda.rug.common.i18n import _LE, _LI, _LW
from akanda.rug import event
from akanda.rug import tenant
@ -64,30 +65,48 @@ WORKER_OPTS = [
]
CONF.register_opts(WORKER_OPTS)
EVENT_COMMANDS = {
commands.RESOURCE_UPDATE: event.UPDATE,
commands.RESOURCE_REBUILD: event.REBUILD,
}
DEPRECATED_ROUTER_COMMANDS = {
commands.ROUTER_UPDATE: event.UPDATE,
commands.ROUTER_REBUILD: event.REBUILD,
}
def _normalize_uuid(value):
return str(uuid.UUID(value.replace('-', '')))
class TenantRouterCache(object):
"""Holds a cache of default router_ids for tenants. This is constructed
class TenantResourceCache(object):
"""Holds a cache of default resource_ids for tenants. This is constructed
and consulted when we receieve messages with no associated router_id and
avoids a Neutron call per-message of this type.
"""
# NOTE(adam_g): This is a pretty dumb caching layer and can be backed
# by an external system like memcache to further optimize lookups
# across mulitple rugs.
_tenant_routers = {}
_tenant_resources = {}
def get_by_tenant(self, tenant_uuid, worker_context):
if tenant_uuid not in self._tenant_routers:
router = worker_context.neutron.get_router_for_tenant(
tenant_uuid)
if not router:
LOG.debug('Router not found for tenant %s.', tenant_uuid)
def get_by_tenant(self, resource, worker_context, message):
tenant_id = resource.tenant_id
driver = resource.driver
cached_resources = self._tenant_resources.get(driver, {})
if tenant_id not in cached_resources:
resource_id = drivers.get(driver).get_resource_id_for_tenant(
worker_context, tenant_id, message)
if not resource_id:
LOG.debug('%s not found for tenant %s.',
driver, tenant_id)
return None
self._tenant_routers[tenant_uuid] = router.id
return self._tenant_routers[tenant_uuid]
if not cached_resources:
self._tenant_resources[driver] = {}
self._tenant_resources[driver][tenant_id] = resource_id
return self._tenant_resources[driver][tenant_id]
class WorkerContext(object):
@ -114,7 +133,7 @@ class Worker(object):
self.lock = threading.Lock()
self._keep_going = True
self.tenant_managers = {}
self.router_cache = TenantRouterCache()
self.resource_cache = TenantResourceCache()
# This process-global context should not be used in the
# threads, since the clients are not thread-safe.
@ -124,12 +143,12 @@ class Worker(object):
# happens inside the worker process and not the parent.
self.notifier.start()
# The DB is used for trakcing debug modes
# The DB is used for tracking debug modes
self.db_api = db_api.get_instance()
# Thread locks for the routers so we only put one copy in the
# work queue at a time
self._router_locks = collections.defaultdict(threading.Lock)
self._resource_locks = collections.defaultdict(threading.Lock)
# Messages about what each thread is doing, keyed by thread id
# and reported by the debug command.
self._thread_status = {}
@ -170,25 +189,26 @@ class Worker(object):
# Make sure we didn't already have some updates under way
# for a router we've been told to ignore for debug mode.
should_ignore, reason = self.db_api.router_in_debug(sm.router_id)
should_ignore, reason = \
self.db_api.resource_in_debug(sm.resource_id)
if should_ignore:
LOG.debug('Skipping update of router %s in debug mode. '
'(reason: %s)', sm.router_id, reason)
LOG.debug('Skipping update of resource %s in debug mode. '
'(reason: %s)', sm.resource_id, reason)
continue
# FIXME(dhellmann): Need to look at the router to see if
# it belongs to a tenant which is in debug mode, but we
# don't have that data in the sm, yet.
LOG.debug('performing work on %s for tenant %s',
sm.router_id, sm.tenant_id)
sm.resource_id, sm.tenant_id)
try:
self._thread_status[my_id] = 'updating %s' % sm.router_id
self._thread_status[my_id] = 'updating %s' % sm.resource_id
sm.update(context)
except:
LOG.exception(_LE('could not complete update for %s'),
sm.router_id)
sm.resource_id)
finally:
self._thread_status[my_id] = (
'finalizing task for %s' % sm.router_id
'finalizing task for %s' % sm.resource_id
)
self.work_queue.task_done()
with self.lock:
@ -199,17 +219,17 @@ class Worker(object):
# queue lock so the main thread cannot put the
# state machine back into the queue until we
# release that lock.
self._release_router_lock(sm)
self._release_resource_lock(sm)
# The state machine has indicated that it is done
# by returning. If there is more work for it to
# do, reschedule it by placing it at the end of
# the queue.
if sm.has_more_work():
LOG.debug('%s has more work, returning to work queue',
sm.router_id)
self._add_router_to_work_queue(sm)
sm.resource_id)
self._add_resource_to_work_queue(sm)
else:
LOG.debug('%s has no more work', sm.router_id)
LOG.debug('%s has no more work', sm.resource_id)
# Return the context object so tests can look at it
self._thread_status[my_id] = 'exiting'
return context
@ -252,7 +272,7 @@ class Worker(object):
tenant_id = _normalize_uuid(target)
if tenant_id not in self.tenant_managers:
LOG.debug('creating tenant manager for %s', tenant_id)
self.tenant_managers[tenant_id] = tenant.TenantRouterManager(
self.tenant_managers[tenant_id] = tenant.TenantResourceManager(
tenant_id=tenant_id,
notify_callback=self.notifier.publish,
queue_warning_threshold=self._queue_warning_threshold,
@ -260,32 +280,42 @@ class Worker(object):
)
return [self.tenant_managers[tenant_id]]
def _populate_router_id(self, message):
"""Ensure message is populated with a router_id if it does
not contain one. If not, attempt to lookup by tenant
def _populate_resource_id(self, message):
"""Ensure message's resource is populated with a resource id if it
does not contain one. If not, attempt to lookup by tenant using the
driver supplied functionality.
:param message: event.Event object
:returns: a new event.Event object with a populated router_id if
found.
:returns: a new event.Event object with a populated Event.resource.id
if found, otherwise the original Event is returned.
"""
if message.router_id:
if message.resource.id:
return message
LOG.debug("Looking for router for %s", message.tenant_id)
router_id = self.router_cache.get_by_tenant(
message.tenant_id, self._context)
if not router_id:
LOG.debug("Looking for %s resource for for tenant %s",
message.resource.driver, message.resource.tenant_id)
resource_id = self.resource_cache.get_by_tenant(
message.resource, self._context, message)
if not resource_id:
LOG.warning(_LW(
'Router not found for tenant %s.'), message.tenant_id)
'Resource of type %s not found for tenant %s.'),
message.resource.driver, message.resource.tenant_id)
else:
new_resource = event.Resource(
id=resource_id,
driver=message.resource.driver,
tenant_id=message.resource.tenant_id,
)
new_message = event.Event(
router_id=router_id,
tenant_id=message.tenant_id,
resource=new_resource,
crud=message.crud,
body=message.body,
)
message = new_message
LOG.debug("Using router %s for tenant %s",
router_id, message.tenant_id)
LOG.debug("Using resource %s.", new_resource)
return message
def _should_process(self, message):
@ -296,29 +326,31 @@ class Worker(object):
'mode. (reason: %s)', reason)
return False
should_ignore, reason = self.db_api.tenant_in_debug(message.tenant_id)
if should_ignore:
LOG.info(
'Ignoring message intended for tenant %s in debug mode '
'(reason: %s): %s',
message.tenant_id, reason, message,
)
return False
if message.resource.id not in commands.WILDCARDS:
message = self._populate_resource_id(message)
if not message.resource.id:
LOG.info(_LI('Ignoring message with no resource found.'))
return False
message = self._populate_router_id(message)
if not message.router_id:
LOG.info(_LI('Ignoring message with no router found.'))
return False
should_ignore, reason = \
self.db_api.tenant_in_debug(message.resource.tenant_id)
if should_ignore:
LOG.info(
'Ignoring message intended for tenant %s in debug mode '
'(reason: %s): %s',
message.resource.tenant_id, reason, message,
)
return False
should_ignore, reason = self.db_api.router_in_debug(
message.router_id)
if should_ignore:
LOG.info(
'Ignoring message intended for router %s in '
'debug mode (reason: %s): %s',
message.router_id, reason, message,
)
return False
should_ignore, reason = self.db_api.resource_in_debug(
message.resource.id)
if should_ignore:
LOG.info(
'Ignoring message intended for resource %s in '
'debug mode (reason: %s): %s',
message.resource.id, reason, message,
)
return False
return message
@ -342,57 +374,105 @@ class Worker(object):
with self.lock:
self._deliver_message(target, message)
_EVENT_COMMANDS = {
commands.ROUTER_UPDATE: event.UPDATE,
commands.ROUTER_REBUILD: event.REBUILD,
}
def _find_state_machine_by_resource_id(self, resource_id):
for trm in self.tenant_managers.values():
sm = trm.get_state_machine_by_resource_id(resource_id)
if sm:
return sm
def _dispatch_command(self, target, message):
instructions = message.body
if instructions['command'] == commands.WORKERS_DEBUG:
self.report_status()
elif instructions['command'] == commands.ROUTER_DEBUG:
router_id = instructions['router_id']
reason = instructions.get('reason')
if router_id in commands.WILDCARDS:
# NOTE(adam_g): Drop 'router-debug' compat in M.
elif (instructions['command'] == commands.RESOURCE_DEBUG or
instructions['command'] == commands.ROUTER_DEBUG):
resource_id = (instructions.get('resource_id') or
instructions.get('router_id'))
if not resource_id:
LOG.warning(_LW(
'Ignoring instruction to debug all routers with %r'),
router_id)
'Ignoring instruction to debug resource with no id'))
return
reason = instructions.get('reason')
if resource_id in commands.WILDCARDS:
LOG.warning(_LW(
'Ignoring instruction to debug all resources with %r'),
resource_id)
else:
LOG.info(_LI('Placing router %s in debug mode (reason: %s)'),
router_id, reason)
self.db_api.enable_router_debug(router_id, reason)
resource_id, reason)
self.db_api.enable_resource_debug(resource_id, reason)
elif instructions['command'] == commands.ROUTER_MANAGE:
router_id = instructions['router_id']
elif (instructions['command'] == commands.RESOURCE_MANAGE or
instructions['command'] == commands.ROUTER_MANAGE):
resource_id = (instructions.get('resource_id') or
instructions.get('router_id'))
if not resource_id:
LOG.warning(_LW(
'Ignoring instruction to manage resource with no id'))
return
try:
self.db_api.disable_router_debug(router_id)
LOG.info(_LI('Resuming management of router %s'), router_id)
self.db_api.disable_resource_debug(resource_id)
LOG.info(_LI('Resuming management of resource %s'),
resource_id)
except KeyError:
pass
try:
self._router_locks[router_id].release()
LOG.info(_LI('Unlocked router %s'), router_id)
self._resource_locks[resource_id].release()
LOG.info(_LI('Unlocked resource %s'), resource_id)
except KeyError:
pass
except threading.ThreadError:
# Already unlocked, that's OK.
pass
elif instructions['command'] in self._EVENT_COMMANDS:
elif instructions['command'] in EVENT_COMMANDS:
resource_id = instructions.get('resource_id')
sm = self._find_state_machine_by_resource_id(resource_id)
if not sm:
LOG.debug(
'Will not process command, no managed state machine '
'found for resource %s', resource_id)
return
new_res = event.Resource(
id=resource_id,
driver=sm.driver.RESOURCE_NAME,
tenant_id=sm.tenant_id)
new_msg = event.Event(
tenant_id=message.tenant_id,
router_id=message.router_id,
crud=self._EVENT_COMMANDS[instructions['command']],
resource=new_res,
crud=EVENT_COMMANDS[instructions['command']],
body=instructions,
)
# Use handle_message() to ensure we acquire the lock
LOG.info(_LI('sending %s instruction to %s'),
instructions['command'], message.tenant_id)
self.handle_message(new_msg.tenant_id, new_msg)
instructions['command'], new_res)
self.handle_message(new_msg.resource.tenant_id, new_msg)
LOG.info(_LI('forced %s for %s complete'),
instructions['command'], message.tenant_id)
instructions['command'], new_res)
# NOTE(adam_g): This is here to support the deprecated old format of
# sending commands to specific routers and can be
# removed once the CLI component is dropped in M.
elif instructions['command'] in DEPRECATED_ROUTER_COMMANDS:
print 'XXX DEPR'
new_rsc = event.Resource(
driver=drivers.router.Router.RESOURCE_NAME,
id=message.body.get('router_id'),
tenant_id=message.body.get('tenant_id'),
)
new_msg = event.Event(
resource=new_rsc,
crud=DEPRECATED_ROUTER_COMMANDS[instructions['command']],
body=instructions,
)
# Use handle_message() to ensure we acquire the lock
LOG.info(_LI('sending %s instruction to %s'),
instructions['command'], new_rsc)
self.handle_message(new_msg.resource.tenant_id, new_msg)
LOG.info(_LI('forced %s for %s complete'),
instructions['command'], new_rsc)
elif instructions['command'] == commands.TENANT_DEBUG:
tenant_id = instructions['tenant_id']
@ -453,22 +533,22 @@ class Worker(object):
# at the same time as the thread trying to decide if
# the router is done.
if sm.send_message(message):
self._add_router_to_work_queue(sm)
self._add_resource_to_work_queue(sm)
def _add_router_to_work_queue(self, sm):
"""Queue up the state machine by router id.
def _add_resource_to_work_queue(self, sm):
"""Queue up the state machine by resource name.
The work queue lock should be held before calling this method.
"""
l = self._router_locks[sm.router_id]
l = self._resource_locks[sm.resource_id]
locked = l.acquire(False)
if locked:
self.work_queue.put(sm)
else:
LOG.debug('%s is already in the work queue', sm.router_id)
LOG.debug('%s is already in the work queue', sm.resource_id)
def _release_router_lock(self, sm):
self._router_locks[sm.router_id].release()
def _release_resource_lock(self, sm):
self._resource_locks[sm.resource_id].release()
def report_status(self, show_config=True):
if show_config:
@ -478,7 +558,7 @@ class Worker(object):
self.work_queue.qsize()
)
LOG.info(_LI(
'Number of tenant router managers managed: %d'),
'Number of tenant resource managers managed: %d'),
len(self.tenant_managers)
)
for thread in self.threads:
@ -496,10 +576,10 @@ class Worker(object):
else:
LOG.info(_LI('No tenants in debug mode'))
debug_routers = self.db_api.routers_in_debug()
if self.db_api.routers_in_debug():
for r_uuid, reason in debug_routers:
LOG.info(_LI('Debugging router: %s (reason: %s)'),
r_uuid, reason)
debug_resources = self.db_api.resources_in_debug()
if debug_resources:
for resource_id, reason in debug_resources:
LOG.info(_LI('Debugging resource: %s (reason: %s)'),
resource_id, reason)
else:
LOG.info(_LI('No routers in debug mode'))
LOG.info(_LI('No resources in debug mode'))

View File

@ -80,7 +80,7 @@ function configure_akanda() {
iniset $AKANDA_RUG_CONF DEFAULT interface_driver "akanda.rug.common.linux.interface.BridgeInterfaceDriver"
fi
iniset $AKANDA_RUG_CONF DEFAULT router_ssh_public_key $AKANDA_APPLIANCE_SSH_PUBLIC_KEY
iniset $AKANDA_RUG_CONF DEFAULT ssh_public_key $AKANDA_APPLIANCE_SSH_PUBLIC_KEY
iniset $AKANDA_RUG_CONF database connection `database_connection_url akanda`

View File

@ -38,6 +38,14 @@ console_scripts =
rug-ctl=akanda.rug.cli.main:main
akanda.rug.cli =
config reload=akanda.rug.cli.config:ConfigReload
resource debug=akanda.rug.cli.resource:ResourceDebug
resource manage=akanda.rug.cli.resource:ResourceManage
resource update=akanda.rug.cli.resource:ResourceUpdate
resource rebuild=akanda.rug.cli.resource:ResourceRebuild
# NOTE(adam_g): The 'router' commands are deprecated in favor
# of the generic 'resource' commands and can be dropped in M.
router debug=akanda.rug.cli.router:RouterDebug
router manage=akanda.rug.cli.router:RouterManage
router update=akanda.rug.cli.router:RouterUpdate