diff --git a/fenix/cmd/engine.py b/fenix/cmd/engine.py index 07ac42f..f340820 100644 --- a/fenix/cmd/engine.py +++ b/fenix/cmd/engine.py @@ -21,6 +21,7 @@ import sys from oslo_config import cfg from oslo_service import service +from fenix.db import api as db_api from fenix.engine import service as engine_service from fenix.utils import service as service_utils @@ -28,7 +29,7 @@ from fenix.utils import service as service_utils def main(): cfg.CONF(project='fenix', prog='fenix-engine') service_utils.prepare_service(sys.argv) - + db_api.setup_db() service.launch( cfg.CONF, engine_service.EngineService() diff --git a/fenix/db/__init__.py b/fenix/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fenix/db/api.py b/fenix/db/api.py new file mode 100644 index 0000000..88bdd9c --- /dev/null +++ b/fenix/db/api.py @@ -0,0 +1,168 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines interface for DB access. + +Functions in this module are imported into the fenix.db namespace. Call these +functions from fenix.db namespace, not the fenix.db.api namespace. + +All functions in this module return objects that implement a dictionary-like +interface. + +**Related Flags** + +:db_backend: string to lookup in the list of LazyPluggable backends. + `sqlalchemy` is the only supported backend right now. + +:sql_connection: string specifying the sqlalchemy connection to use, like: + `sqlite:///var/lib/fenix/fenix.sqlite`. + +""" + +from oslo_config import cfg +from oslo_db import api as db_api +from oslo_db import options as db_options +from oslo_log import log as logging + + +_BACKEND_MAPPING = { + 'sqlalchemy': 'fenix.db.sqlalchemy.api', +} + +db_options.set_defaults(cfg.CONF) +IMPL = db_api.DBAPI(cfg.CONF.database.backend, + backend_mapping=_BACKEND_MAPPING) +LOG = logging.getLogger(__name__) + + +def get_instance(): + """Return a DB API instance.""" + return IMPL + + +def setup_db(): + """Set up database, create tables, etc. + + Return True on success, False otherwise + """ + return IMPL.setup_db() + + +def drop_db(): + """Drop database. + + Return True on success, False otherwise + """ + return IMPL.drop_db() + + +# Helpers for building constraints / equality checks + + +def constraint(**conditions): + """Return a constraint object suitable for use with some updates.""" + return IMPL.constraint(**conditions) + + +def equal_any(*values): + """Return an equality condition object suitable for use in a constraint. + + Equal_any conditions require that a model object's attribute equal any + one of the given values. + """ + return IMPL.equal_any(*values) + + +def not_equal(*values): + """Return an inequality condition object suitable for use in a constraint. + + Not_equal conditions require that a model object's attribute differs from + all of the given values. + """ + return IMPL.not_equal(*values) + + +def to_dict(func): + def decorator(*args, **kwargs): + res = func(*args, **kwargs) + + if isinstance(res, list): + return [item.to_dict() for item in res] + + if res: + return res.to_dict() + else: + return None + + return decorator + + +# Fenix workflow session DB access +def create_session(values): + """Create a session from the values.""" + return IMPL.create_session(values) + + +def remove_session(session_id): + """Remove a session from the tables.""" + return IMPL.remove_session(session_id) + + +def create_action(values): + """Create a action from the values.""" + return IMPL.create_action(values) + + +def create_host(values): + """Create a host from the values.""" + return IMPL.create_host(values) + + +def create_hosts(session_id, hostnames): + hosts = [] + for hostname in hostnames: + host = { + 'session_id': session_id, + 'hostname': str(hostname), + 'type': None, + 'maintained': False, + 'disabled': False} + hosts.append(host) + return IMPL.create_hosts(hosts) + + +def create_projects(session_id, project_ids): + projects = [] + for project_id in project_ids: + project = { + 'session_id': session_id, + 'project_id': str(project_id), + 'state': None} + projects.append(project) + return IMPL.create_projects(projects) + + +def create_instance(values): + """Create a instance from the values.""" + return IMPL.create_instance(values) + + +def create_instances(instances): + """Create a instances from the instances dictionary list.""" + return IMPL.create_instances(instances) + + +def remove_instance(session_id, instance_id): + return IMPL.remove_instance(session_id, instance_id) diff --git a/fenix/db/base.py b/fenix/db/base.py new file mode 100644 index 0000000..125c046 --- /dev/null +++ b/fenix/db/base.py @@ -0,0 +1,35 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base class for classes that need modular database access.""" + +from oslo_config import cfg +from oslo_utils import importutils + + +db_driver_opts = [ + cfg.StrOpt('db_driver', default='fenix.db', + help='Driver to use for database access') +] + +CONF = cfg.CONF +CONF.register_opts(db_driver_opts) + + +class Base(object): + """DB driver is injected in the init method.""" + + def __init__(self): + self.db = importutils.import_module(CONF.db_driver) diff --git a/fenix/db/exceptions.py b/fenix/db/exceptions.py new file mode 100644 index 0000000..0dbae2f --- /dev/null +++ b/fenix/db/exceptions.py @@ -0,0 +1,41 @@ +# Copyright (c) 2014 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from fenix import exceptions + + +LOG = logging.getLogger(__name__) + + +class FenixDBException(exceptions.FenixException): + msg_fmt = 'An unknown database exception occurred' + + +class FenixDBDuplicateEntry(FenixDBException): + msg_fmt = 'Duplicate entry for %(columns)s in %(model)s model was found' + + +class FenixDBNotFound(FenixDBException): + msg_fmt = '%(id)s %(model)s was not found' + + +class FenixDBInvalidFilter(FenixDBException): + msg_fmt = '%(query_filter)s is invalid' + + +class FenixDBInvalidFilterOperator(FenixDBException): + msg_fmt = '%(filter_operator)s is invalid' diff --git a/fenix/db/migration/README b/fenix/db/migration/README new file mode 100644 index 0000000..9ea16ce --- /dev/null +++ b/fenix/db/migration/README @@ -0,0 +1,75 @@ +# Copyright 2012 New Dream Network, LLC (DreamHost) +# Copyright 2014 Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +Fenix project uses Alembic to handle database migrations. A migration occurs +by executing a script that details the changes needed to upgrade/downgrade +the database. The migration scripts are ordered so that multiple scripts +can run sequentially to update the database. + +You can then upgrade to the latest database version via: +$ fenix-db-manage --config-file /path/to/fenix.conf upgrade head + +To check the current database version: +$ fenix-db-manage --config-file /path/to/fenix.conf current + +To create a script to run the migration offline: +$ fenix-db-manage --config-file /path/to/fenix.conf upgrade head --sql + +To run the offline migration between specific migration versions: +$ fenix-db-manage --config-file /path/to/fenix.conf upgrade \ +: --sql + +Upgrade the database incrementally: +$ fenix-db-manage --config-file /path/to/fenix.conf \ +upgrade --delta <# of revs> + +Downgrade the database by a certain number of revisions: +$ fenix-db-manage --config-file /path/to/fenix.conf downgrade \ +--delta <# of revs> + + +DEVELOPERS: +A database migration script is required when you submit a change to Fenix +that alters the database model definition. The migration script is a special +python file that includes code to update/downgrade the database to match the +changes in the model definition. Alembic will execute these scripts in order to +provide a linear migration path between revision. The fenix-db-manage command +can be used to generate migration template for you to complete. The operations +in the template are those supported by the Alembic migration library. +After you modified the Fenix models accordingly, you can create the revision. + +$ fenix-db-manage --config-file /path/to/fenix.conf revision \ +-m "description of revision" \ +--autogenerate + +This generates a prepopulated template with the changes needed to match the +database state with the models. You should inspect the autogenerated template +to ensure that the proper models have been altered. + +In rare circumstances, you may want to start with an empty migration template +and manually author the changes necessary for an upgrade/downgrade. You can +create a blank file via: + +$ fenix-db-manage --config-file /path/to/fenix.conf revision \ +-m "description of revision" + +The migration timeline should remain linear so that there is a clear path when +upgrading/downgrading. To verify that the timeline does branch, you can run +this command: +$ fenix-db-manage --config-file /path/to/fenix.conf check_migration + +If the migration path does branch, you can find the branch point via: +$ fenix-db-manage --config-file /path/to/fenix.conf history diff --git a/fenix/db/migration/__init__.py b/fenix/db/migration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fenix/db/migration/alembic.ini b/fenix/db/migration/alembic.ini new file mode 100644 index 0000000..e6ba095 --- /dev/null +++ b/fenix/db/migration/alembic.ini @@ -0,0 +1,53 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = %(here)s/alembic_migrations + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# default to an empty string because the Fenix migration cli will +# extract the correct value and set it programatically before alembic is fully +# invoked. +sqlalchemy.url = + + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/fenix/db/migration/alembic_migrations/env.py b/fenix/db/migration/alembic_migrations/env.py new file mode 100644 index 0000000..e39979d --- /dev/null +++ b/fenix/db/migration/alembic_migrations/env.py @@ -0,0 +1,84 @@ +# Copyright 2014 Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from alembic import context +from sqlalchemy import create_engine, pool +from logging import config as log_config + +from fenix.db.sqlalchemy import model_base +from fenix.db.sqlalchemy import models # noqa + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +log_config.fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = model_base.FenixBase.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline(config): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.database.connection + context.configure(url=url) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(config): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + engine = create_engine(config.database.connection, + poolclass=pool.NullPool) + connection = engine.connect() + context.configure(connection=connection, + target_metadata=target_metadata) + + try: + with context.begin_transaction(): + context.run_migrations() + finally: + connection.close() + +if context.is_offline_mode(): + run_migrations_offline(config.fenix_config) +else: + run_migrations_online(config.fenix_config) diff --git a/fenix/db/migration/alembic_migrations/script.py.mako b/fenix/db/migration/alembic_migrations/script.py.mako new file mode 100644 index 0000000..54bb794 --- /dev/null +++ b/fenix/db/migration/alembic_migrations/script.py.mako @@ -0,0 +1,37 @@ +# Copyright ${create_date.year} OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision} +Create Date: ${create_date} + +""" + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/fenix/db/migration/alembic_migrations/versions/001_initial.py b/fenix/db/migration/alembic_migrations/versions/001_initial.py new file mode 100644 index 0000000..59b87f5 --- /dev/null +++ b/fenix/db/migration/alembic_migrations/versions/001_initial.py @@ -0,0 +1,120 @@ +# Copyright 2014 OpenStack Foundation. +# Copyright 2014 Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +revision = '001' +down_revision = None + +import uuid + +from alembic import op +import six +import sqlalchemy as sa +from sqlalchemy.dialects.mysql import MEDIUMTEXT + + +def _generate_unicode_uuid(): + return six.text_type(str(uuid.uuid4())) + + +def MediumText(): + return sa.Text().with_variant(MEDIUMTEXT(), 'mysql') + + +def upgrade(): + op.create_table( + 'sessions', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('session_id', sa.String(36), primary_key=True), + sa.Column('state', sa.String(length=32), nullable=True), + sa.Column('maintenance_at', sa.DateTime(), nullable=True), + sa.Column('meta', MediumText(), nullable=True), + sa.Column('workflow', sa.String(length=255), nullable=True), + sa.PrimaryKeyConstraint('session_id')) + + op.create_table( + 'hosts', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(36), primary_key=True, + default=_generate_unicode_uuid), + sa.Column('session_id', sa.String(36), + sa.ForeignKey('sessions.session_id')), + sa.Column('hostname', sa.String(length=255), nullable=False), + sa.Column('type', sa.String(length=32), nullable=True), + sa.Column('maintained', sa.Boolean, default=False), + sa.Column('disabled', sa.Boolean, default=False), + sa.UniqueConstraint('session_id', 'hostname', name='_session_host_uc'), + sa.PrimaryKeyConstraint('id')) + + op.create_table( + 'projects', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(36), primary_key=True, + default=_generate_unicode_uuid), + sa.Column('session_id', sa.String(36), + sa.ForeignKey('sessions.session_id')), + sa.Column('project_id', sa.String(36), nullable=True,), + sa.Column('state', sa.String(length=37), nullable=True), + sa.UniqueConstraint('session_id', 'project_id', + name='_session_project_uc'), + sa.PrimaryKeyConstraint('id')) + + op.create_table( + 'instances', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(36), primary_key=True, + default=_generate_unicode_uuid), + sa.Column('session_id', sa.String(36), + sa.ForeignKey('sessions.session_id')), + sa.Column('instance_id', sa.String(36), nullable=True, + primary_key=True), + sa.Column('action', sa.String(32), nullable=True), + sa.Column('project_id', sa.String(36), nullable=True), + sa.Column('instance_id', sa.String(36), nullable=True), + sa.Column('project_state', sa.String(length=37), nullable=True), + sa.Column('state', sa.String(length=16), nullable=True), + sa.Column('instance_name', sa.String(length=255), nullable=False), + sa.Column('action_done', sa.Boolean, default=False), + sa.Column('details', sa.String(255), nullable=True), + sa.Column('host', sa.String(length=255), nullable=False), + sa.UniqueConstraint('session_id', 'instance_id', + name='_session_instance_uc'), + sa.PrimaryKeyConstraint('id')) + + op.create_table( + 'action_plugins', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(36), primary_key=True, + default=_generate_unicode_uuid), + sa.Column('session_id', sa.String(36), + sa.ForeignKey('sessions.session_id')), + sa.Column('plugin', sa.String(length=255), nullable=False), + sa.Column('state', sa.String(length=32), nullable=True), + sa.Column('type', sa.String(length=32), nullable=True), + sa.Column('meta', MediumText(), nullable=False), + sa.UniqueConstraint('session_id', 'plugin', name='_session_plugin_uc'), + sa.PrimaryKeyConstraint('id')) + + +def downgrade(): + op.drop_table('sessions') + op.drop_table('hosts') + op.drop_table('projects') + op.drop_table('instances') + op.drop_table('action_plugins') diff --git a/fenix/db/migration/alembic_migrations/versions/README b/fenix/db/migration/alembic_migrations/versions/README new file mode 100644 index 0000000..1ffef61 --- /dev/null +++ b/fenix/db/migration/alembic_migrations/versions/README @@ -0,0 +1,3 @@ +This directory contains the migration scripts for the Fenix project. Please +see the README in fenix/db/migration on how to use and generate new +migrations. diff --git a/fenix/db/migration/cli.py b/fenix/db/migration/cli.py new file mode 100644 index 0000000..f867288 --- /dev/null +++ b/fenix/db/migration/cli.py @@ -0,0 +1,123 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""CLI tool to manage the Fenix DB. Inspired by Neutron's same tool.""" + +import gettext +import os + +from alembic import command as alembic_command +from alembic import config as alembic_config +from alembic import util as alembic_util +from oslo_config import cfg +from oslo_db import options as db_options + +gettext.install('fenix') +from fenix.i18n import _ + + +CONF = cfg.CONF + + +def do_alembic_command(config, cmd, *args, **kwargs): + try: + getattr(alembic_command, cmd)(config, *args, **kwargs) + except alembic_util.CommandError as e: + alembic_util.err(str(e)) + + +def do_check_migration(config, cmd): + do_alembic_command(config, 'branches') + + +def do_upgrade_downgrade(config, cmd): + if not CONF.command.revision and not CONF.command.delta: + raise SystemExit(_('You must provide a revision or relative delta')) + + revision = CONF.command.revision + + if CONF.command.delta: + sign = '+' if CONF.command.name == 'upgrade' else '-' + revision = sign + str(CONF.command.delta) + else: + revision = CONF.command.revision + + do_alembic_command(config, cmd, revision, sql=CONF.command.sql) + + +def do_stamp(config, cmd): + do_alembic_command(config, cmd, + CONF.command.revision, + sql=CONF.command.sql) + + +def do_revision(config, cmd): + do_alembic_command(config, cmd, + message=CONF.command.message, + autogenerate=CONF.command.autogenerate, + sql=CONF.command.sql) + + +def add_command_parsers(subparsers): + for name in ['current', 'history', 'branches']: + parser = subparsers.add_parser(name) + parser.set_defaults(func=do_alembic_command) + + parser = subparsers.add_parser('check_migration') + parser.set_defaults(func=do_check_migration) + + for name in ['upgrade', 'downgrade']: + parser = subparsers.add_parser(name) + parser.add_argument('--delta', type=int) + parser.add_argument('--sql', action='store_true') + parser.add_argument('revision', nargs='?') + parser.set_defaults(func=do_upgrade_downgrade) + + parser = subparsers.add_parser('stamp') + parser.add_argument('--sql', action='store_true') + parser.add_argument('revision') + parser.set_defaults(func=do_stamp) + + parser = subparsers.add_parser('revision') + parser.add_argument('-m', '--message') + parser.add_argument('--autogenerate', action='store_true') + parser.add_argument('--sql', action='store_true') + parser.set_defaults(func=do_revision) + + +command_opts = [ + cfg.SubCommandOpt('command', + title='Command', + help='Available commands', + handler=add_command_parsers) +] + +CONF.register_cli_opts(command_opts) + + +def main(): + config = alembic_config.Config( + os.path.join(os.path.dirname(__file__), 'alembic.ini') + ) + config.fenix_config = CONF + + CONF() + db_options.set_defaults(CONF) + if not CONF.database.connection: + raise SystemExit( + _("Provide a configuration file with DB connection information")) + + CONF.command.func(config, CONF.command.name) diff --git a/fenix/db/sqlalchemy/__init__.py b/fenix/db/sqlalchemy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fenix/db/sqlalchemy/api.py b/fenix/db/sqlalchemy/api.py new file mode 100644 index 0000000..67c2115 --- /dev/null +++ b/fenix/db/sqlalchemy/api.py @@ -0,0 +1,391 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Implementation of SQLAlchemy backend.""" + +import sys + +from oslo_config import cfg +from oslo_db import exception as common_db_exc +from oslo_db.sqlalchemy import session as db_session +from oslo_log import log as logging +import sqlalchemy as sa + +from fenix.db import exceptions as db_exc +from fenix.db.sqlalchemy import facade_wrapper +from fenix.db.sqlalchemy import models + + +LOG = logging.getLogger(__name__) + +get_engine = facade_wrapper.get_engine +get_session = facade_wrapper.get_session + + +def get_backend(): + """The backend is this module itself.""" + return sys.modules[__name__] + + +def model_query(model, session=None): + """Query helper. + + :param model: base model to query + """ + session = session or get_session() + + return session.query(model) + + +def setup_db(): + try: + engine = db_session.EngineFacade(cfg.CONF.database.connection, + sqlite_fk=True).get_engine() + models.MaintenanceSession.metadata.create_all(engine) + models.MaintenanceAction.metadata.create_all(engine) + models.MaintenanceHost.metadata.create_all(engine) + models.MaintenanceProject.metadata.create_all(engine) + models.MaintenanceInstance.metadata.create_all(engine) + except sa.exc.OperationalError as e: + LOG.error("Database registration exception: %s", e) + return False + return True + + +def drop_db(): + try: + engine = db_session.EngineFacade(cfg.CONF.database.connection, + sqlite_fk=True).get_engine() + models.Lease.metavalues.drop_all(engine) + except Exception as e: + LOG.error("Database shutdown exception: %s", e) + return False + return True + + +# Helpers for building constraints / equality checks + + +def constraint(**conditions): + return Constraint(conditions) + + +def equal_any(*values): + return EqualityCondition(values) + + +def not_equal(*values): + return InequalityCondition(values) + + +class Constraint(object): + def __init__(self, conditions): + self.conditions = conditions + + def apply(self, model, query): + for key, condition in self.conditions.items(): + for clause in condition.clauses(getattr(model, key)): + query = query.filter(clause) + return query + + +class EqualityCondition(object): + def __init__(self, values): + self.values = values + + def clauses(self, field): + return sa.or_([field == value for value in self.values]) + + +class InequalityCondition(object): + def __init__(self, values): + self.values = values + + def clauses(self, field): + return [field != value for value in self.values] + + +# Session +def _maintenance_session_get(session, session_id): + query = model_query(models.MaintenanceSession, session) + return query.filter_by(session_id=session_id).first() + + +def maintenance_session_get(session_id): + return _maintenance_session_get(get_session(), session_id) + + +def create_session(values): + values = values.copy() + msession = models.MaintenanceSession() + msession.update(values) + + session = get_session() + with session.begin(): + try: + msession.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=msession.__class__.__name__, columns=e.columns) + + return maintenance_session_get(msession.session_id) + + +def remove_session(session_id): + session = get_session() + with session.begin(): + + hosts = _hosts_get(session, session_id) + + if not hosts: + # raise not found error + raise db_exc.FenixDBNotFound(session, session_id=session_id, + model='hosts') + + for host in hosts: + session.delete(host) + + projects = _projects_get(session, session_id) + + if not projects: + # raise not found error + raise db_exc.FenixDBNotFound(session, session_id=session_id, + model='projects') + + for project in projects: + session.delete(project) + + instances = _instances_get(session, session_id) + + if not instances: + # raise not found error + raise db_exc.FenixDBNotFound(session, session_id=session_id, + model='instances') + + for instance in instances: + session.delete(instance) + + msession = _maintenance_session_get(session, session_id) + + if not msession: + # raise not found error + raise db_exc.FenixDBNotFound(session, session_id=session_id, + model='sessions') + + session.delete(msession) + # TBD Other tables content when implemented + + +# Action +def _action_get(session, session_id, plugin): + query = model_query(models.MaintenanceActions, session) + return query.filter_by(session_id=session_id, plugin=plugin).first() + + +def action_get(session_id, plugin): + return _action_get(get_session(), session_id, plugin) + + +def create_action(values): + values = values.copy() + maction = models.MaintenanceActions() + maction.update(values) + + session = get_session() + with session.begin(): + try: + maction.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=maction.__class__.__name__, columns=e.columns) + + return action_get(maction.session_id, maction.plugin) + + +# Host +def _host_get(session, session_id, hostname): + query = model_query(models.MaintenanceHost, session) + return query.filter_by(session_id=session_id, hostname=hostname).first() + + +def host_get(session_id, hostname): + return _host_get(get_session(), session_id, hostname) + + +def _hosts_get(session, session_id): + query = model_query(models.MaintenanceHost, session) + return query.filter_by(session_id=session_id).all() + + +def hosts_get(session_id): + return _hosts_get(get_session(), session_id) + + +def create_host(values): + values = values.copy() + mhost = models.MaintenanceHost() + mhost.update(values) + + session = get_session() + with session.begin(): + try: + mhost.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=mhost.__class__.__name__, columns=e.columns) + + return host_get(mhost.session_id, mhost.hostname) + + +def create_hosts(values_list): + for values in values_list: + vals = values.copy() + session = get_session() + with session.begin(): + mhost = models.MaintenanceHost() + mhost.update(vals) + try: + mhost.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=mhost.__class__.__name__, columns=e.columns) + + return hosts_get(mhost.session_id) + + +# Project +def _project_get(session, session_id, project_id): + query = model_query(models.MaintenanceProject, session) + return query.filter_by(session_id=session_id, + project_id=project_id).first() + + +def project_get(session_id, project_id): + return _project_get(get_session(), session_id, project_id) + + +def _projects_get(session, session_id): + query = model_query(models.MaintenanceProject, session) + return query.filter_by(session_id=session_id).all() + + +def projects_get(session_id): + return _projects_get(get_session(), session_id) + + +def create_project(values): + values = values.copy() + mproject = models.MaintenanceProject() + mproject.update(values) + + session = get_session() + with session.begin(): + try: + mproject.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=mproject.__class__.__name__, columns=e.columns) + + return project_get(mproject.session_id, mproject.project_id) + + +def create_projects(values_list): + for values in values_list: + vals = values.copy() + session = get_session() + with session.begin(): + mproject = models.MaintenanceProject() + mproject.update(vals) + try: + mproject.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=mproject.__class__.__name__, columns=e.columns) + + return projects_get(mproject.session_id) + + +# Instance +def _instance_get(session, session_id, instance_id): + query = model_query(models.MaintenanceInstance, session) + return query.filter_by(session_id=session_id, + instance_id=instance_id).first() + + +def instance_get(session_id, instance_id): + return _instance_get(get_session(), session_id, instance_id) + + +def _instances_get(session, session_id): + query = model_query(models.MaintenanceInstance, session) + return query.filter_by(session_id=session_id).all() + + +def instances_get(session_id): + return _instances_get(get_session(), session_id) + + +def create_instance(values): + values = values.copy() + minstance = models.MaintenanceInstance() + minstance.update(values) + + session = get_session() + with session.begin(): + try: + minstance.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=minstance.__class__.__name__, columns=e.columns) + + return instance_get(minstance.session_id, minstance.instance_id) + + +def create_instances(values_list): + for values in values_list: + vals = values.copy() + session = get_session() + with session.begin(): + minstance = models.MaintenanceInstance() + minstance.update(vals) + try: + minstance.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=minstance.__class__.__name__, columns=e.columns) + + return instances_get(minstance.session_id) + + +def remove_instance(session_id, instance_id): + session = get_session() + with session.begin(): + + minstance = _instance_get(session, session_id, instance_id) + + if not minstance: + # raise not found error + raise db_exc.FenixDBNotFound(session, session_id=session_id, + model='sessions') + + session.delete(minstance) diff --git a/fenix/db/sqlalchemy/facade_wrapper.py b/fenix/db/sqlalchemy/facade_wrapper.py new file mode 100644 index 0000000..4887f6e --- /dev/null +++ b/fenix/db/sqlalchemy/facade_wrapper.py @@ -0,0 +1,43 @@ +# Copyright 2014 Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_db.sqlalchemy import session as db_session + + +CONF = cfg.CONF + +_engine_facade = None + + +def get_session(): + return _get_facade().get_session() + + +def get_engine(): + return _get_facade().get_engine() + + +def _clear_engine(): + global _engine_facade + _engine_facade = None + + +def _get_facade(): + global _engine_facade + if not _engine_facade: + _engine_facade = db_session.EngineFacade.from_config(CONF) + + return _engine_facade diff --git a/fenix/db/sqlalchemy/model_base.py b/fenix/db/sqlalchemy/model_base.py new file mode 100644 index 0000000..7afd9c4 --- /dev/null +++ b/fenix/db/sqlalchemy/model_base.py @@ -0,0 +1,51 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_db.sqlalchemy import models +from sqlalchemy.ext import declarative +from sqlalchemy.orm import attributes + + +class _FenixBase(models.ModelBase, models.TimestampMixin): + """Base class for all Fenix SQLAlchemy DB Models.""" + + def to_dict(self, include=None): + """sqlalchemy based automatic to_dict method.""" + d = {} + + # if a column is unloaded at this point, it is + # probably deferred. We do not want to access it + # here and thereby cause it to load... + unloaded = attributes.instance_state(self).unloaded + + columns = self.__table__.columns + if include: + columns = [col for col in columns if col.name in include] + + for col in columns: + if col.name not in unloaded: + d[col.name] = getattr(self, col.name) + + datetime_to_str(d, 'created_at') + datetime_to_str(d, 'updated_at') + + return d + + +def datetime_to_str(dct, attr_name): + if dct.get(attr_name) is not None: + dct[attr_name] = dct[attr_name].isoformat(' ') + +FenixBase = declarative.declarative_base(cls=_FenixBase) diff --git a/fenix/db/sqlalchemy/models.py b/fenix/db/sqlalchemy/models.py new file mode 100644 index 0000000..b6575e7 --- /dev/null +++ b/fenix/db/sqlalchemy/models.py @@ -0,0 +1,123 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from oslo_utils import uuidutils +import six +import sqlalchemy as sa +from sqlalchemy.dialects.mysql import MEDIUMTEXT + +from fenix.db.sqlalchemy import model_base as mb + +# Helpers + + +def _generate_unicode_uuid(): + return six.text_type(uuidutils.generate_uuid()) + + +def MediumText(): + return sa.Text().with_variant(MEDIUMTEXT(), 'mysql') + + +def _id_column(): + return sa.Column(sa.String(36), + primary_key=True, + default=_generate_unicode_uuid) + + +class MaintenanceSession(mb.FenixBase): + """Maintenance session""" + + __tablename__ = 'sessions' + + session_id = sa.Column(sa.String(36), primary_key=True) + state = sa.Column(sa.String(length=32), nullable=True) + maintenance_at = sa.Column(sa.DateTime(), nullable=True) + meta = sa.Column(MediumText(), nullable=False) + workflow = sa.Column(sa.String(length=255), nullable=True) + + def to_dict(self): + return super(MaintenanceSession, self).to_dict() + + +class MaintenanceAction(mb.FenixBase): + """Maintenance action""" + + __tablename__ = 'actions' + + id = _id_column() + session_id = sa.Column(sa.String(36), sa.ForeignKey('sessions.session_id'), + nullable=False) + plugin = sa.Column(sa.String(length=255), nullable=False) + state = sa.Column(sa.String(length=32), nullable=True) + type = sa.Column(sa.String(length=32), nullable=True) + meta = sa.Column(MediumText(), nullable=False) + + def to_dict(self): + return super(MaintenanceAction, self).to_dict() + + +class MaintenanceHost(mb.FenixBase): + """Maintenance host""" + + __tablename__ = 'hosts' + + id = _id_column() + session_id = sa.Column(sa.String(36), sa.ForeignKey('sessions.session_id'), + nullable=False) + hostname = sa.Column(sa.String(length=255), primary_key=True) + type = sa.Column(sa.String(length=32), nullable=True) + maintained = sa.Column(sa.Boolean, default=False) + + def to_dict(self): + return super(MaintenanceHost, self).to_dict() + + +class MaintenanceProject(mb.FenixBase): + """Maintenance project""" + + __tablename__ = 'projects' + + id = _id_column() + session_id = sa.Column(sa.String(36), sa.ForeignKey('sessions.session_id'), + nullable=False) + project_id = sa.Column(sa.String(length=36), primary_key=True) + state = sa.Column(sa.String(length=37), nullable=True) + + def to_dict(self): + return super(MaintenanceProject, self).to_dict() + + +class MaintenanceInstance(mb.FenixBase): + """Maintenance instance""" + + __tablename__ = 'instances' + + id = _id_column() + session_id = sa.Column(sa.String(36), sa.ForeignKey('sessions.session_id'), + nullable=False) + instance_id = sa.Column(sa.String(length=36), primary_key=True) + state = sa.Column(sa.String(length=16), nullable=True) + action = sa.Column(sa.String(32), nullable=True) + project_id = sa.Column(sa.String(36), nullable=True) + project_state = sa.Column(sa.String(length=37), nullable=True) + instance_name = sa.Column(sa.String(length=255), nullable=False) + action_done = sa.Column(sa.Boolean, default=False) + host = sa.Column(sa.String(length=255), nullable=False) + details = sa.Column(sa.String(length=255), nullable=True) + + def to_dict(self): + return super(MaintenanceInstance, self).to_dict() diff --git a/fenix/db/sqlalchemy/types.py b/fenix/db/sqlalchemy/types.py new file mode 100644 index 0000000..65a11e9 --- /dev/null +++ b/fenix/db/sqlalchemy/types.py @@ -0,0 +1,33 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_serialization import jsonutils +import sqlalchemy as sa + + +class JsonEncoded(sa.TypeDecorator): + """Represents an immutable structure as a json-encoded string.""" + + impl = sa.Text + + def process_bind_param(self, value, dialect): + if value is not None: + value = jsonutils.dump_as_bytes(value) + return value + + def process_result_value(self, value, dialect): + if value is not None: + value = jsonutils.loads(value) + return value diff --git a/fenix/db/sqlalchemy/utils.py b/fenix/db/sqlalchemy/utils.py new file mode 100644 index 0000000..1e9b337 --- /dev/null +++ b/fenix/db/sqlalchemy/utils.py @@ -0,0 +1,24 @@ +# Copyright (c) 2018 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys + +from fenix.db.sqlalchemy import facade_wrapper + +get_session = facade_wrapper.get_session + + +def get_backend(): + """The backend is this module itself.""" + return sys.modules[__name__] diff --git a/fenix/exceptions.py b/fenix/exceptions.py new file mode 100644 index 0000000..2ca4c04 --- /dev/null +++ b/fenix/exceptions.py @@ -0,0 +1,49 @@ +# Copyright (c) 2013 Mirantis Inc. +# Copyright (c) 2013 Bull. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from oslo_log import log as logging + +LOG = logging.getLogger(__name__) + + +class FenixException(Exception): + """Base Fenix Exception. + + To correctly use this class, inherit from it and define + a 'msg_fmt' and 'code' properties. + """ + msg_fmt = "An unknown exception occurred" + code = 500 + + def __init__(self, message=None, **kwargs): + self.kwargs = kwargs + + if 'code' not in self.kwargs: + self.kwargs['code'] = self.code + + if not message: + try: + message = self.msg_fmt % kwargs + except KeyError: + # kwargs doesn't match a variable in the message + # log the issue and the kwargs + LOG.exception('Exception in string format operation') + for name, value in kwargs.items(): + LOG.error("%(name)s: %(value)s", + {'name': name, 'value': value}) + + message = self.msg_fmt + + super(FenixException, self).__init__(message) diff --git a/fenix/utils/service.py b/fenix/utils/service.py index eb0b60e..253b825 100644 --- a/fenix/utils/service.py +++ b/fenix/utils/service.py @@ -114,7 +114,7 @@ class EngineEndpoint(object): return None LOG.info("EngineEndpoint: admin_get_session") return ({'session_id': session_id, 'state': - self.workflow_sessions[session_id].state}) + self.workflow_sessions[session_id].session.state}) def admin_delete_session(self, ctx, session_id): """Delete maintenance workflow session thread""" @@ -134,18 +134,18 @@ class EngineEndpoint(object): if not self._validate_session(session_id): return None LOG.info("EngineEndpoint: project_get_session") - instance_ids = (self.workflow_sessions[session_id].session_data. + instance_ids = (self.workflow_sessions[session_id]. state_instance_ids(project_id)) return {'instance_ids': instance_ids} def project_update_session(self, ctx, session_id, project_id, data): """Update maintenance workflow session project state""" LOG.info("EngineEndpoint: project_update_session") - session_data = self.workflow_sessions[session_id].session_data - project = session_data.project(project_id) + session_obj = self.workflow_sessions[session_id] + project = session_obj.project(project_id) project.state = data["state"] if 'instance_actions' in data: - session_data.proj_instance_actions[project_id] = ( + session_obj.proj_instance_actions[project_id] = ( data['instance_actions'].copy()) return data diff --git a/fenix/utils/time.py b/fenix/utils/time.py index 9005ea8..155d1ce 100644 --- a/fenix/utils/time.py +++ b/fenix/utils/time.py @@ -22,6 +22,10 @@ def str_to_datetime(dt_str): return datetime.datetime(year, month, day, hours, minutes, seconds) +def datetime_to_str(dt): + return (dt.strftime('%Y-%m-%d %H:%M:%S')) + + def reply_time_str(wait): now = datetime.datetime.utcnow() reply = now - datetime.timedelta( diff --git a/fenix/workflow/workflow.py b/fenix/workflow/workflow.py index d8bc0db..49b206e 100644 --- a/fenix/workflow/workflow.py +++ b/fenix/workflow/workflow.py @@ -13,12 +13,16 @@ # License for the specific language governing permissions and limitations # under the License. import aodhclient.client as aodhclient +from ast import literal_eval +import collections from oslo_log import log as logging import oslo_messaging as messaging from oslo_service import threadgroup +import six from threading import Thread import time +from fenix.db import api as db_api from fenix.utils.identity_auth import get_identity_auth from fenix.utils.identity_auth import get_session @@ -26,140 +30,6 @@ from fenix.utils.identity_auth import get_session LOG = logging.getLogger(__name__) -class Instance(object): - - def __init__(self, project, instance_id, instance_name, host, ha=False): - self.project = project - self.instance_id = instance_id - self.instance_name = instance_name - self.host = host - self.ha = ha - - def __str__(self): - return "%s: %s" % (self.instance_id, self.instance_name) - - def is_on_host(self, host): - if self.host == host: - return True - else: - return False - - -class Project(object): - - def __init__(self, name): - self.name = name - self.state = None - self.state_instances = [] - - -class SessionData(object): - - def __init__(self, data, session_id): - self.session_id = session_id - self.projects = [] - self.hosts = data['hosts'] - self.maintenance_at = str(data['maintenance_at']) - self.metadata = data['metadata'] - self.instances = [] - self.maintained_hosts = [] - self.proj_instance_actions = {} - - def get_empty_hosts(self): - empty_hosts = list(self.hosts) - ([empty_hosts.remove(instance.host) for instance in - self.instances if instance.host in empty_hosts]) - return empty_hosts - - def add_instance(self, project, instance_id, instance_name, host, - ha=False): - if host not in self.hosts: - LOG.error('%s: instance %s in invalid host %s' % - (self.session_id, instance_id, host)) - if project not in self.project_names(): - self.projects.append(Project(project)) - self.instances.append(Instance(project, instance_id, instance_name, - host, ha)) - - def project(self, name): - return ([project for project in self.projects if - project.name == name][0]) - - def project_names(self): - return [project.name for project in self.projects] - - def set_projets_state(self, state): - for project in self.projects: - project.state = state - project.state_instances = [] - - def project_has_state_instances(self, name): - project = self.project(name) - if project.state_instances: - return True - else: - return False - - def set_projects_state_and_host_instances(self, state, host): - some_project_has_instances = False - for project in self.projects: - project.state = state - project.state_instances = ( - self.instance_ids_by_host_and_project(host, project.name)) - if project.state_instances: - some_project_has_instances = True - project.state = state - else: - project.state = None - if not some_project_has_instances: - LOG.error('%s: No project has instances on host %s' % - (self.session_id, host)) - - def get_projects_with_state(self): - return ([project for project in self.projects if project.state - is not None]) - - def state_instance_ids(self, name): - instances = ([project.state_instances for project in self.projects if - project.name == name][0]) - if not instances: - instances = self.instance_ids_by_project(name) - return instances - - def instances_by_project(self, project): - return [instance for instance in self.instances if - instance.project == project] - - def instance_ids_by_project(self, project): - return [instance.instance_id for instance in self.instances if - instance.project == project] - - def instance_ids_by_host_and_project(self, host, project): - return [instance.instance_id for instance in self.instances - if instance.host == host and - instance.project == project] - - def instances_by_host_and_project(self, host, project): - return [instance for instance in self.instances - if instance.host == host and - instance.project == project] - - def instance_action_by_project_reply(self, project, instance_id): - return self.proj_instance_actions[project][instance_id] - - def __str__(self): - info = 'Instance info:\n' - for host in self.hosts: - info += ('%s:\n' % host) - for project in self.project_names(): - instances = self.instances_by_host_and_project(host, project) - if instances: - info += (' %s:\n' % project) - for instance in instances: - info += (' %s\n' % instance) - return info - - class BaseWorkflow(Thread): def __init__(self, conf, session_id, data): @@ -169,8 +39,20 @@ class BaseWorkflow(Thread): self.stopped = False self.thg = threadgroup.ThreadGroup() self.timer = {} - self.state = 'MAINTENANCE' - self.session_data = SessionData(data, session_id) + self.session = self._init_session(data) + LOG.info('%s: session %s' % (self.session_id, self.session)) + if len(data['hosts']): + # Hosts given as input, not to be discovered in workflow + self.hosts = self.init_hosts(self.convert(data['hosts'])) + else: + self.hosts = [] + LOG.info('%s: hosts %s' % (self.session_id, self.hosts)) + # TBD API to support action plugins + # self.actions = + self.projects = [] + self.instances = [] + self.proj_instance_actions = {} + self.states_methods = {'MAINTENANCE': 'maintenance', 'SCALE_IN': 'scale_in', 'PREPARE_MAINTENANCE': 'prepare_maintenance', @@ -183,8 +65,8 @@ class BaseWorkflow(Thread): self.auth = get_identity_auth(conf.workflow_user, conf.workflow_password, conf.workflow_project) - self.session = get_session(auth=self.auth) - self.aodh = aodhclient.Client('2', self.session) + self.auth_session = get_session(auth=self.auth) + self.aodh = aodhclient.Client('2', self.auth_session) transport = messaging.get_transport(self.conf) self.notif_proj = messaging.Notifier(transport, 'maintenance.planned', @@ -197,6 +79,182 @@ class BaseWorkflow(Thread): topics=['notifications']) self.notif_admin = self.notif_admin.prepare(publisher_id='fenix') + def init_hosts(self, hostnames): + LOG.info('%s: init_hosts: %s' % (self.session_id, hostnames)) + return db_api.create_hosts(self.session_id, hostnames) + + def init_projects(self, project_ids): + LOG.info('%s: init_projects: %s' % (self.session_id, project_ids)) + return db_api.create_projects(self.session_id, project_ids) + + def convert(self, data): + if isinstance(data, six.string_types): + return str(data) + elif isinstance(data, collections.Mapping): + return dict(map(self.convert, data.iteritems())) + elif isinstance(data, collections.Iterable): + return type(data)(map(self.convert, data)) + else: + return data + + def _init_session(self, data): + session = { + 'session_id': self.session_id, + 'state': 'MAINTENANCE', + 'maintenance_at': str(data['maintenance_at']), + 'meta': str(self.convert(data['metadata'])), + 'workflow': self.convert((data['workflow']))} + LOG.info('%s: _init_session: %s' % (self.session_id, session)) + return db_api.create_session(session) + + def get_compute_hosts(self): + return [host.hostname for host in self.hosts + if host.type == 'compute'] + + def get_empty_computes(self): + all_computes = self.get_compute_hosts() + instance_computes = [] + for instance in self.instances: + if instance.host not in instance_computes: + instance_computes.append(instance.host) + return [host for host in all_computes if host not in instance_computes] + + def get_maintained_hosts(self): + return [host.hostname for host in self.hosts if host.maintained] + + def host_maintained(self, hostname): + host_obj = [host for host in self.hosts if + host.hostname == hostname][0] + host_obj.maintained = True + + def add_instance(self, instance): + return db_api.create_instance(instance) + + def add_instances(self, instances): + return db_api.create_instances(instances) + + def remove_instance(self, instance): + instance_id = instance.instance_id + self.instances.remove(instance) + db_api.remove_instance(self.session_id, instance_id) + + def project(self, project_id): + return ([project for project in self.projects if + project.project_id == project_id][0]) + + def project_names(self): + return [project.project_id for project in self.projects] + + def set_projets_state(self, state): + for project in self.projects: + project.state = state + for instance in self.instances: + instance.project_state = None + + def project_has_state_instances(self, project_id): + instances = ([instance.instance_id for instance in self.instances if + instance.project_id == project_id and + instance.project_state]) + if instances: + return True + else: + return False + + def set_projects_state_and_hosts_instances(self, state, hosts): + some_project_has_instances = False + for project in self.projects: + project.state = state + projects_instances = self.instances_by_project(project.project_id) + state_instances = False + for instance in projects_instances: + if instance.host in hosts: + state_instances = True + instance.project_state = state + else: + instance.project_state = None + if state_instances: + some_project_has_instances = True + project.state = state + else: + project.state = None + if not some_project_has_instances: + LOG.error('%s: No project has instances on hosts %s' % + (self.session_id, hosts)) + + def get_projects_with_state(self): + return ([project for project in self.projects if project.state + is not None]) + + def state_instance_ids(self, project_id): + project = self.project(project_id) + instances = ([instance.instance_id for instance in self.instances if + instance.project_id == project_id and + instance.project_state == project.state]) + if not instances: + instances = self.instance_ids_by_project(project_id) + return instances + + def instances_by_project(self, project): + return [instance for instance in self.instances if + instance.project_id == project] + + def instance_ids_by_project(self, project): + return [instance.instance_id for instance in self.instances if + instance.project_id == project] + + def instance_ids_by_host_and_project(self, host, project): + return [instance.instance_id for instance in self.instances + if instance.host == host and + instance.project_id == project] + + def instances_by_host_and_project(self, host, project): + return [instance for instance in self.instances + if instance.host == host and + instance.project_id == project] + + def instance_action_by_project_reply(self, project, instance_id): + return self.proj_instance_actions[project][instance_id] + + def instance_id_found(self, instance_id): + instance_ids = [instance.instance_id for instance in self.instances if + instance.instance_id == instance_id] + if instance_ids: + return True + else: + return False + + def instance_name_found(self, instance_name): + instance_ids = [instance.instance_id for instance in self.instances if + instance.instance_name == instance_name] + if instance_ids: + return True + else: + return False + + def instance_by_name(self, instance_name): + instance = [instance for instance in self.instances if + instance.instance_name == instance_name][0] + return instance + + def instance_by_id(self, instance_id): + instance = [instance for instance in self.instances if + instance.instance_id == instance_id][0] + return instance + + def __str__(self): + info = 'Instance info:\n' + for host in self.hosts: + info += ('%s:\n' % host.hostname) + for project in self.project_names(): + instance_ids = ( + self.instance_ids_by_host_and_project(host.hostname, + project)) + if instance_ids: + info += (' %s:\n' % project) + for instance_id in instance_ids: + info += (' %s\n' % instance_id) + return info + def _timer_expired(self, name): LOG.info("%s: timer expired %s" % (self.session_id, name)) if name in self.timer.keys(): @@ -228,6 +286,7 @@ class BaseWorkflow(Thread): name)) def cleanup(self): + db_api.remove_session(self.session_id) LOG.info("%s: cleanup" % self.session_id) def stop(self): @@ -244,14 +303,16 @@ class BaseWorkflow(Thread): def run(self): LOG.info("%s: started" % self.session_id) while not self.stopped: - if self.state not in ["MAINTENANCE_DONE", "MAINTENANCE_FAILED"]: + if self.session.state not in ["MAINTENANCE_DONE", + "MAINTENANCE_FAILED"]: try: - statefunc = getattr(self, self.states_methods[self.state]) + statefunc = (getattr(self, + self.states_methods[self.session.state])) statefunc() except Exception as e: LOG.error("%s: %s Raised exception: %s" % (self.session_id, statefunc, e), exc_info=True) - self.state = "MAINTENANCE_FAILED" + self.session.state = "MAINTENANCE_FAILED" else: time.sleep(1) # IDLE while session removed @@ -263,7 +324,7 @@ class BaseWorkflow(Thread): str(alarm['event_rule']['event_type']) == match_event]) all_projects_match = True - for project in self.session_data.project_names(): + for project in self.project_names(): if project not in match_projects: LOG.error('%s: project %s not ' 'listening to %s' % @@ -284,7 +345,7 @@ class BaseWorkflow(Thread): actions_at=actions_at, reply_at=reply_at, session_id=self.session_id, - metadata=metadata, + metadata=literal_eval(metadata), reply_url=reply_url) LOG.info('Sending "maintenance.planned" to project: %s' % payload) @@ -311,22 +372,22 @@ class BaseWorkflow(Thread): continue elif pstate == state_nack: LOG.error('%s: %s from %s' % - (self.session_id, pstate, project.name)) + (self.session_id, pstate, project.project_id)) break else: LOG.error('%s: Project %s in invalid state %s' % - (self.session_id, project.name, pstate)) + (self.session_id, project.project_id, pstate)) break return pstate def _project_names_in_state(self, projects, state): - return ([project.name for project in projects if + return ([project.project_id for project in projects if project.state == state]) def wait_projects_state(self, state, timer_name): state_ack = 'ACK_%s' % state state_nack = 'NACK_%s' % state - projects = self.session_data.get_projects_with_state() + projects = self.get_projects_with_state() if not projects: LOG.error('%s: wait_projects_state %s. Emtpy project list' % (self.session_id, state)) @@ -340,18 +401,18 @@ class BaseWorkflow(Thread): LOG.info('all projects in: %s' % state_ack) return True elif answer == state_nack: - pnames = self._projects_in_state(projects, answer) + pnames = self._project_names_in_state(projects, answer) LOG.error('%s: projects rejected with %s: %s' % (self.session_id, answer, pnames)) return False else: - pnames = self._projects_in_state(projects, answer) + pnames = self._project_names_in_state(projects, answer) LOG.error('%s: projects with invalid state %s: %s' % (self.session_id, answer, pnames)) return False time.sleep(1) LOG.error('%s: timer %s expired waiting answer to state %s' % (self.session_id, timer_name, state)) - pnames = self._projects_in_state(projects, state) + pnames = self._project_names_in_state(projects, state) LOG.error('%s: projects not answered: %s' % (self.session_id, pnames)) return False diff --git a/fenix/workflow/workflows/default.py b/fenix/workflow/workflows/default.py index 5cf250a..7e05f63 100644 --- a/fenix/workflow/workflows/default.py +++ b/fenix/workflow/workflows/default.py @@ -13,15 +13,17 @@ # License for the specific language governing permissions and limitations # under the License. import datetime + +from novaclient import API_MAX_VERSION as nova_max_version import novaclient.client as novaclient from novaclient.exceptions import BadRequest from oslo_log import log as logging import time +from fenix.utils.time import datetime_to_str from fenix.utils.time import is_time_after_time from fenix.utils.time import reply_time_str -from fenix.utils.time import str_to_datetime from fenix.utils.time import time_now_str @@ -34,72 +36,161 @@ class Workflow(BaseWorkflow): def __init__(self, conf, session_id, data): super(Workflow, self).__init__(conf, session_id, data) - self.nova = novaclient.Client(version='2.34', session=self.session) + self.nova = novaclient.Client(nova_max_version.get_string(), + session=self.auth_session) + self._init_update_hosts() LOG.info("%s: initialized" % self.session_id) - def cleanup(self): - LOG.info("%s: cleanup" % self.session_id) + def _init_update_hosts(self): + controllers = self.nova.services.list(binary='nova-conductor') + computes = self.nova.services.list(binary='nova-compute') + for host in self.hosts: + hostname = host.hostname + match = [compute for compute in computes if + hostname == compute.host] + if match: + host.type = 'compute' + if match[0].status == 'disabled': + LOG.info("compute status from services") + host.disabled = True + continue + if ([controller for controller in controllers if + hostname == controller.host]): + host.type = 'controller' + continue + host.type = 'other' - def stop(self): - LOG.info("%s: stop" % self.session_id) - self.stopped = True + def get_compute_hosts(self): + return [host.hostname for host in self.hosts + if host.type == 'compute'] - def is_ha_instance(self, instance): + def get_empty_computes(self): + all_computes = self.get_compute_hosts() + instance_computes = [] + for instance in self.instances: + if instance.host not in instance_computes: + instance_computes.append(instance.host) + return [host for host in all_computes if host not in instance_computes] + + def get_instance_details(self, instance): network_interfaces = next(iter(instance.addresses.values())) for network_interface in network_interfaces: _type = network_interface.get('OS-EXT-IPS:type') if _type == "floating": LOG.info('Instance with floating ip: %s %s' % (instance.id, instance.name)) - return True - return False + return "floating_ip" + return None + + def _fenix_instance(self, project_id, instance_id, instance_name, host, + state, details, action=None, project_state=None, + action_done=False): + instance = {'session_id': self.session_id, + 'instance_id': instance_id, + 'action': action, + 'project_id': project_id, + 'instance_id': instance_id, + 'project_state': project_state, + 'state': state, + 'instance_name': instance_name, + 'action_done': action_done, + 'host': host, + 'details': details} + return instance def initialize_server_info(self): + project_ids = [] + instances = [] + compute_hosts = self.get_compute_hosts() opts = {'all_tenants': True} servers = self.nova.servers.list(detailed=True, search_opts=opts) for server in servers: try: host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) - project = str(server.tenant_id) + if host not in compute_hosts: + continue + project_id = str(server.tenant_id) instance_name = str(server.name) instance_id = str(server.id) - ha = self.is_ha_instance(server) + details = self.get_instance_details(server) + state = str(server.__dict__.get('OS-EXT-STS:vm_state')) except Exception: raise Exception('can not get params from server=%s' % server) - self.session_data.add_instance(project, - instance_id, - instance_name, - host, - ha) - LOG.info(str(self.session_data)) + instances.append(self._fenix_instance(project_id, instance_id, + instance_name, host, state, + details)) + if project_id not in project_ids: + project_ids.append(project_id) + + self.projects = self.init_projects(project_ids) + self.instances = self.add_instances(instances) + LOG.info(str(self)) + + def update_instance(self, project_id, instance_id, instance_name, host, + state, details): + if self.instance_id_found(instance_id): + # TBD Might need to update instance variables here if not done + # somewhere else + return + elif self.instance_name_found(instance_name): + # Project has made re-instantiation, remove old add new + old_instance = self.instance_by_name(instance_name) + instance = self._fenix_instance(project_id, instance_id, + instance_name, host, + state, details, + old_instance.action, + old_instance.project_state, + old_instance.action_done) + self.instances.append(self.add_instance(instance)) + self.remove_instance(old_instance) + else: + # Instance new, as project has added instances + instance = self._fenix_instance(project_id, instance_id, + instance_name, host, + state, details) + self.instances.append(self.add_instance(instance)) + + def remove_non_existing_instances(self, instance_ids): + remove_instances = [instance for instance in + self.instances if instance.instance_id not in + instance_ids] + for instance in remove_instances: + # Instance deleted, as project possibly scaled down + self.remove_instance(instance) def update_server_info(self): + # TBD This keeps internal instance information up-to-date and prints + # it out. Same could be done by updating the information when changed + # Anyhow this also double checks information against Nova + instance_ids = [] + compute_hosts = self.get_compute_hosts() opts = {'all_tenants': True} servers = self.nova.servers.list(detailed=True, search_opts=opts) - # TBD actually update, not regenerate - self.session_data.instances = [] for server in servers: try: host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) - project = str(server.tenant_id) + if host not in compute_hosts: + continue + project_id = str(server.tenant_id) instance_name = str(server.name) instance_id = str(server.id) - ha = self.is_ha_instance(server) + details = self.get_instance_details(server) + state = str(server.__dict__.get('OS-EXT-STS:vm_state')) except Exception: raise Exception('can not get params from server=%s' % server) - self.session_data.add_instance(project, - instance_id, - instance_name, - host, - ha) - LOG.info(str(self.session_data)) + self.update_instance(project_id, instance_id, instance_name, host, + state, details) + instance_ids.append(instance_id) + self.remove_non_existing_instances(instance_ids) + + LOG.info(str(self)) def confirm_maintenance(self): allowed_actions = [] - actions_at = self.session_data.maintenance_at + actions_at = self.session.maintenance_at state = 'MAINTENANCE' - self.session_data.set_projets_state(state) - for project in self.session_data.project_names(): + self.set_projets_state(state) + for project in self.project_names(): LOG.info('\nMAINTENANCE to project %s\n' % project) instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, self.session_id, @@ -108,9 +199,9 @@ class Workflow(BaseWorkflow): if is_time_after_time(reply_at, actions_at): LOG.error('%s: No time for project to answer in state: %s' % (self.session_id, state)) - self.state = "MAINTENANCE_FAILED" + self.session.state = "MAINTENANCE_FAILED" return False - metadata = self.session_data.metadata + metadata = self.session.meta self._project_notify(project, instance_ids, allowed_actions, actions_at, reply_at, state, metadata) self.start_timer(self.conf.project_maintenance_reply, @@ -122,13 +213,13 @@ class Workflow(BaseWorkflow): actions_at = reply_time_str(self.conf.project_scale_in_reply) reply_at = actions_at state = 'SCALE_IN' - self.session_data.set_projets_state(state) - for project in self.session_data.project_names(): + self.set_projets_state(state) + for project in self.project_names(): LOG.info('\nSCALE_IN to project %s\n' % project) instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, self.session_id, project) - metadata = self.session_data.metadata + metadata = self.session.meta self._project_notify(project, instance_ids, allowed_actions, actions_at, reply_at, state, metadata) self.start_timer(self.conf.project_scale_in_reply, @@ -143,7 +234,7 @@ class Workflow(BaseWorkflow): LOG.info('checking hypervisors for VCPU capacity') for hvisor in hvisors: hostname = hvisor.__getattr__('hypervisor_hostname') - if hostname not in self.session_data.hosts: + if hostname not in self.get_compute_hosts(): continue vcpus = hvisor.__getattr__('vcpus') vcpus_used = hvisor.__getattr__('vcpus_used') @@ -170,70 +261,69 @@ class Workflow(BaseWorkflow): return vcpus - vcpus_used def find_host_to_be_empty(self): - # Preferrably host with most free vcpus, no ha instances and least - # instances altogether + # Preferrably host with most free vcpus, no floating ip instances and + # least instances altogether host_to_be_empty = None - host_nonha_instances = 0 + host_no_fip_instances = 0 host_free_vcpus = 0 hvisors = self.nova.hypervisors.list(detailed=True) - for host in self.session_data.hosts: + for host in self.get_compute_hosts(): free_vcpus = self.get_free_vcpus_by_host(host, hvisors) - ha_instances = 0 - nonha_instances = 0 - for project in self.session_data.project_names(): - for instance in ( - self.session_data.instances_by_host_and_project(host, - project)): - if instance.ha: - ha_instances += 1 + fip_instances = 0 + no_fip_instances = 0 + for project in self.project_names(): + for instance in (self.instances_by_host_and_project(host, + project)): + if instance.details and "floating_ip" in instance.details: + fip_instances += 1 else: - nonha_instances += 1 - LOG.info('host %s has %d ha and %d non ha instances %s free ' - 'vcpus' % (host, ha_instances, nonha_instances, + no_fip_instances += 1 + LOG.info('%s has %d floating ip and %d other instances %s free ' + 'vcpus' % (host, fip_instances, no_fip_instances, free_vcpus)) - if ha_instances == 0: - # We do not want to choose host with HA instance + if fip_instances == 0: + # We do not want to choose host with floating ip instance if host_to_be_empty: # We have host candidate, let's see if this is better if free_vcpus > host_free_vcpus: # Choose as most vcpus free host_to_be_empty = host - host_nonha_instances = nonha_instances + host_no_fip_instances = no_fip_instances host_free_vcpus = 0 elif free_vcpus == host_free_vcpus: - if nonha_instances < host_nonha_instances: + if no_fip_instances < host_no_fip_instances: # Choose as most vcpus free and least instances host_to_be_empty = host - host_nonha_instances = nonha_instances + host_no_fip_instances = no_fip_instances host_free_vcpus = 0 else: # This is first host candidate host_to_be_empty = host - host_nonha_instances = nonha_instances + host_no_fip_instances = no_fip_instances host_free_vcpus = 0 if not host_to_be_empty: # No best cadidate found, let's choose last host in loop host_to_be_empty = host LOG.info('host %s selected to be empty' % host_to_be_empty) # TBD It might yet not be possible to move instances away from this - # host is other hosts has vcpu capacity scattered. It should be checked - # if instances on this host fits to other hosts + # host if other hosts has free vcpu capacity scattered. It should + # checked if instances on this host fits to other hosts return host_to_be_empty def confirm_host_to_be_emptied(self, host, state): allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION'] actions_at = reply_time_str(self.conf.project_maintenance_reply) reply_at = actions_at - self.session_data.set_projects_state_and_host_instances(state, host) - for project in self.session_data.project_names(): - if not self.session_data.project_has_state_instances(project): + self.set_projects_state_and_hosts_instances(state, [host]) + for project in self.project_names(): + if not self.project_has_state_instances(project): continue LOG.info('%s to project %s' % (state, project)) instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, self.session_id, project) - metadata = self.session_data.metadata + metadata = self.session.meta self._project_notify(project, instance_ids, allowed_actions, actions_at, reply_at, state, metadata) self.start_timer(self.conf.project_maintenance_reply, @@ -242,11 +332,11 @@ class Workflow(BaseWorkflow): def confirm_maintenance_complete(self): state = 'MAINTENANCE_COMPLETE' - metadata = self.session_data.metadata + metadata = self.session.meta actions_at = reply_time_str(self.conf.project_scale_in_reply) reply_at = actions_at - self.session_data.set_projets_state(state) - for project in self.session_data.project_names(): + self.set_projets_state(state) + for project in self.project_names(): LOG.info('%s to project %s' % (state, project)) instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, self.session_id, @@ -258,38 +348,39 @@ class Workflow(BaseWorkflow): '%s_TIMEOUT' % state) return self.wait_projects_state(state, '%s_TIMEOUT' % state) - def notify_action_done(self, project, instance_id): - instance_ids = instance_id + def notify_action_done(self, project, instance): + instance_ids = [instance.instance_id] allowed_actions = [] actions_at = None reply_at = None state = "INSTANCE_ACTION_DONE" - metadata = None + instance.project_state = state + metadata = "{}" self._project_notify(project, instance_ids, allowed_actions, actions_at, reply_at, state, metadata) def actions_to_have_empty_host(self, host): # TBD these might be done parallel - for project in self.session_data.proj_instance_actions.keys(): + for project in self.proj_instance_actions.keys(): instances = ( - self.session_data.instances_by_host_and_project(host, project)) + self.instances_by_host_and_project(host, project)) for instance in instances: - action = (self.session_data.instance_action_by_project_reply( - project, instance.instance_id)) - LOG.info('Action %s instance %s ' % (action, + instance.action = (self.instance_action_by_project_reply( + project, instance.instance_id)) + LOG.info('Action %s instance %s ' % (instance.action, instance.instance_id)) - if action == 'MIGRATE': - if not self.migrate_server(instance.instance_id): + if instance.action == 'MIGRATE': + if not self.migrate_server(instance): return False - self.notify_action_done(project, instance.instance_id) - elif action == 'OWN_ACTION': + self.notify_action_done(project, instance) + elif instance.action == 'OWN_ACTION': pass else: # TBD LIVE_MIGRATE not supported raise Exception('%s: instance %s action ' '%s not supported' % (self.session_id, instance.instance_id, - action)) + instance.action)) return self._wait_host_empty(host) def _wait_host_empty(self, host): @@ -311,38 +402,41 @@ class Workflow(BaseWorkflow): LOG.info('%s host still not empty' % host) return False - def migrate_server(self, server_id): + def migrate_server(self, instance): # TBD this method should be enhanced for errors and to have failed # instance back to state active instead of error + server_id = instance.instance_id server = self.nova.servers.get(server_id) - vm_state = server.__dict__.get('OS-EXT-STS:vm_state') - LOG.info('server %s state %s' % (server_id, vm_state)) - last_vm_state = vm_state + instance.state = server.__dict__.get('OS-EXT-STS:vm_state') + LOG.info('server %s state %s' % (server_id, instance.state)) + last_vm_state = instance.state retry_migrate = 2 while True: try: server.migrate() time.sleep(5) retries = 36 - while vm_state != 'resized' and retries > 0: + while instance.state != 'resized' and retries > 0: # try to confirm within 3min server = self.nova.servers.get(server_id) - vm_state = server.__dict__.get('OS-EXT-STS:vm_state') - if vm_state == 'resized': + instance.state = server.__dict__.get('OS-EXT-STS:vm_state') + if instance.state == 'resized': server.confirm_resize() LOG.info('instance %s migration confirmed' % server_id) + instance.host = ( + str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))) return True - if last_vm_state != vm_state: + if last_vm_state != instance.state: LOG.info('instance %s state: %s' % (server_id, - vm_state)) - if vm_state == 'error': + instance.state)) + if instance.state == 'error': LOG.error('instance %s migration failed, state: %s' - % (server_id, vm_state)) + % (server_id, instance.state)) return False time.sleep(5) retries = retries - 1 - last_vm_state = vm_state + last_vm_state = instance.state # Timout waiting state to change break @@ -365,7 +459,7 @@ class Workflow(BaseWorkflow): finally: retry_migrate = retry_migrate - 1 LOG.error('instance %s migration timeout, state: %s' % - (server_id, vm_state)) + (server_id, instance.state)) return False def host_maintenance(self, host): @@ -379,34 +473,33 @@ class Workflow(BaseWorkflow): self.initialize_server_info() if not self.projects_listen_alarm('maintenance.scheduled'): - self.state = 'MAINTENANCE_FAILED' + self.session.state = 'MAINTENANCE_FAILED' return if not self.confirm_maintenance(): - self.state = 'MAINTENANCE_FAILED' + self.session.state = 'MAINTENANCE_FAILED' return - maintenance_empty_hosts = self.session_data.get_empty_hosts() + maintenance_empty_hosts = self.get_empty_computes() if len(maintenance_empty_hosts) == 0: if self.need_scale_in(): LOG.info('%s: Need to scale in to get capacity for ' 'empty host' % (self.session_id)) - self.state = 'SCALE_IN' + self.session.state = 'SCALE_IN' else: LOG.info('%s: Free capacity, but need empty host' % (self.session_id)) - self.state = 'PREPARE_MAINTENANCE' + self.session.state = 'PREPARE_MAINTENANCE' else: LOG.info('Empty host found') - self.state = 'START_MAINTENANCE' + self.session.state = 'START_MAINTENANCE' - maint_at = str_to_datetime(self.session_data.maintenance_at) - if maint_at > datetime.datetime.utcnow(): + if self.session.maintenance_at > datetime.datetime.utcnow(): time_now = time_now_str() LOG.info('Time now: %s maintenance starts: %s....' % - (time_now, self.session_data.maintenance_at)) - td = maint_at - datetime.datetime.utcnow() + (time_now, datetime_to_str(self.session.maintenance_at))) + td = self.session.maintenance_at - datetime.datetime.utcnow() self.start_timer(td.total_seconds(), 'MAINTENANCE_START_TIMEOUT') while not self.is_timer_expired('MAINTENANCE_START_TIMEOUT'): time.sleep(1) @@ -418,31 +511,31 @@ class Workflow(BaseWorkflow): LOG.info("%s: scale in" % self.session_id) if not self.confirm_scale_in(): - self.state = 'MAINTENANCE_FAILED' + self.session.state = 'MAINTENANCE_FAILED' return - # TBD it takes time to have proper infromation updated about free + # TBD it takes time to have proper information updated about free # capacity. Should make sure instances removed has also VCPUs removed self.update_server_info() - maintenance_empty_hosts = self.session_data.get_empty_hosts() + maintenance_empty_hosts = self.get_empty_computes() if len(maintenance_empty_hosts) == 0: if self.need_scale_in(): LOG.info('%s: Need to scale in more to get capacity for ' 'empty host' % (self.session_id)) - self.state = 'SCALE_IN' + self.session.state = 'SCALE_IN' else: LOG.info('%s: Free capacity, but need empty host' % (self.session_id)) - self.state = 'PREPARE_MAINTENANCE' + self.session.state = 'PREPARE_MAINTENANCE' else: LOG.info('Empty host found') - self.state = 'START_MAINTENANCE' + self.session.state = 'START_MAINTENANCE' def prepare_maintenance(self): LOG.info("%s: prepare_maintenance called" % self.session_id) host = self.find_host_to_be_empty() if not self.confirm_host_to_be_emptied(host, 'PREPARE_MAINTENANCE'): - self.state = 'MAINTENANCE_FAILED' + self.session.state = 'MAINTENANCE_FAILED' return if not self.actions_to_have_empty_host(host): # TBD we found the hard way that we couldn't make host empty and @@ -451,19 +544,19 @@ class Workflow(BaseWorkflow): # what instance on which host LOG.info('%s: Failed to empty %s. Need to scale in more to get ' 'capacity for empty host' % (self.session_id, host)) - self.state = 'SCALE_IN' + self.session.state = 'SCALE_IN' else: - self.state = 'START_MAINTENANCE' + self.session.state = 'START_MAINTENANCE' self.update_server_info() def start_maintenance(self): LOG.info("%s: start_maintenance called" % self.session_id) - empty_hosts = self.session_data.get_empty_hosts() + empty_hosts = self.get_empty_computes() if not empty_hosts: LOG.info("%s: No empty host to be maintained" % self.session_id) - self.state = 'MAINTENANCE_FAILED' + self.session.state = 'MAINTENANCE_FAILED' return - maintained_hosts = self.session_data.maintained_hosts + maintained_hosts = self.get_maintained_hosts() if not maintained_hosts: # First we maintain all empty hosts for host in empty_hosts: @@ -481,7 +574,7 @@ class Workflow(BaseWorkflow): 'MAINTENANCE_COMPLETE', self.session_id) LOG.info('MAINTENANCE_COMPLETE host %s' % host) - maintained_hosts.append(host) + self.host_maintained(host) else: # Now we maintain hosts gone trough PLANNED_MAINTENANCE hosts = [h for h in empty_hosts if h not in maintained_hosts] @@ -498,42 +591,44 @@ class Workflow(BaseWorkflow): 'MAINTENANCE_COMPLETE', self.session_id) LOG.info('MAINTENANCE_COMPLETE host %s' % host) - maintained_hosts.append(host) - if [h for h in self.session_data.hosts if h not in maintained_hosts]: + + self.host_maintained(host) + maintained_hosts = self.get_maintained_hosts() + if len(maintained_hosts) != len(self.hosts): # Not all host maintained - self.state = 'PLANNED_MAINTENANCE' + self.session.state = 'PLANNED_MAINTENANCE' else: - self.state = 'MAINTENANCE_COMPLETE' + self.session.state = 'MAINTENANCE_COMPLETE' def planned_maintenance(self): LOG.info("%s: planned_maintenance called" % self.session_id) - maintained_hosts = self.session_data.maintained_hosts - not_maintained_hosts = ([h for h in self.session_data.hosts if h not in - maintained_hosts]) + maintained_hosts = self.get_maintained_hosts() + not_maintained_hosts = ([h.hostname for h in self.hosts if h.hostname + not in maintained_hosts]) LOG.info("%s: Not maintained hosts: %s" % (self.session_id, not_maintained_hosts)) host = not_maintained_hosts[0] if not self.confirm_host_to_be_emptied(host, 'PLANNED_MAINTENANCE'): - self.state = 'MAINTENANCE_FAILED' + self.session.state = 'MAINTENANCE_FAILED' return if not self.actions_to_have_empty_host(host): # Failure in here might indicate action to move instance failed. # This might be as Nova VCPU capacity was not yet emptied from # expected target hosts - self.state = 'MAINTENANCE_FAILED' + self.session.state = 'MAINTENANCE_FAILED' return self.update_server_info() - self.state = 'START_MAINTENANCE' + self.session.state = 'START_MAINTENANCE' def maintenance_complete(self): LOG.info("%s: maintenance_complete called" % self.session_id) LOG.info('Projects may still need to up scale back to full ' 'capcity') if not self.confirm_maintenance_complete(): - self.state = 'MAINTENANCE_FAILED' + self.session.state = 'MAINTENANCE_FAILED' return self.update_server_info() - self.state = 'MAINTENANCE_DONE' + self.session.state = 'MAINTENANCE_DONE' def maintenance_done(self): pass