Adds a DB layer, use it for debug modes

In preparation for scale out RUG, this adds a database layer
(built on oslo.db) that will be used for managing debug modes.
Instead of tracking debug'd/ignored routers and tenants in-memory
or on disk as files, this uses a database. This means that putting
things into debug mode via rug-ctl are now persistent, and the file-based
approach is no longer available.  A sqlite database (the default) can be
used for single node installs, or the RUG can be pointed at mysql/pg to
handle this in larger environments.

This also adds a global debug mode that can be used to ignore all events
during maintanence periods.

A new optional 'reason' argument has been added to the debug modes, allowing
operators to add a note when entering a tenant/router/cluster into debug
mode.

Change-Id: I3f5129e11b11cf5aaed8889da3b204104e5ad203
Closes-bug: #1470619
Partially implements: blueprint rug-scaling
This commit is contained in:
Adam Gandelman 2015-07-28 21:58:00 -07:00
parent 67f9a1b223
commit 6a3261958b
28 changed files with 1361 additions and 170 deletions

3
.gitignore vendored
View File

@ -39,3 +39,6 @@ test.conf
#pycharm cruft
.idea/*
*.db
*.db_clean

View File

@ -1,21 +0,0 @@
# Copyright 2014 DreamHost, LLC
#
# Author: DreamHost, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gettext
gettext.install('neutron', unicode=1)

View File

@ -0,0 +1,61 @@
# Copyright 2015 Akanda, Inc.
#
# Author: Akanda, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Commands related to tenants.
"""
import logging
from akanda.rug import commands
from akanda.rug.cli import message
class GlobalDebug(message.MessageSending):
"""Enable or disable global debug mode"""
_COMMAND = commands.GLOBAL_DEBUG
log = logging.getLogger(__name__)
def get_parser(self, prog_name):
p = super(GlobalDebug, self).get_parser(prog_name)
p.add_argument(
'status',
)
p.add_argument(
'--reason',
)
return p
def make_message(self, parsed_args):
status = parsed_args.status.lower()
if status not in ['enable', 'disable']:
m = "Invalid global-debug command, must 'enable' or 'disable'"
raise ValueError(m)
if status == 'enable':
enabled = 1
else:
enabled = 0
self.log.info(
"sending instruction to %s global debug mode" % status
)
return {
'command': self._COMMAND,
'enabled': enabled,
'reason': parsed_args.reason,
}

View File

@ -39,10 +39,14 @@ class _TenantRouterCmd(message.MessageSending):
p.add_argument(
'router_id',
)
p.add_argument(
'--reason',
)
return p
def make_message(self, parsed_args):
router_id = parsed_args.router_id.lower()
reason = parsed_args.reason
if router_id == 'error':
tenant_id = 'error'
elif router_id == '*':
@ -82,6 +86,7 @@ class _TenantRouterCmd(message.MessageSending):
'command': self._COMMAND,
'router_id': router_id,
'tenant_id': tenant_id,
'reason': reason,
}

View File

@ -32,6 +32,9 @@ class _TenantCmd(message.MessageSending):
p.add_argument(
'tenant_id',
)
p.add_argument(
'--reason',
)
return p
def make_message(self, parsed_args):
@ -43,6 +46,7 @@ class _TenantCmd(message.MessageSending):
return {
'command': self._COMMAND,
'tenant_id': parsed_args.tenant_id,
'reason': parsed_args.reason,
}

View File

@ -45,3 +45,5 @@ CONFIG_RELOAD = 'config-reload'
# Force a poll of all routers right now
POLL = 'poll'
GLOBAL_DEBUG = 'global-debug'

View File

128
akanda/rug/db/api.py Normal file
View File

@ -0,0 +1,128 @@
# Copyright 2015 Akanda, Inc.
#
# Author: Akanda, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from oslo_config import cfg
from oslo_db import api as db_api
_BACKEND_MAPPING = {
'sqlalchemy': 'akanda.rug.db.sqlalchemy.api'
}
IMPL = db_api.DBAPI.from_config(
cfg.CONF, backend_mapping=_BACKEND_MAPPING, lazy=True)
def get_instance():
return IMPL
@six.add_metaclass(abc.ABCMeta)
class Connection(object):
@abc.abstractmethod
def __init__(self):
pass
@abc.abstractmethod
def enable_router_debug(self, router_uuid, reason=None):
"""Enter a router into debug mode
:param router_uuid: str uuid of the router to be placed into debug
mode
:param reason: str (optional) reason for entering router into debug
mode
"""
@abc.abstractmethod
def disable_router_debug(self, router_uuid):
"""Remove a router into debug mode
:param router_uuid: str uuid of the router to be removed from debug
mode
"""
@abc.abstractmethod
def router_in_debug(self, router_uuid):
"""Determines if a router is in debug mode
:param router_uuid: str the uuid of the router to query
:returns: tuple (False, None) if router is not in debug mode or
(True, "reason") if it is.
"""
@abc.abstractmethod
def routers_in_debug(self):
"""Queries all routers in debug mode
:returns: a set of (router_uuid, reason) tuples
"""
@abc.abstractmethod
def enable_tenant_debug(self, tenant_uuid, reason=None):
"""Enter a tenant into debug mode
:param tenant_uuid: str uuid of the tenant to be placed into debug
mode
:param reason: str (optional) reason for entering tenant into debug
mode
"""
@abc.abstractmethod
def disable_tenant_debug(self, tenant_uuid):
"""Remove a tenant into debug mode
:param tenant_uuid: str uuid of the tenant to be removed from debug
mode
"""
@abc.abstractmethod
def tenant_in_debug(self, tenant_uuid):
"""Determines if a tenant is in debug mode
:param tenant_uuid: str the uuid of the tenant to query
:returns: tuple (False, None) if tenant is not in debug mode or
(True, "reason") if it is.
"""
@abc.abstractmethod
def tenants_in_debug(self):
"""Queries all tenants in debug mode
:returns: a set of (tenant_uuid, reason) tuples
"""
@abc.abstractmethod
def enable_global_debug(self, reason=None):
"""Enter the entire system into debug mode
:param reason: str (optional) reason for entering cluster into global
debug mode.
"""
@abc.abstractmethod
def disable_global_debug(self):
"""Remove the entire system from global debug mode"""
@abc.abstractmethod
def global_debug(self):
"""Determine whether cluster is in global debug mode
:returns: bool True if cluster is in debug mode
:returns: tuple (False, None) if cluster is not in global debug mode or
(True, "reason") if it is.
"""

View File

@ -0,0 +1,56 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Database setup and migration commands."""
from oslo_config import cfg
from stevedore import driver
_IMPL = None
def get_backend():
global _IMPL
if not _IMPL:
cfg.CONF.import_opt('backend', 'oslo_db.options', group='database')
_IMPL = driver.DriverManager("akanda.database.migration_backend",
cfg.CONF.database.backend).driver
return _IMPL
def upgrade(version=None):
"""Migrate the database to `version` or the most recent version."""
return get_backend().upgrade(version)
def downgrade(version=None):
return get_backend().downgrade(version)
def version():
return get_backend().version()
def stamp(version):
return get_backend().stamp(version)
def revision(message, autogenerate):
return get_backend().revision(message, autogenerate)
def create_schema():
return get_backend().create_schema()

View File

View File

@ -0,0 +1,54 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = %(here)s/alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# max length of characters to apply to the
# "slug" field
#truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
#sqlalchemy.url = driver://user:pass@localhost/dbname
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -0,0 +1,61 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from logging import config as log_config
from alembic import context
try:
# NOTE(whaom): This is to register the DB2 alembic code which
# is an optional runtime dependency.
from ibm_db_alembic.ibm_db import IbmDbImpl # noqa
except ImportError:
pass
from akanda.rug.db.sqlalchemy import api as sqla_api
from akanda.rug.db.sqlalchemy import models
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
log_config.fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
target_metadata = models.Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
engine = sqla_api.get_engine()
with engine.connect() as connection:
context.configure(connection=connection,
target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
run_migrations_online()

View File

@ -0,0 +1,22 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,68 @@
# Copyright 2015 Akanda, Inc.
#
# Author: Akanda, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""initial_migration
Revision ID: 4f695b725637
Revises: None
Create Date: 2015-07-02 12:29:50.243891
"""
# revision identifiers, used by Alembic.
revision = '4f695b725637'
down_revision = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'router_debug',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('reason', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_debug_router0uuid'),
)
op.create_table(
'tenant_debug',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('reason', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_debug_tenant0uuid'),
)
op.create_table(
'global_debug',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('status', sa.Integer(), nullable=False),
sa.Column('reason', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('status', name='uniq_global_debug0status'),
)
def downgrade():
raise NotImplementedError(('Downgrade from initial migration is'
' unsupported.'))

View File

@ -0,0 +1,168 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# Copyright 2015 Akanda, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""SQLAlchemy storage backend."""
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log
from akanda.rug.db import api
from akanda.rug.db.sqlalchemy import models
CONF = cfg.CONF
LOG = log.getLogger(__name__)
_FACADE = None
def _create_facade_lazily():
global _FACADE
if _FACADE is None:
_FACADE = db_session.EngineFacade.from_config(CONF)
return _FACADE
def get_engine():
facade = _create_facade_lazily()
return facade.get_engine()
def get_session(**kwargs):
facade = _create_facade_lazily()
return facade.get_session(**kwargs)
def get_backend():
"""The backend is this module itself."""
return Connection()
def model_query(model, *args, **kwargs):
"""Query helper for simpler session usage.
:param session: if present, the session to use
"""
session = kwargs.get('session') or get_session()
query = session.query(model, *args)
return query
class Connection(api.Connection):
"""SqlAlchemy connection."""
def __init__(self):
pass
def _enable_debug(self, model, uuid, reason=None):
model.update({
'uuid': uuid,
'reason': reason,
})
try:
model.save()
except db_exc.DBDuplicateEntry:
pass
def _disable_debug(self, model=None, uuid=None):
query = model_query(model)
query.filter_by(uuid=uuid).delete()
def _check_debug(self, model, uuid):
query = model_query(model)
res = query.filter_by(uuid=uuid).all()
if not res:
return (False, None)
return (True, res[0].reason)
def _list_debug(self, model):
res = model_query(model).all()
return set((r.uuid, r.reason) for r in res)
def enable_router_debug(self, router_uuid, reason=None):
self._enable_debug(
model=models.RouterDebug(),
uuid=router_uuid,
reason=reason,
)
def disable_router_debug(self, router_uuid):
self._disable_debug(
model=models.RouterDebug,
uuid=router_uuid,
)
def router_in_debug(self, router_uuid):
return self._check_debug(models.RouterDebug, router_uuid)
def routers_in_debug(self):
return self._list_debug(models.RouterDebug)
def enable_tenant_debug(self, tenant_uuid, reason=None):
self._enable_debug(
model=models.TenantDebug(),
uuid=tenant_uuid,
reason=reason,
)
def disable_tenant_debug(self, tenant_uuid):
self._disable_debug(
model=models.TenantDebug,
uuid=tenant_uuid,
)
def tenant_in_debug(self, tenant_uuid):
return self._check_debug(models.TenantDebug, tenant_uuid)
def tenants_in_debug(self):
return self._list_debug(models.TenantDebug)
def _set_global_debug(self, status, reason=None):
query = model_query(models.GlobalDebug)
res = query.first()
if not res:
gdb = models.GlobalDebug()
gdb.update({
'status': status,
'reason': reason,
})
gdb.save()
def enable_global_debug(self, reason=None):
gdb = models.GlobalDebug()
gdb.update({
'status': 1,
'reason': reason,
})
try:
gdb.save()
except db_exc.DBDuplicateEntry:
pass
def disable_global_debug(self):
query = model_query(models.GlobalDebug)
query.filter_by(status=1).delete()
def global_debug(self):
query = model_query(models.GlobalDebug)
res = query.filter_by(status=1).all()
if not res:
return (False, None)
return (True, res[0].reason)

View File

@ -0,0 +1,102 @@
# Copyright 2012 New Dream Network, LLC (DreamHost)
# Copyright 2015 Akanda, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import six
from alembic import command as alembic_command
from alembic import config as alembic_config
from alembic import util as alembic_util
from oslo_config import cfg
from akanda.rug.common.i18n import _
_db_opts = [
cfg.StrOpt('connection',
deprecated_name='sql_connection',
default='',
secret=True,
help=_('URL to database')),
cfg.StrOpt('engine',
default='',
help=_('Database engine')),
]
CONF = cfg.CONF
def do_alembic_command(config, cmd, *args, **kwargs):
try:
getattr(alembic_command, cmd)(config, *args, **kwargs)
except alembic_util.CommandError as e:
alembic_util.err(six.text_type(e))
def add_alembic_subparser(sub, cmd):
return sub.add_parser(cmd, help=getattr(alembic_command, cmd).__doc__)
def do_upgrade(config, cmd):
revision = CONF.command.revision or 'head'
do_alembic_command(config, cmd, revision, sql=CONF.command.sql)
def do_stamp(config, cmd):
do_alembic_command(config, cmd,
CONF.command.revision,
sql=CONF.command.sql)
def add_command_parsers(subparsers):
for name in ['current', 'history', 'branches']:
parser = add_alembic_subparser(subparsers, name)
parser.set_defaults(func=do_alembic_command)
parser = add_alembic_subparser(subparsers, 'upgrade')
parser.add_argument('--delta', type=int)
parser.add_argument('--sql', action='store_true')
parser.add_argument('revision', nargs='?')
parser.add_argument('--mysql-engine',
default='',
help='Change MySQL storage engine of current '
'existing tables')
parser.set_defaults(func=do_upgrade)
parser = add_alembic_subparser(subparsers, 'stamp')
parser.add_argument('--sql', action='store_true')
parser.add_argument('revision')
parser.set_defaults(func=do_stamp)
command_opt = cfg.SubCommandOpt('command',
title='Command',
help=_('Available commands'),
handler=add_command_parsers)
CONF.register_cli_opt(command_opt)
def get_alembic_config():
config = alembic_config.Config(os.path.join(os.path.dirname(__file__),
'alembic.ini'))
return config
def main():
CONF(project='akanda-rug')
config = get_alembic_config()
config.akanda_config = CONF
CONF.command.func(config, CONF.command.name)

View File

@ -0,0 +1,113 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import alembic
from alembic import config as alembic_config
import alembic.migration as alembic_migration
from oslo_db import exception as db_exc
from akanda.rug.db.sqlalchemy import api as sqla_api
from akanda.rug.db.sqlalchemy import models
def _alembic_config():
path = os.path.join(os.path.dirname(__file__), 'alembic.ini')
config = alembic_config.Config(path)
return config
def version(config=None, engine=None):
"""Current database version.
:returns: Database version
:rtype: string
"""
if engine is None:
engine = sqla_api.get_engine()
with engine.connect() as conn:
context = alembic_migration.MigrationContext.configure(conn)
return context.get_current_revision()
def upgrade(revision, config=None):
"""Used for upgrading database.
:param version: Desired database version
:type version: string
"""
revision = revision or 'head'
config = config or _alembic_config()
alembic.command.upgrade(config, revision or 'head')
def create_schema(config=None, engine=None):
"""Create database schema from models description.
Can be used for initial installation instead of upgrade('head').
"""
if engine is None:
engine = sqla_api.get_engine()
# NOTE(viktors): If we will use metadata.create_all() for non empty db
# schema, it will only add the new tables, but leave
# existing as is. So we should avoid of this situation.
if version(engine=engine) is not None:
raise db_exc.DbMigrationError("DB schema is already under version"
" control. Use upgrade() instead")
models.Base.metadata.create_all(engine)
stamp('head', config=config)
def downgrade(revision, config=None):
"""Used for downgrading database.
:param version: Desired database version
:type version: string
"""
revision = revision or 'base'
config = config or _alembic_config()
return alembic.command.downgrade(config, revision)
def stamp(revision, config=None):
"""Stamps database with provided revision.
Don't run any migrations.
:param revision: Should match one from repository or head - to stamp
database with most recent revision
:type revision: string
"""
config = config or _alembic_config()
return alembic.command.stamp(config, revision=revision)
def revision(message=None, autogenerate=False, config=None):
"""Creates template for migration.
:param message: Text that will be used for migration title
:type message: string
:param autogenerate: If True - generates diff based on current database
state
:type autogenerate: bool
"""
config = config or _alembic_config()
return alembic.command.revision(config, message=message,
autogenerate=autogenerate)

View File

@ -0,0 +1,113 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# Copyright 2015 Akanda, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy models for baremetal data.
"""
from akanda.rug.common.i18n import _
from oslo_config import cfg
from oslo_db import options as db_options
from oslo_db.sqlalchemy import models
import six.moves.urllib.parse as urlparse
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import schema, String
from sqlalchemy.ext.declarative import declarative_base
sql_opts = [
cfg.StrOpt('mysql_engine',
default='InnoDB',
help=_('MySQL engine to use.'))
]
_DEFAULT_SQL_CONNECTION = 'sqlite:///akanda-ruxg.db'
cfg.CONF.register_opts(sql_opts, 'database')
db_options.set_defaults(cfg.CONF, _DEFAULT_SQL_CONNECTION, 'ironic.sqlite')
def table_args():
engine_name = urlparse.urlparse(cfg.CONF.database.connection).scheme
if engine_name == 'mysql':
return {'mysql_engine': cfg.CONF.database.mysql_engine,
'mysql_charset': "utf8"}
return None
class AkandaBase(models.TimestampMixin,
models.ModelBase):
metadata = None
def as_dict(self):
d = {}
for c in self.__table__.columns:
d[c.name] = self[c.name]
return d
def save(self, session=None):
import akanda.rug.db.sqlalchemy.api as db_api
if session is None:
session = db_api.get_session()
super(AkandaBase, self).save(session)
Base = declarative_base(cls=AkandaBase)
class RouterDebug(Base):
"""Represents a router in debug mode."""
__tablename__ = 'router_debug'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_debug_router0uuid'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36))
reason = Column(String(255), nullable=True)
class TenantDebug(Base):
"""Represents a tenant in debug mode."""
__tablename__ = 'tenant_debug'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_debug_tenant0uuid'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36))
reason = Column(String(255), nullable=True)
class GlobalDebug(Base):
"""Stores a single row that serves as a status flag for global debug"""
__tablename__ = 'global_debug'
__table_args__ = (
schema.UniqueConstraint('status', name='uniq_global_debug0status'),
table_args()
)
id = Column(Integer, primary_key=True)
status = Column(Integer)
reason = Column(String(255), nullable=True)

View File

@ -114,7 +114,6 @@ def main(argv=sys.argv[1:]):
log.setup(cfg.CONF, 'akanda-rug')
cfg.CONF.log_opt_values(LOG, logging.INFO)
# Purge the mgt tap interface on startup
neutron = neutron_api.Neutron(cfg.CONF)
# TODO(mark): develop better way restore after machine reboot

View File

@ -34,3 +34,9 @@ class RugTestBase(testtools.TestCase):
'etc', 'rug.ini'
)
self.argv = ['--config-file', test_config_file]
def config(self, **kw):
"""Override config options for a test."""
group = kw.pop('group', None)
for k, v in kw.items():
cfg.CONF.set_override(k, v, group)

View File

@ -0,0 +1,15 @@
# Copyright 2015 Akanda, Inc.
#
# Author: Akanda, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,92 @@
# Copyright (c) 2012 NTT DOCOMO, INC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Akanda Rug DB test base class."""
import os
import shutil
import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from akanda.rug.db import api as dbapi
from akanda.rug.db.sqlalchemy import migration
from akanda.rug.test.unit import base
CONF = cfg.CONF
_DB_CACHE = None
TEST_DB_PATH = os.path.join(os.path.dirname(__file__), 'rug_test.db')
CLEAN_TEST_DB_PATH = os.path.join(os.path.dirname(__file__),
'rug_test.db_clean')
def get_engine(connection):
engine = enginefacade.get_legacy_facade().get_engine()
return engine
class Database(fixtures.Fixture):
def __init__(self, db_migrate, sql_connection):
if sql_connection.startswith('sqlite:///'):
if os.path.exists(TEST_DB_PATH):
os.unlink(TEST_DB_PATH)
if os.path.exists(CLEAN_TEST_DB_PATH):
os.unlink(CLEAN_TEST_DB_PATH)
self.setup_sqlite(sql_connection, db_migrate)
db_migrate.upgrade('head')
elif sql_connection == "sqlite://":
conn = self.engine.connect()
self._DB = "".join(line for line in conn.connection.iterdump())
self.engine.dispose()
db_migrate.upgrade('head')
shutil.copyfile(TEST_DB_PATH, CLEAN_TEST_DB_PATH)
def setup_sqlite(self, sql_connection, db_migrate):
self.sql_connection = sql_connection
self.engine = enginefacade.get_legacy_facade().get_engine()
self.engine.dispose()
self.engine.connect()
def setUp(self):
super(Database, self).setUp()
if self.sql_connection == "sqlite://":
conn = self.engine.connect()
conn.connection.executescript(self._DB)
self.addCleanup(self.engine.dispose)
else:
shutil.copyfile(CLEAN_TEST_DB_PATH,
TEST_DB_PATH)
self.addCleanup(os.unlink, TEST_DB_PATH)
class DbTestCase(base.RugTestBase):
def setUp(self):
super(DbTestCase, self).setUp()
sql_connection = 'sqlite:///' + TEST_DB_PATH
self.config(group='database', connection=sql_connection)
self.dbapi = dbapi.get_instance()
global _DB_CACHE
if not _DB_CACHE:
_DB_CACHE = Database(migration,
sql_connection=sql_connection)
self.useFixture(_DB_CACHE)

View File

@ -0,0 +1,99 @@
# Copyright 2015 Akanda, Inc.
#
# Author: Akanda, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from akanda.rug.test.unit.db import base
class TestDBDebugModes(base.DbTestCase):
def test_global_debug(self):
self.dbapi.enable_global_debug()
enabled, reason = self.dbapi.global_debug()
self.assertTrue(enabled)
self.assertIsNone(None)
self.dbapi.disable_global_debug()
enabled, reason = self.dbapi.global_debug()
self.assertFalse(enabled)
self.assertIsNone(reason)
def test_global_debug_with_reason(self):
self.dbapi.enable_global_debug(reason='foo')
enabled, reason = self.dbapi.global_debug()
self.assertTrue(enabled)
self.assertEqual(reason, 'foo')
self.dbapi.disable_global_debug()
enabled, reason = self.dbapi.global_debug()
self.assertFalse(enabled)
self.assertIsNone(reason)
def test_router_debug(self):
r_id = uuid.uuid4().hex
self.dbapi.enable_router_debug(
router_uuid=r_id)
enabled, reason = self.dbapi.router_in_debug(
router_uuid=r_id)
self.assertTrue(enabled)
self.assertIsNone(reason)
self.dbapi.router_in_debug('foo_router')
def test_router_debug_with_reason(self):
r_id = uuid.uuid4().hex
self.dbapi.enable_router_debug(
router_uuid=r_id, reason='foo')
enabled, reason = self.dbapi.router_in_debug(
router_uuid=r_id)
self.assertTrue(enabled)
self.assertEqual(reason, 'foo')
def test_routers_in_debug(self):
r_ids = [uuid.uuid4().hex for i in range(1, 3)]
for r_id in r_ids:
self.dbapi.enable_router_debug(
router_uuid=r_id, reason='router %s is broken' % r_id)
for debug_r_id, reason in self.dbapi.routers_in_debug():
self.assertIn(debug_r_id, r_ids)
self.assertEqual(reason, 'router %s is broken' % debug_r_id)
def test_tenant_debug(self):
t_id = uuid.uuid4().hex
self.dbapi.enable_tenant_debug(
tenant_uuid=t_id)
enabled, reason = self.dbapi.tenant_in_debug(
tenant_uuid=t_id)
self.assertTrue(enabled)
self.assertIsNone(reason)
self.dbapi.tenant_in_debug('foo_tenant')
def test_tenant_debug_with_reason(self):
t_id = uuid.uuid4().hex
self.dbapi.enable_tenant_debug(
tenant_uuid=t_id, reason='foo')
enabled, reason = self.dbapi.tenant_in_debug(
tenant_uuid=t_id)
self.assertTrue(enabled)
self.assertEqual(reason, 'foo')
def test_tenants_in_debug(self):
t_ids = [uuid.uuid4().hex for i in range(1, 3)]
for t_id in t_ids:
self.dbapi.enable_tenant_debug(
tenant_uuid=t_id, reason='tenant %s is broken' % t_id)
for debug_t_id, reason in self.dbapi.tenants_in_debug():
self.assertIn(debug_t_id, t_ids)
self.assertEqual(reason, 'tenant %s is broken' % debug_t_id)

View File

@ -15,8 +15,6 @@
# under the License.
import os
import tempfile
import threading
import mock
@ -33,7 +31,10 @@ from akanda.rug import worker
from akanda.rug.api import neutron
class WorkerTestBase(unittest.TestCase):
from akanda.rug.test.unit.db import base
class WorkerTestBase(base.DbTestCase):
def setUp(self):
super(WorkerTestBase, self).setUp()
cfg.CONF.boot_timeout = 1
@ -57,6 +58,57 @@ class WorkerTestBase(unittest.TestCase):
self.w._shutdown()
super(WorkerTestBase, self).tearDown()
def enable_debug(self, router_uuid=None, tenant_uuid=None):
if router_uuid:
self.dbapi.enable_router_debug(router_uuid=router_uuid)
is_debug, _ = self.dbapi.router_in_debug(router_uuid)
if tenant_uuid:
self.dbapi.enable_tenant_debug(tenant_uuid=tenant_uuid)
is_debug, _ = self.dbapi.tenant_in_debug(tenant_uuid)
self.assertTrue(is_debug)
def assert_not_in_debug(self, router_uuid=None, tenant_uuid=None):
if router_uuid:
is_debug, _ = self.dbapi.router_in_debug(router_uuid)
in_debug = self.dbapi.routers_in_debug()
uuid = router_uuid
if tenant_uuid:
is_debug, _ = self.dbapi.tenant_in_debug(tenant_uuid)
in_debug = self.dbapi.tenants_in_debug()
uuid = tenant_uuid
self.assertFalse(is_debug)
self.assertNotIn(uuid, in_debug)
class TestWorker(WorkerTestBase):
tenant_id = '1040f478-3c74-11e5-a72a-173606e0a6d0'
router_id = '18ffa532-3c74-11e5-a0e7-eb9f90a17ffb'
def setUp(self):
super(TestWorker, self).setUp()
self.target = self.tenant_id
self.msg = event.Event(
tenant_id=self.tenant_id,
router_id=self.router_id,
crud=event.CREATE,
body={'key': 'value'},
)
def test__should_process_true(self):
self.assertEqual(
self.msg,
self.w._should_process(self.msg))
def test__should_process_global_debug(self):
self.dbapi.enable_global_debug()
self.assertFalse(
self.w._should_process(self.msg))
def test__should_process_tenant_debug(self):
self.dbapi.enable_tenant_debug(tenant_uuid=self.tenant_id)
self.assertFalse(
self.w._should_process(self.msg))
class TestCreatingRouter(WorkerTestBase):
def setUp(self):
@ -80,7 +132,7 @@ class TestCreatingRouter(WorkerTestBase):
def test_message_enqueued(self):
trm = self.w.tenant_managers[self.tenant_id]
sm = trm.get_state_machines(self.msg, worker.WorkerContext())[0]
self.assertEqual(1, len(sm._queue))
self.assertEqual(len(sm._queue), 1)
class TestWildcardMessages(WorkerTestBase):
@ -106,16 +158,16 @@ class TestWildcardMessages(WorkerTestBase):
def test_wildcard_to_all(self):
trms = self.w._get_trms('*')
ids = sorted(trm.tenant_id for trm in trms)
self.assertEqual(['98dd9c41-d3ac-4fd6-8927-567afa0b8fc3',
'ac194fc5-f317-412e-8611-fb290629f624'],
ids)
self.assertEqual(ids,
['98dd9c41-d3ac-4fd6-8927-567afa0b8fc3',
'ac194fc5-f317-412e-8611-fb290629f624'])
def test_wildcard_to_error(self):
trms = self.w._get_trms('error')
ids = sorted(trm.tenant_id for trm in trms)
self.assertEqual(['98dd9c41-d3ac-4fd6-8927-567afa0b8fc3',
'ac194fc5-f317-412e-8611-fb290629f624'],
ids)
self.assertEqual(ids,
['98dd9c41-d3ac-4fd6-8927-567afa0b8fc3',
'ac194fc5-f317-412e-8611-fb290629f624'])
class TestShutdown(WorkerTestBase):
@ -197,19 +249,21 @@ class TestReportStatus(WorkerTestBase):
class TestDebugRouters(WorkerTestBase):
def testNoDebugs(self):
self.assertEqual(set(), self.w._debug_routers)
self.assertEqual(self.dbapi.routers_in_debug(), set())
def testWithDebugs(self):
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.ROUTER_DEBUG,
'router_id': 'this-router-id'}),
'router_id': 'this-router-id',
'reason': 'foo'}),
)
self.assertEqual(set(['this-router-id']), self.w._debug_routers)
self.enable_debug(router_uuid='this-router-id')
self.assertIn(('this-router-id', 'foo'), self.dbapi.routers_in_debug())
def testManage(self):
self.w._debug_routers = set(['this-router-id'])
self.enable_debug(router_uuid='this-router-id')
lock = mock.Mock()
self.w._router_locks['this-router-id'] = lock
self.w.handle_message(
@ -218,21 +272,21 @@ class TestDebugRouters(WorkerTestBase):
{'command': commands.ROUTER_MANAGE,
'router_id': 'this-router-id'}),
)
self.assertEqual(set(), self.w._debug_routers)
self.assert_not_in_debug(router_uuid='this-router-id')
self.assertEqual(lock.release.call_count, 1)
def testManageNoLock(self):
self.w._debug_routers = set(['this-router-id'])
self.enable_debug(router_uuid='this-router-id')
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.ROUTER_MANAGE,
'router_id': 'this-router-id'}),
)
self.assertEqual(set(), self.w._debug_routers)
self.assert_not_in_debug(router_uuid='this-router-id')
def testManageUnlocked(self):
self.w._debug_routers = set(['this-router-id'])
self.enable_debug(router_uuid='this-router-id')
lock = threading.Lock()
self.w._router_locks['this-router-id'] = lock
self.w.handle_message(
@ -241,13 +295,12 @@ class TestDebugRouters(WorkerTestBase):
{'command': commands.ROUTER_MANAGE,
'router_id': 'this-router-id'}),
)
self.assertEqual(set(), self.w._debug_routers)
self.assert_not_in_debug(router_uuid='this-router-id')
def testDebugging(self):
self.w._debug_routers = set(['ac194fc5-f317-412e-8611-fb290629f624'])
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
self.enable_debug(router_uuid=router_id)
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
@ -265,100 +318,35 @@ class TestDebugRouters(WorkerTestBase):
self.w.handle_message(tenant_id, msg)
class TestIgnoreRouters(WorkerTestBase):
def setUp(self):
tmpdir = tempfile.mkdtemp()
cfg.CONF.ignored_router_directory = tmpdir
fullname = os.path.join(tmpdir, 'this-router-id')
with open(fullname, 'a'):
os.utime(fullname, None)
self.addCleanup(lambda: os.unlink(fullname) and os.rmdir(tmpdir))
super(TestIgnoreRouters, self).setUp()
@mock.patch('os.listdir')
def testNoIgnorePath(self, mock_listdir):
mock_listdir.side_effect = OSError()
ignored = self.w._get_routers_to_ignore()
self.assertEqual(set(), ignored)
def testNoIgnores(self):
tmpdir = tempfile.mkdtemp()
cfg.CONF.ignored_router_directory = tmpdir
self.addCleanup(lambda: os.rmdir(tmpdir))
w = worker.Worker(mock.Mock())
ignored = w._get_routers_to_ignore()
self.assertEqual(set(), ignored)
def testWithIgnores(self):
ignored = self.w._get_routers_to_ignore()
self.assertEqual(set(['this-router-id']), ignored)
def testManage(self):
self.w._debug_routers = set(['this-router-id'])
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.ROUTER_MANAGE,
'router_id': 'this-router-id'}),
)
self.assertEqual(set(), self.w._debug_routers)
def testIgnoring(self):
tmpdir = tempfile.mkdtemp()
cfg.CONF.ignored_router_directory = tmpdir
fullname = os.path.join(tmpdir, 'ac194fc5-f317-412e-8611-fb290629f624')
with open(fullname, 'a'):
os.utime(fullname, None)
self.addCleanup(lambda: os.unlink(fullname) and os.rmdir(tmpdir))
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
crud=event.CREATE,
body={'key': 'value'},
)
# Create the router manager and state machine so we can
# replace the send_message() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
w = worker.Worker(mock.Mock())
with mock.patch.object(sm, 'send_message') as meth:
# The router id is being ignored, so the send_message()
# method shouldn't ever be invoked.
meth.side_effect = AssertionError('send_message was called')
w.handle_message(tenant_id, msg)
class TestDebugTenants(WorkerTestBase):
def testNoDebugs(self):
self.assertEqual(set(), self.w._debug_tenants)
self.assertEqual(self.dbapi.tenants_in_debug(), set())
def testWithDebugs(self):
self.enable_debug(tenant_uuid='this-tenant-id')
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.TENANT_DEBUG,
'tenant_id': 'this-tenant-id'}),
)
self.assertEqual(set(['this-tenant-id']), self.w._debug_tenants)
is_debug, _ = self.dbapi.tenant_in_debug('this-tenant-id')
self.assertTrue(is_debug)
def testManage(self):
self.w._debug_tenants = set(['this-tenant-id'])
self.enable_debug(tenant_uuid='this-tenant-id')
self.w.handle_message(
'*',
event.Event('*', '', event.COMMAND,
{'command': commands.TENANT_MANAGE,
'tenant_id': 'this-tenant-id'}),
)
self.assertEqual(set(), self.w._debug_tenants)
self.assert_not_in_debug(tenant_uuid='this-tenant-id')
def testDebugging(self):
self.w._debug_tenants = set(['98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'])
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
self.enable_debug(tenant_uuid=tenant_id)
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
@ -398,16 +386,33 @@ class TestNormalizeUUID(unittest.TestCase):
def test_upper(self):
self.assertEqual(
'ac194fc5-f317-412e-8611-fb290629f624',
worker._normalize_uuid(
'ac194fc5-f317-412e-8611-fb290629f624'.upper()
)
)
'ac194fc5-f317-412e-8611-fb290629f624'.upper()),
'ac194fc5-f317-412e-8611-fb290629f624')
def test_no_dashes(self):
self.assertEqual(
'ac194fc5-f317-412e-8611-fb290629f624',
worker._normalize_uuid(
'ac194fc5f317412e8611fb290629f624'
)
worker._normalize_uuid('ac194fc5f317412e8611fb290629f624'),
'ac194fc5-f317-412e-8611-fb290629f624')
class TestGlobalDebug(WorkerTestBase):
def test_global_debug_no_message_sent(self):
self.dbapi.enable_global_debug()
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
router_id = 'ac194fc5-f317-412e-8611-fb290629f624'
msg = event.Event(
tenant_id=tenant_id,
router_id=router_id,
crud=event.CREATE,
body={'key': 'value'},
)
# Create the router manager and state machine so we can
# replace the send_message() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
with mock.patch.object(sm, 'send_message') as meth:
# The tenant id is being ignored, so the send_message()
# method shouldn't ever be invoked.
meth.side_effect = AssertionError('send_message was called')
self.w.handle_message(tenant_id, msg)

View File

@ -19,7 +19,6 @@
"""
import collections
import os
import Queue
import threading
import uuid
@ -34,6 +33,7 @@ from akanda.rug import event
from akanda.rug import tenant
from akanda.rug.api import nova
from akanda.rug.api import neutron
from akanda.rug.db import api as db_api
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -99,9 +99,10 @@ class Worker(object):
# The notifier needs to be started here to ensure that it
# happens inside the worker process and not the parent.
self.notifier.start()
# Track the routers and tenants we are told to ignore
self._debug_routers = set()
self._debug_tenants = set()
# The DB is used for trakcing debug modes
self.db_api = db_api.get_instance()
# Thread locks for the routers so we only put one copy in the
# work queue at a time
self._router_locks = collections.defaultdict(threading.Lock)
@ -142,11 +143,13 @@ class Worker(object):
if sm is None:
LOG.info('received stop message')
break
# Make sure we didn't already have some updates under way
# for a router we've been told to ignore for debug mode.
if sm.router_id in self._debug_routers:
LOG.debug('skipping update of router %s (in debug mode)',
sm.router_id)
should_ignore, reason = self.db_api.router_in_debug(sm.router_id)
if should_ignore:
LOG.debug('Skipping update of router %s in debug mode. '
'(reason: %s)', sm.router_id, reason)
continue
# FIXME(dhellmann): Need to look at the router to see if
# it belongs to a tenant which is in debug mode, but we
@ -233,6 +236,23 @@ class Worker(object):
)
return [self.tenant_managers[tenant_id]]
def _should_process(self, message):
"""Determines whether a message should be processed or not."""
global_debug, reason = self.db_api.global_debug()
if global_debug:
LOG.info('Skipping incoming event, cluster in global debug '
'mode. (reason: %s)', reason)
return False
should_ignore, reason = self.db_api.tenant_in_debug(message.tenant_id)
if should_ignore:
LOG.info(
'Ignoring message intended for tenant %s in debug mode '
'(reason: %s): %s',
message.tenant_id, reason, message,
)
return False
return message
def handle_message(self, target, message):
"""Callback to be used in main
"""
@ -244,6 +264,8 @@ class Worker(object):
if message.crud == event.COMMAND:
self._dispatch_command(target, message)
else:
if not self._should_process(message):
return
# This is an update command for the router, so deliver it
# to the state machine.
with self.lock:
@ -261,18 +283,20 @@ class Worker(object):
elif instructions['command'] == commands.ROUTER_DEBUG:
router_id = instructions['router_id']
reason = instructions.get('reason')
if router_id in commands.WILDCARDS:
LOG.warning(
'Ignoring instruction to debug all routers with %r',
router_id)
else:
LOG.info('Placing router %s in debug mode', router_id)
self._debug_routers.add(router_id)
LOG.info('Placing router %s in debug mode (reason: %s)',
router_id, reason)
self.db_api.enable_router_debug(router_id, reason)
elif instructions['command'] == commands.ROUTER_MANAGE:
router_id = instructions['router_id']
try:
self._debug_routers.remove(router_id)
self.db_api.disable_router_debug(router_id)
LOG.info('Resuming management of router %s', router_id)
except KeyError:
pass
@ -301,22 +325,36 @@ class Worker(object):
elif instructions['command'] == commands.TENANT_DEBUG:
tenant_id = instructions['tenant_id']
reason = instructions.get('reason')
if tenant_id in commands.WILDCARDS:
LOG.warning(
'Ignoring instruction to debug all tenants with %r',
tenant_id)
else:
LOG.info('Placing tenant %s in debug mode', tenant_id)
self._debug_tenants.add(tenant_id)
LOG.info('Placing tenant %s in debug mode (reason: %s)',
tenant_id, reason)
self.db_api.enable_tenant_debug(tenant_id, reason)
elif instructions['command'] == commands.TENANT_MANAGE:
tenant_id = instructions['tenant_id']
try:
self._debug_tenants.remove(tenant_id)
self.db_api.disable_tenant_debug(tenant_id)
LOG.info('Resuming management of tenant %s', tenant_id)
except KeyError:
pass
elif instructions['command'] == commands.GLOBAL_DEBUG:
enable = instructions.get('enabled')
reason = instructions.get('reason')
if enable == 1:
LOG.info('Enabling global debug mode (reason: %s)', reason)
self.db_api.enable_global_debug(reason)
elif enable == 0:
LOG.info('Disabling global debug mode')
self.db_api.disable_global_debug()
else:
LOG.warning('Unrecognized global debug command: %s',
instructions)
elif instructions['command'] == commands.CONFIG_RELOAD:
try:
cfg.CONF()
@ -326,36 +364,24 @@ class Worker(object):
cfg.CONF.log_opt_values(LOG, INFO)
else:
LOG.warning('unrecognized command: %s', instructions)
def _get_routers_to_ignore(self):
ignores = set()
try:
if self._ignore_directory:
ignores = set(os.listdir(self._ignore_directory))
except OSError:
pass
return ignores
LOG.warning('Unrecognized command: %s', instructions)
def _deliver_message(self, target, message):
if target in self._debug_tenants:
LOG.info(
'Ignoring message intended for tenant %s: %s',
target, message,
)
return
LOG.debug('preparing to deliver %r to %r', message, target)
routers_to_ignore = self._debug_routers.union(
self._get_routers_to_ignore()
)
trms = self._get_trms(target)
for trm in trms:
sms = trm.get_state_machines(message, self._context)
for sm in sms:
if sm.router_id in routers_to_ignore:
# NOTE(adam_g): We dont necessarily know the router_id
# till the sm has been created. this check should move to
# _should_process() once thats changed.
should_ignore, reason = self.db_api.router_in_debug(
sm.router_id)
if should_ignore:
LOG.info(
'Ignoring message intended for %s: %s',
sm.router_id, message,
'Ignoring message intended for router %s in '
'debug mode (reason: %s): %s',
sm.router_id, reason, message,
)
continue
# Add the message to the state machine's inbox. If
@ -402,16 +428,16 @@ class Worker(object):
'alive' if thread.isAlive() else 'DEAD',
self._thread_status.get(thread.name, 'UNKNOWN'),
)
for tid in sorted(self._debug_tenants):
LOG.info('Debugging tenant: %s', tid)
if not self._debug_tenants:
debug_tenants = self.db_api.tenants_in_debug()
if debug_tenants:
for t_uuid, reason in debug_tenants:
LOG.info('Debugging tenant: %s (reason: %s)', t_uuid, reason)
else:
LOG.info('No tenants in debug mode')
for rid in sorted(self._debug_routers):
LOG.info('Debugging router: %s', rid)
if not self._debug_routers:
debug_routers = self.db_api.routers_in_debug()
if self.db_api.routers_in_debug():
for r_uuid, reason in debug_routers:
LOG.info('Debugging router: %s (reason: %s)', r_uuid, reason)
else:
LOG.info('No routers in debug mode')
ignored_routers = sorted(self._get_routers_to_ignore())
for rid in ignored_routers:
LOG.info('Ignoring router: %s', rid)
if not ignored_routers:
LOG.info('No routers being ignored')

View File

@ -82,6 +82,8 @@ function configure_akanda() {
iniset $AKANDA_RUG_CONF DEFAULT router_ssh_public_key $AKANDA_APPLIANCE_SSH_PUBLIC_KEY
iniset $AKANDA_RUG_CONF database connection `database_connection_url akanda`
if [ "$LOG_COLOR" == "True" ] && [ "$SYSLOG" == "False" ]; then
colorize_logging
fi
@ -155,7 +157,11 @@ function _remove_subnets() {
}
function pre_start_akanda() {
typeset auth_args="--os-username $Q_ADMIN_USERNAME --os-password $SERVICE_PASSWORD --os-tenant-name $SERVICE_TENANT_NAME --os-auth-url $OS_AUTH_URL"
# Create and init the database
recreate_database akanda
akanda-rug-dbsync --config-file $AKANDA_RUG_CONF upgrade
typeset auth_args="--os-username $Q_ADMIN_USERNAME --os-password $SERVICE_PASSWORD --os-tenant-name $SERVICE_TENANT_NAME --os-auth-url $OS_AUTH_URL"
if ! neutron net-show $PUBLIC_NETWORK_NAME; then
neutron $auth_args net-create $PUBLIC_NETWORK_NAME --router:external
fi

View File

@ -1,12 +1,14 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
alembic>=0.7.2
eventlet>=0.17.4
netaddr>=0.7.12
httplib2>=0.7.5
python-neutronclient<3,>=2.3.11
oslo.config>=1.11.0 # Apache-2.0
oslo.context>=0.2.0 # Apache-2.0
oslo.db>=1.10.0 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0
oslo.log>=1.2.0 # Apache-2.0
oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0

View File

@ -34,6 +34,7 @@ setup-hooks =
console_scripts =
akanda-rug-service=akanda.rug.main:main
akanda-debug-router=akanda.rug.debug:debug_one_router
akanda-rug-dbsync = akanda.rug.db.sqlalchemy.dbsync:main
rug-ctl=akanda.rug.cli.main:main
akanda.rug.cli =
config reload=akanda.rug.cli.config:ConfigReload
@ -44,6 +45,7 @@ akanda.rug.cli =
tenant debug=akanda.rug.cli.tenant:TenantDebug
tenant manage=akanda.rug.cli.tenant:TenantManage
workers debug=akanda.rug.cli.worker:WorkerDebug
global debug=akanda.rug.cli.global_debug:GlobalDebug
browse=akanda.rug.cli.browse:BrowseRouters
poll=akanda.rug.cli.poll:Poll
ssh=akanda.rug.cli.router:RouterSSH