Introduce database functionality into KDS

Add database setup and initial testing into Key Distribution Server.
Also adds a migration script.

Change-Id: I22e914ef1ca6ec8217d131c073261b309d9f637e
This commit is contained in:
Jamie Lennox 2013-12-03 12:33:28 +10:00
parent 987407a23c
commit a553d98dd4
47 changed files with 4640 additions and 83 deletions

View File

@ -1,7 +1,7 @@
[DEFAULT]
#
# Options defined in keystone.contrib.kds.common.service
# Options defined in kite.common.service
#
# IP for the server to bind to (string value)
@ -10,32 +10,9 @@
# The port for the server (integer value)
#port=9109
#
# Options defined in keystone.openstack.common.db.sqlalchemy.session
#
# the filename to use with sqlite (string value)
#sqlite_db=keystone.sqlite
# If true, use synchronous mode for sqlite (boolean value)
#sqlite_synchronous=true
#
# Options defined in keystone.openstack.common.eventlet_backdoor
#
# Enable eventlet backdoor. Acceptable values are 0, <port>,
# and <start>:<end>, where 0 results in listening on a random
# tcp port number; <port> results in listening on the
# specified port number (and not enabling backdoor if that
# port is in use); and <start>:<end> results in listening on
# the smallest unused port number within the specified range
# of port numbers. The chosen port is displayed in the
# service's log file. (string value)
#backdoor_port=<None>
#
# Options defined in keystone.openstack.common.lockutils
# Options defined in kite.openstack.common.lockutils
#
# Whether to disable inter-process locks (boolean value)
@ -44,8 +21,9 @@
# Directory to use for lock files. (string value)
#lock_path=<None>
#
# Options defined in keystone.openstack.common.log
# Options defined in kite.openstack.common.log
#
# Print debugging output (set logging level to DEBUG instead
@ -59,29 +37,29 @@
# Log output to standard error (boolean value)
#use_stderr=true
# format string to use for log messages with context (string
# Format string to use for log messages with context (string
# value)
#logging_context_format_string=%(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [%(request_id)s %(user)s %(tenant)s] %(instance)s%(message)s
#logging_context_format_string=%(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [%(request_id)s %(user_identity)s] %(instance)s%(message)s
# format string to use for log messages without context
# Format string to use for log messages without context
# (string value)
#logging_default_format_string=%(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [-] %(instance)s%(message)s
# data to append to log format when level is DEBUG (string
# Data to append to log format when level is DEBUG (string
# value)
#logging_debug_format_suffix=%(funcName)s %(pathname)s:%(lineno)d
# prefix each line of exception output with this format
# Prefix each line of exception output with this format
# (string value)
#logging_exception_prefix=%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s
# list of logger=LEVEL pairs (list value)
#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO,iso8601=WARN
# List of logger=LEVEL pairs (list value)
#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN
# publish error events (boolean value)
# Publish error events (boolean value)
#publish_errors=false
# make deprecations fatal (boolean value)
# Make deprecations fatal (boolean value)
#fatal_deprecations=false
# If an instance is passed with the log message, format it
@ -121,75 +99,56 @@
# Deprecated group/name - [DEFAULT]/logdir
#log_dir=<None>
# Use syslog for logging. (boolean value)
# Use syslog for logging. Existing syslog format is DEPRECATED
# during I, and then will be changed in J to honor RFC5424
# (boolean value)
#use_syslog=false
# syslog facility to receive log lines (string value)
# (Optional) Use syslog rfc5424 format for logging. If
# enabled, will add APP-NAME (RFC5424) before the MSG part of
# the syslog message. The old format without APP-NAME is
# deprecated in I, and will be removed in J. (boolean value)
#use_syslog_rfc_format=false
# Syslog facility to receive log lines (string value)
#syslog_log_facility=LOG_USER
#
# Options defined in keystone.openstack.common.policy
#
# JSON file containing policy (string value)
#policy_file=policy.json
# Rule enforced when requested rule is not found (string
# value)
#policy_default_rule=default
[ssl]
#
# Options defined in keystone.openstack.common.sslutils
#
# CA certificate file to use to verify connecting clients
# (string value)
#ca_file=<None>
# Certificate file to use when starting the server securely
# (string value)
#cert_file=<None>
# Private key file to use when starting the server securely
# (string value)
#key_file=<None>
[database]
#
# Options defined in keystone.openstack.common.db.api
# Options defined in kite.openstack.common.db.options
#
# The file name to use with SQLite (string value)
#sqlite_db=kite.sqlite
# If True, SQLite uses synchronous mode (boolean value)
#sqlite_synchronous=true
# The backend to use for db (string value)
# Deprecated group/name - [DEFAULT]/db_backend
#backend=sqlalchemy
# Enable the experimental use of thread pooling for all DB API
# calls (boolean value)
# Deprecated group/name - [DEFAULT]/dbapi_use_tpool
#use_tpool=false
#
# Options defined in keystone.openstack.common.db.sqlalchemy.session
#
# The SQLAlchemy connection string used to connect to the
# database (string value)
# Deprecated group/name - [DEFAULT]/sql_connection
# Deprecated group/name - [DATABASE]/sql_connection
# Deprecated group/name - [sql]/connection
#connection=sqlite:////keystone/openstack/common/db/$sqlite_db
#connection=<None>
# The SQLAlchemy connection string used to connect to the
# slave database (string value)
#slave_connection=
# The SQL mode to be used for MySQL sessions. This option,
# including the default, overrides any server-set SQL mode. To
# use whatever SQL mode is set by the server configuration,
# set this to no value. Example: mysql_sql_mode= (string
# value)
#mysql_sql_mode=TRADITIONAL
# timeout before idle sql connections are reaped (integer
# Timeout before idle sql connections are reaped (integer
# value)
# Deprecated group/name - [DEFAULT]/sql_idle_timeout
# Deprecated group/name - [DATABASE]/sql_idle_timeout
# Deprecated group/name - [sql]/idle_timeout
#idle_timeout=3600
# Minimum number of SQL connections to keep open in a pool
@ -204,13 +163,13 @@
# Deprecated group/name - [DATABASE]/sql_max_pool_size
#max_pool_size=<None>
# maximum db connection retries during startup. (setting -1
# Maximum db connection retries during startup. (setting -1
# implies an infinite retry count) (integer value)
# Deprecated group/name - [DEFAULT]/sql_max_retries
# Deprecated group/name - [DATABASE]/sql_max_retries
#max_retries=10
# interval between retries of opening a sql connection
# Interval between retries of opening a sql connection
# (integer value)
# Deprecated group/name - [DEFAULT]/sql_retry_interval
# Deprecated group/name - [DATABASE]/reconnect_interval
@ -236,3 +195,24 @@
# (integer value)
# Deprecated group/name - [DATABASE]/sqlalchemy_pool_timeout
#pool_timeout=<None>
# Enable the experimental use of database reconnect on
# connection lost (boolean value)
#use_db_reconnect=false
# seconds between db connection retries (integer value)
#db_retry_interval=1
# Whether to increase interval between db connection retries,
# up to db_max_retry_interval (boolean value)
#db_inc_retry_interval=true
# max seconds between db connection retries, if
# db_inc_retry_interval is enabled (integer value)
#db_max_retry_interval=10
# maximum db connection retries before error is raised.
# (setting -1 implies an infinite retry count) (integer value)
#db_max_retries=20

66
kite/cli/manage.py Normal file
View File

@ -0,0 +1,66 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from kite.openstack.common import gettextutils
gettextutils.install('kite')
from oslo.config import cfg
from kite.common import service
from kite.db import migration
CONF = cfg.CONF
def do_db_version():
"""Print database's current migration level."""
print(migration.version())
def do_db_upgrade():
return migration.upgrade(CONF.command.revision)
def do_db_downgrade():
return migration.downgrade(CONF.command.revision)
def add_command_parsers(subparsers):
parser = subparsers.add_parser('db_version')
parser.set_defaults(func=do_db_version)
parser = subparsers.add_parser('db_upgrade')
parser.set_defaults(func=do_db_upgrade)
parser.add_argument('--revision', nargs='?')
parser = subparsers.add_parser('db_downgrade')
parser.set_defaults(func=do_db_downgrade)
parser.add_argument('--revision', nargs='?')
command_opt = cfg.SubCommandOpt('command',
title='Commands',
help='Available commands',
handler=add_command_parsers)
def main():
CONF.register_cli_opt(command_opt)
service.prepare_service(sys.argv)
try:
CONF.command.func()
except Exception as e:
sys.exit("ERROR: %s" % e)

View File

@ -10,6 +10,45 @@
# License for the specific language governing permissions and limitations
# under the License.
_FATAL_EXCEPTION_FORMAT_ERRORS = False
class KdsException(Exception):
pass
"""Base Exception class.
To correctly use this class, inherit from it and define
a 'msg_fmt' property. That message will get printf'd
with the keyword arguments provided to the constructor.
"""
msg_fmt = _('An unknown exception occurred')
def __init__(self, **kwargs):
try:
self._error_string = self.msg_fmt % kwargs
except Exception:
if _FATAL_EXCEPTION_FORMAT_ERRORS:
raise
else:
# at least get the core message out if something happened
self._error_string = self.msg_fmt
def __str__(self):
return self._error_string
class BackendException(KdsException):
msg_fmt = _("Failed to load the '%(backend)s' backend because it is not "
"allowed. Allowed backends are: %(allowed)s")
class IntegrityError(KdsException):
msg_fmt = _('Cannot set key data for %(name)s: %(reason)s')
class GroupStatusChanged(IntegrityError):
def __init__(self, **kwargs):
kwargs.setdefault('reason', "Can't change group status of a host")
super(GroupStatusChanged, self).__init__(**kwargs)

View File

@ -10,10 +10,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo.config import cfg
from kite.openstack.common.db import options
from kite.openstack.common import log
_COMMON_PATH = os.path.abspath(os.path.dirname(__file__))
_ROOT_PATH = os.path.normpath(os.path.join(_COMMON_PATH, '..', '..'))
_DEFAULT_SQL_CONNECTION = 'sqlite:///%s' % os.path.join(_ROOT_PATH,
'kite.sqlite')
CONF = cfg.CONF
API_SERVICE_OPTS = [
@ -35,6 +43,8 @@ def parse_args(args, default_config_files=None):
def prepare_service(argv=[]):
options.set_defaults(sql_connection=_DEFAULT_SQL_CONNECTION,
sqlite_db='kite.sqlite')
cfg.set_defaults(log.log_opts,
default_log_levels=['sqlalchemy=WARN',
'eventlet.wsgi.server=WARN'

59
kite/common/utils.py Normal file
View File

@ -0,0 +1,59 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from kite.common import exception
CONF = cfg.CONF
# NOTE(jamielennox): This class is a direct copy from nova, glance, heat and
# a bunch of other projects. It has been submitted to OSLO
# https://review.openstack.org/#/c/67002/ and should be synced when available.
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""
def __init__(self, pivot, config_group=None, **backends):
self.__backends = backends
self.__pivot = pivot
self.__backend = None
self.__config_group = config_group
def __get_backend(self):
if not self.__backend:
if self.__config_group is None:
backend_name = CONF[self.__pivot]
else:
backend_name = CONF[self.__config_group][self.__pivot]
if backend_name not in self.__backends:
allowed = ', '.join(self.__backends.iterkeys())
raise exception.BackendException(backend=backend_name,
allowed=allowed)
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
self.__backend = __import__(name=name, globals=None,
locals=None, fromlist=fromlist)
return self.__backend
def __getattr__(self, key):
backend = self.__get_backend()
return getattr(backend, key)

0
kite/db/__init__.py Normal file
View File

40
kite/db/api.py Normal file
View File

@ -0,0 +1,40 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from kite.openstack.common.db import api as db_api
CONF = cfg.CONF
IMPL = CONF.database
CONF.import_opt('backend',
'kite.openstack.common.db.options',
group='database')
_BACKEND_MAPPING = {'sqlalchemy': 'kite.db.sqlalchemy.api',
'kvs': 'kite.db.kvs.api'}
def reset():
global IMPL
IMPL = None
def get_instance(force_new=False):
"""Return a DB API instance."""
global IMPL
if not IMPL or force_new:
IMPL = db_api.DBAPI(CONF.database.backend,
backend_mapping=_BACKEND_MAPPING)
return IMPL

63
kite/db/connection.py Normal file
View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Connection(object):
@abc.abstractmethod
def set_key(self, name, key, signature, group, expiration=None):
"""Set a key for a name in the database.
If a key is set for an existing key name then a new key entry with a
new generation value is created.
:param string name: The unique name of the key to set.
:param string key: The key data to save.
:param string signature: The signature of the key data to save.
:param bool group: Whether this is a group key or not.
:param DateTime expiration: When the key should expire
(None is never expire).
:raises IntegrityError: If a key exists then new keys assigned to the
name must have the same 'group' setting. If the
value of group is changed an IntegrityError is
raised.
:returns int: The generation number of this key.
"""
@abc.abstractmethod
def get_key(self, name, generation=None, group=None):
"""Get key related to kds_id.
:param string name: The unique name of the key to fetch.
:param int generation: A specific generation of the key to retrieve. If
not specified the most recent generation is
retrieved.
:param bool group: If provided only retrieve this key if its group
value is the same.
:returns dict: A dictionary of the key information or None if not
found. Keys will contain:
- name: Unique name of the key.
- group: If this key is a group key or not.
- key: The key data.
- signature: The signature of the key data.
- generation: The generation of this key.
- expiration: When the key expires (or None).
Expired keys can be returned.
"""

0
kite/db/kvs/__init__.py Normal file
View File

70
kite/db/kvs/api.py Normal file
View File

@ -0,0 +1,70 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kite.common import exception
from kite.db import connection
def get_backend():
return KvsDbImpl()
def reset():
pass
class KvsDbImpl(connection.Connection):
"""A simple in-memory Key Value backend.
KVS backends are designed for use in testing and for simple debugging.
This backend should not be deployed in any production systems.
"""
def __init__(self):
super(KvsDbImpl, self).__init__()
self.clear()
def clear(self):
self._data = dict()
def set_key(self, name, key, signature, group, expiration=None):
host = self._data.setdefault(name, {'latest_generation': 0,
'keys': dict(), 'group': group})
if host['group'] != group:
raise exception.GroupStatusChanged(name=name)
host['latest_generation'] += 1
host['keys'][host['latest_generation']] = {'key': key,
'signature': signature,
'expiration': expiration}
return host['latest_generation']
def get_key(self, name, generation=None, group=None):
response = {'name': name}
try:
host = self._data[name]
if generation is None:
generation = host['latest_generation']
key_data = host['keys'][generation]
except KeyError:
return None
response['generation'] = generation
response['group'] = host['group']
if group is not None and host['group'] != group:
return None
response.update(key_data)
return response

40
kite/db/migration.py Normal file
View File

@ -0,0 +1,40 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Database setup and migration commands."""
from oslo.config import cfg
from kite.common import utils
CONF = cfg.CONF
CONF.import_opt('backend',
'kite.openstack.common.db.options',
group='database')
IMPL = utils.LazyPluggable(pivot='backend',
config_group='database',
sqlalchemy='kite.db.sqlalchemy.migration')
INIT_VERSION = 0
def upgrade(revision=None):
return IMPL.upgrade(revision=revision)
def downgrade(revision=None):
return IMPL.downgrade(revision=revision)
def version():
return IMPL.version()

View File

View File

@ -0,0 +1,59 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = %(here)s/alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# max length of characters to apply to the
# "slug" field
#truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
; sqlalchemy.url = driver://user:pass@localhost/dbname
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import with_statement
from logging import config as log_config
from alembic import context
from kite.db.sqlalchemy import api as db_api
from kite.db.sqlalchemy import models
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
log_config.fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = models.Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
engine = db_api.get_engine()
with engine.connect() as connection:
context.configure(connection=connection,
target_metadata=target_metadata)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
run_migrations_online()

View File

@ -0,0 +1,22 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,76 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Initialize Kite Tables
Revision ID: 49c8b865f6b
Revises: None
Create Date: 2014-04-01 14:31:06.415935
"""
# revision identifiers, used by Alembic.
revision = '49c8b865f6b'
down_revision = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table('hosts',
sa.Column('id',
sa.Integer(),
primary_key=True,
autoincrement=True),
sa.Column('name',
sa.Text(),
nullable=False),
sa.Column('group',
sa.Boolean(),
nullable=False),
sa.Column('latest_generation',
sa.Integer(),
nullable=False),
mysql_engine='InnoDB',
mysql_charset='utf8')
op.create_index('name_idx', 'hosts', ['name'],
unique=True, mysql_length=20)
op.create_table('keys',
sa.Column('host_id',
sa.Integer(),
sa.ForeignKey('hosts.id'),
primary_key=True,
autoincrement=False),
sa.Column('generation',
sa.Integer(),
primary_key=True,
autoincrement=False),
sa.Column('signature',
sa.LargeBinary(),
nullable=False),
sa.Column('enc_key',
sa.LargeBinary(),
nullable=False),
sa.Column('expiration',
sa.DateTime(),
nullable=True,
index=True),
mysql_engine='InnoDB',
mysql_charset='utf8')
def downgrade():
op.drop_table('keys')
op.drop_table('hosts')

105
kite/db/sqlalchemy/api.py Normal file
View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from sqlalchemy.orm import exc
from kite.common import exception
from kite.db import connection
from kite.db.sqlalchemy import models
from kite.openstack.common.db.sqlalchemy import session as db_session
CONF = cfg.CONF
_facade = None
def get_facade():
global _facade
if not _facade:
_facade = db_session.EngineFacade.from_config(CONF.database.connection,
CONF)
return _facade
def get_engine():
return get_facade().get_engine()
def get_session():
return get_facade().get_session()
def reset():
global _facade
_facade = None
def get_backend():
return SqlalchemyDbImpl()
class SqlalchemyDbImpl(connection.Connection):
def set_key(self, name, key, signature, group, expiration=None):
session = get_session()
with session.begin():
q = session.query(models.Host)
q = q.filter(models.Host.name == name)
try:
host = q.one()
except exc.NoResultFound:
host = models.Host(name=name,
latest_generation=0,
group=group)
else:
if host.group != group:
raise exception.GroupStatusChanged(name=name)
host.latest_generation += 1
host.keys.append(models.Key(signature=signature,
enc_key=key,
generation=host.latest_generation,
expiration=expiration))
session.add(host)
return host.latest_generation
def get_key(self, name, generation=None, group=None):
session = get_session()
query = session.query(models.Host, models.Key)
query = query.filter(models.Host.id == models.Key.host_id)
query = query.filter(models.Host.name == name)
if group is not None:
query = query.filter(models.Host.group == group)
if generation is not None:
query = query.filter(models.Key.generation == generation)
else:
query = query.filter(models.Host.latest_generation ==
models.Key.generation)
try:
result = query.one()
except exc.NoResultFound:
return None
return {'name': result.Host.name,
'group': result.Host.group,
'key': result.Key.enc_key,
'signature': result.Key.signature,
'generation': result.Key.generation,
'expiration': result.Key.expiration}

View File

@ -0,0 +1,45 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import alembic
from alembic import config as alembic_config
from alembic import migration as alembic_migration
from kite.db.sqlalchemy import api as db_api
def _alembic_config():
path = os.path.join(os.path.dirname(__file__), 'alembic.ini')
config = alembic_config.Config(path)
return config
def upgrade(revision=None, config=None):
alembic.command.upgrade(config or _alembic_config(), revision or 'head')
def downgrade(revision=None, config=None):
alembic.command.downgrade(config or _alembic_config(), revision or 'base')
def version(config=None):
"""Current database version.
:returns: Database version
:rtype: string
"""
engine = db_api.get_engine()
with engine.connect() as conn:
context = alembic_migration.MigrationContext.configure(conn)
return context.get_current_revision()

View File

@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sql
from sqlalchemy.ext.declarative import declarative_base
from kite.openstack.common.db.sqlalchemy import models
class KdsBase(models.ModelBase):
pass
Base = declarative_base(cls=KdsBase)
class Host(Base):
__tablename__ = 'hosts'
id = sql.Column(sql.Integer(), primary_key=True, autoincrement=True)
name = sql.Column(sql.Text(), index=True, unique=True, nullable=False)
group = sql.Column(sql.Boolean(), nullable=False, index=True)
latest_generation = sql.Column(sql.Integer(), nullable=False)
class Key(Base):
__tablename__ = 'keys'
host_id = sql.Column(sql.Integer(),
sql.ForeignKey('hosts.id'),
primary_key=True,
autoincrement=False)
generation = sql.Column(sql.Integer(),
primary_key=True,
autoincrement=False)
signature = sql.Column(sql.LargeBinary(),
nullable=False)
enc_key = sql.Column(sql.LargeBinary(),
nullable=False)
expiration = sql.Column(sql.DateTime(),
nullable=True,
index=True)
owner = sql.orm.relationship('Host',
backref=sql.orm.backref('keys',
order_by=sql.desc(
generation)))

View File

View File

@ -0,0 +1,302 @@
# Copyright 2012 SINA Corporation
# Copyright 2014 Cisco Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Extracts OpenStack config option info from module(s)."""
from __future__ import print_function
import argparse
import imp
import os
import re
import socket
import sys
import textwrap
from oslo.config import cfg
import six
import stevedore.named
from kite.openstack.common import gettextutils
from kite.openstack.common import importutils
gettextutils.install('kite')
STROPT = "StrOpt"
BOOLOPT = "BoolOpt"
INTOPT = "IntOpt"
FLOATOPT = "FloatOpt"
LISTOPT = "ListOpt"
DICTOPT = "DictOpt"
MULTISTROPT = "MultiStrOpt"
OPT_TYPES = {
STROPT: 'string value',
BOOLOPT: 'boolean value',
INTOPT: 'integer value',
FLOATOPT: 'floating point value',
LISTOPT: 'list value',
DICTOPT: 'dict value',
MULTISTROPT: 'multi valued',
}
OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT,
FLOATOPT, LISTOPT, DICTOPT,
MULTISTROPT]))
PY_EXT = ".py"
BASEDIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
"../../../../"))
WORDWRAP_WIDTH = 60
def generate(argv):
parser = argparse.ArgumentParser(
description='generate sample configuration file',
)
parser.add_argument('-m', dest='modules', action='append')
parser.add_argument('-l', dest='libraries', action='append')
parser.add_argument('srcfiles', nargs='*')
parsed_args = parser.parse_args(argv)
mods_by_pkg = dict()
for filepath in parsed_args.srcfiles:
pkg_name = filepath.split(os.sep)[1]
mod_str = '.'.join(['.'.join(filepath.split(os.sep)[:-1]),
os.path.basename(filepath).split('.')[0]])
mods_by_pkg.setdefault(pkg_name, list()).append(mod_str)
# NOTE(lzyeval): place top level modules before packages
pkg_names = sorted(pkg for pkg in mods_by_pkg if pkg.endswith(PY_EXT))
ext_names = sorted(pkg for pkg in mods_by_pkg if pkg not in pkg_names)
pkg_names.extend(ext_names)
# opts_by_group is a mapping of group name to an options list
# The options list is a list of (module, options) tuples
opts_by_group = {'DEFAULT': []}
if parsed_args.modules:
for module_name in parsed_args.modules:
module = _import_module(module_name)
if module:
for group, opts in _list_opts(module):
opts_by_group.setdefault(group, []).append((module_name,
opts))
# Look for entry points defined in libraries (or applications) for
# option discovery, and include their return values in the output.
#
# Each entry point should be a function returning an iterable
# of pairs with the group name (or None for the default group)
# and the list of Opt instances for that group.
if parsed_args.libraries:
loader = stevedore.named.NamedExtensionManager(
'oslo.config.opts',
names=list(set(parsed_args.libraries)),
invoke_on_load=False,
)
for ext in loader:
for group, opts in ext.plugin():
opt_list = opts_by_group.setdefault(group or 'DEFAULT', [])
opt_list.append((ext.name, opts))
for pkg_name in pkg_names:
mods = mods_by_pkg.get(pkg_name)
mods.sort()
for mod_str in mods:
if mod_str.endswith('.__init__'):
mod_str = mod_str[:mod_str.rfind(".")]
mod_obj = _import_module(mod_str)
if not mod_obj:
raise RuntimeError("Unable to import module %s" % mod_str)
for group, opts in _list_opts(mod_obj):
opts_by_group.setdefault(group, []).append((mod_str, opts))
print_group_opts('DEFAULT', opts_by_group.pop('DEFAULT', []))
for group in sorted(opts_by_group.keys()):
print_group_opts(group, opts_by_group[group])
def _import_module(mod_str):
try:
if mod_str.startswith('bin.'):
imp.load_source(mod_str[4:], os.path.join('bin', mod_str[4:]))
return sys.modules[mod_str[4:]]
else:
return importutils.import_module(mod_str)
except Exception as e:
sys.stderr.write("Error importing module %s: %s\n" % (mod_str, str(e)))
return None
def _is_in_group(opt, group):
"Check if opt is in group."
for value in group._opts.values():
# NOTE(llu): Temporary workaround for bug #1262148, wait until
# newly released oslo.config support '==' operator.
if not(value['opt'] != opt):
return True
return False
def _guess_groups(opt, mod_obj):
# is it in the DEFAULT group?
if _is_in_group(opt, cfg.CONF):
return 'DEFAULT'
# what other groups is it in?
for value in cfg.CONF.values():
if isinstance(value, cfg.CONF.GroupAttr):
if _is_in_group(opt, value._group):
return value._group.name
raise RuntimeError(
"Unable to find group for option %s, "
"maybe it's defined twice in the same group?"
% opt.name
)
def _list_opts(obj):
def is_opt(o):
return (isinstance(o, cfg.Opt) and
not isinstance(o, cfg.SubCommandOpt))
opts = list()
for attr_str in dir(obj):
attr_obj = getattr(obj, attr_str)
if is_opt(attr_obj):
opts.append(attr_obj)
elif (isinstance(attr_obj, list) and
all(map(lambda x: is_opt(x), attr_obj))):
opts.extend(attr_obj)
ret = {}
for opt in opts:
ret.setdefault(_guess_groups(opt, obj), []).append(opt)
return ret.items()
def print_group_opts(group, opts_by_module):
print("[%s]" % group)
print('')
for mod, opts in opts_by_module:
print('#')
print('# Options defined in %s' % mod)
print('#')
print('')
for opt in opts:
_print_opt(opt)
print('')
def _get_my_ip():
try:
csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
csock.connect(('8.8.8.8', 80))
(addr, port) = csock.getsockname()
csock.close()
return addr
except socket.error:
return None
def _sanitize_default(name, value):
"""Set up a reasonably sensible default for pybasedir, my_ip and host."""
if value.startswith(sys.prefix):
# NOTE(jd) Don't use os.path.join, because it is likely to think the
# second part is an absolute pathname and therefore drop the first
# part.
value = os.path.normpath("/usr/" + value[len(sys.prefix):])
elif value.startswith(BASEDIR):
return value.replace(BASEDIR, '/usr/lib/python/site-packages')
elif BASEDIR in value:
return value.replace(BASEDIR, '')
elif value == _get_my_ip():
return '10.0.0.1'
elif value in (socket.gethostname(), socket.getfqdn()) and 'host' in name:
return 'kite'
elif value.strip() != value:
return '"%s"' % value
return value
def _print_opt(opt):
opt_name, opt_default, opt_help = opt.dest, opt.default, opt.help
if not opt_help:
sys.stderr.write('WARNING: "%s" is missing help string.\n' % opt_name)
opt_help = ""
opt_type = None
try:
opt_type = OPTION_REGEX.search(str(type(opt))).group(0)
except (ValueError, AttributeError) as err:
sys.stderr.write("%s\n" % str(err))
sys.exit(1)
opt_help = u'%s (%s)' % (opt_help,
OPT_TYPES[opt_type])
print('#', "\n# ".join(textwrap.wrap(opt_help, WORDWRAP_WIDTH)))
if opt.deprecated_opts:
for deprecated_opt in opt.deprecated_opts:
if deprecated_opt.name:
deprecated_group = (deprecated_opt.group if
deprecated_opt.group else "DEFAULT")
print('# Deprecated group/name - [%s]/%s' %
(deprecated_group,
deprecated_opt.name))
try:
if opt_default is None:
print('#%s=<None>' % opt_name)
elif opt_type == STROPT:
assert(isinstance(opt_default, six.string_types))
print('#%s=%s' % (opt_name, _sanitize_default(opt_name,
opt_default)))
elif opt_type == BOOLOPT:
assert(isinstance(opt_default, bool))
print('#%s=%s' % (opt_name, str(opt_default).lower()))
elif opt_type == INTOPT:
assert(isinstance(opt_default, int) and
not isinstance(opt_default, bool))
print('#%s=%s' % (opt_name, opt_default))
elif opt_type == FLOATOPT:
assert(isinstance(opt_default, float))
print('#%s=%s' % (opt_name, opt_default))
elif opt_type == LISTOPT:
assert(isinstance(opt_default, list))
print('#%s=%s' % (opt_name, ','.join(opt_default)))
elif opt_type == DICTOPT:
assert(isinstance(opt_default, dict))
opt_default_strlist = [str(key) + ':' + str(value)
for (key, value) in opt_default.items()]
print('#%s=%s' % (opt_name, ','.join(opt_default_strlist)))
elif opt_type == MULTISTROPT:
assert(isinstance(opt_default, list))
if not opt_default:
opt_default = ['']
for default in opt_default:
print('#%s=%s' % (opt_name, default))
print('')
except Exception:
sys.stderr.write('Error in option "%s"\n' % opt_name)
sys.exit(1)
def main():
generate(sys.argv[1:])
if __name__ == '__main__':
main()

View File

@ -0,0 +1,111 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Simple class that stores security context information in the web request.
Projects should subclass this class if they wish to enhance the request
context or provide additional information in their specific WSGI pipeline.
"""
import itertools
import uuid
def generate_request_id():
return 'req-%s' % str(uuid.uuid4())
class RequestContext(object):
"""Helper class to represent useful information about a request context.
Stores information about the security context under which the user
accesses the system, as well as additional request information.
"""
user_idt_format = '{user} {tenant} {domain} {user_domain} {p_domain}'
def __init__(self, auth_token=None, user=None, tenant=None, domain=None,
user_domain=None, project_domain=None, is_admin=False,
read_only=False, show_deleted=False, request_id=None,
instance_uuid=None):
self.auth_token = auth_token
self.user = user
self.tenant = tenant
self.domain = domain
self.user_domain = user_domain
self.project_domain = project_domain
self.is_admin = is_admin
self.read_only = read_only
self.show_deleted = show_deleted
self.instance_uuid = instance_uuid
if not request_id:
request_id = generate_request_id()
self.request_id = request_id
def to_dict(self):
user_idt = (
self.user_idt_format.format(user=self.user or '-',
tenant=self.tenant or '-',
domain=self.domain or '-',
user_domain=self.user_domain or '-',
p_domain=self.project_domain or '-'))
return {'user': self.user,
'tenant': self.tenant,
'domain': self.domain,
'user_domain': self.user_domain,
'project_domain': self.project_domain,
'is_admin': self.is_admin,
'read_only': self.read_only,
'show_deleted': self.show_deleted,
'auth_token': self.auth_token,
'request_id': self.request_id,
'instance_uuid': self.instance_uuid,
'user_identity': user_idt}
def get_admin_context(show_deleted=False):
context = RequestContext(None,
tenant=None,
is_admin=True,
show_deleted=show_deleted)
return context
def get_context_from_function_and_args(function, args, kwargs):
"""Find an arg of type RequestContext and return it.
This is useful in a couple of decorators where we don't
know much about the function we're wrapping.
"""
for arg in itertools.chain(kwargs.values(), args):
if isinstance(arg, RequestContext):
return arg
return None
def is_user_context(context):
"""Indicates if the request context is a normal user."""
if not context:
return False
if context.is_admin:
return False
if not context.user_id or not context.project_id:
return False
return True

View File

View File

@ -0,0 +1,162 @@
# Copyright (c) 2013 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Multiple DB API backend support.
A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB
API methods.
"""
import functools
import logging
import threading
import time
from kite.openstack.common.db import exception
from kite.openstack.common.gettextutils import _LE
from kite.openstack.common import importutils
LOG = logging.getLogger(__name__)
def safe_for_db_retry(f):
"""Enable db-retry for decorated function, if config option enabled."""
f.__dict__['enable_retry'] = True
return f
class wrap_db_retry(object):
"""Retry db.api methods, if DBConnectionError() raised
Retry decorated db.api methods. If we enabled `use_db_reconnect`
in config, this decorator will be applied to all db.api functions,
marked with @safe_for_db_retry decorator.
Decorator catchs DBConnectionError() and retries function in a
loop until it succeeds, or until maximum retries count will be reached.
"""
def __init__(self, retry_interval, max_retries, inc_retry_interval,
max_retry_interval):
super(wrap_db_retry, self).__init__()
self.retry_interval = retry_interval
self.max_retries = max_retries
self.inc_retry_interval = inc_retry_interval
self.max_retry_interval = max_retry_interval
def __call__(self, f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
next_interval = self.retry_interval
remaining = self.max_retries
while True:
try:
return f(*args, **kwargs)
except exception.DBConnectionError as e:
if remaining == 0:
LOG.exception(_LE('DB exceeded retry limit.'))
raise exception.DBError(e)
if remaining != -1:
remaining -= 1
LOG.exception(_LE('DB connection error.'))
# NOTE(vsergeyev): We are using patched time module, so
# this effectively yields the execution
# context to another green thread.
time.sleep(next_interval)
if self.inc_retry_interval:
next_interval = min(
next_interval * 2,
self.max_retry_interval
)
return wrapper
class DBAPI(object):
def __init__(self, backend_name, backend_mapping=None, lazy=False,
**kwargs):
"""Initialize the chosen DB API backend.
:param backend_name: name of the backend to load
:type backend_name: str
:param backend_mapping: backend name -> module/class to load mapping
:type backend_mapping: dict
:param lazy: load the DB backend lazily on the first DB API method call
:type lazy: bool
Keyword arguments:
:keyword use_db_reconnect: retry DB transactions on disconnect or not
:type use_db_reconnect: bool
:keyword retry_interval: seconds between transaction retries
:type retry_interval: int
:keyword inc_retry_interval: increase retry interval or not
:type inc_retry_interval: bool
:keyword max_retry_interval: max interval value between retries
:type max_retry_interval: int
:keyword max_retries: max number of retries before an error is raised
:type max_retries: int
"""
self._backend = None
self._backend_name = backend_name
self._backend_mapping = backend_mapping or {}
self._lock = threading.Lock()
if not lazy:
self._load_backend()
self.use_db_reconnect = kwargs.get('use_db_reconnect', False)
self.retry_interval = kwargs.get('retry_interval', 1)
self.inc_retry_interval = kwargs.get('inc_retry_interval', True)
self.max_retry_interval = kwargs.get('max_retry_interval', 10)
self.max_retries = kwargs.get('max_retries', 20)
def _load_backend(self):
with self._lock:
if not self._backend:
# Import the untranslated name if we don't have a mapping
backend_path = self._backend_mapping.get(self._backend_name,
self._backend_name)
backend_mod = importutils.import_module(backend_path)
self._backend = backend_mod.get_backend()
def __getattr__(self, key):
if not self._backend:
self._load_backend()
attr = getattr(self._backend, key)
if not hasattr(attr, '__call__'):
return attr
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
# DB API methods, decorated with @safe_for_db_retry
# on disconnect.
if self.use_db_reconnect and hasattr(attr, 'enable_retry'):
attr = wrap_db_retry(
retry_interval=self.retry_interval,
max_retries=self.max_retries,
inc_retry_interval=self.inc_retry_interval,
max_retry_interval=self.max_retry_interval)(attr)
return attr

View File

@ -0,0 +1,56 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""DB related custom exceptions."""
import six
from kite.openstack.common.gettextutils import _
class DBError(Exception):
"""Wraps an implementation specific exception."""
def __init__(self, inner_exception=None):
self.inner_exception = inner_exception
super(DBError, self).__init__(six.text_type(inner_exception))
class DBDuplicateEntry(DBError):
"""Wraps an implementation specific exception."""
def __init__(self, columns=[], inner_exception=None):
self.columns = columns
super(DBDuplicateEntry, self).__init__(inner_exception)
class DBDeadlock(DBError):
def __init__(self, inner_exception=None):
super(DBDeadlock, self).__init__(inner_exception)
class DBInvalidUnicodeParameter(Exception):
message = _("Invalid Parameter: "
"Unicode is not supported by the current database.")
class DbMigrationError(DBError):
"""Wraps migration specific exception."""
def __init__(self, message=None):
super(DbMigrationError, self).__init__(message)
class DBConnectionError(DBError):
"""Wraps connection specific exception."""
pass

View File

@ -0,0 +1,171 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo.config import cfg
database_opts = [
cfg.StrOpt('sqlite_db',
deprecated_group='DEFAULT',
default='kite.sqlite',
help='The file name to use with SQLite'),
cfg.BoolOpt('sqlite_synchronous',
deprecated_group='DEFAULT',
default=True,
help='If True, SQLite uses synchronous mode'),
cfg.StrOpt('backend',
default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
cfg.StrOpt('connection',
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('mysql_sql_mode',
default='TRADITIONAL',
help='The SQL mode to be used for MySQL sessions. '
'This option, including the default, overrides any '
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE'),
cfg.DeprecatedOpt('idle_timeout',
group='sql')],
help='Timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size',
default=1,
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_min_pool_size',
group='DATABASE')],
help='Minimum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_pool_size',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_pool_size',
group='DATABASE')],
help='Maximum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_retries',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_retries',
group='DATABASE')],
help='Maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
group='DEFAULT'),
cfg.DeprecatedOpt('reconnect_interval',
group='DATABASE')],
help='Interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
group='DEFAULT'),
cfg.DeprecatedOpt('sqlalchemy_max_overflow',
group='DATABASE')],
help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug',
default=0,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
group='DEFAULT')],
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.BoolOpt('connection_trace',
default=False,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
group='DEFAULT')],
help='Add python stack traces to SQL as comment strings'),
cfg.IntOpt('pool_timeout',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
group='DATABASE')],
help='If set, use this value for pool_timeout with sqlalchemy'),
cfg.BoolOpt('use_db_reconnect',
default=False,
help='Enable the experimental use of database reconnect '
'on connection lost'),
cfg.IntOpt('db_retry_interval',
default=1,
help='seconds between db connection retries'),
cfg.BoolOpt('db_inc_retry_interval',
default=True,
help='Whether to increase interval between db connection '
'retries, up to db_max_retry_interval'),
cfg.IntOpt('db_max_retry_interval',
default=10,
help='max seconds between db connection retries, if '
'db_inc_retry_interval is enabled'),
cfg.IntOpt('db_max_retries',
default=20,
help='maximum db connection retries before error is raised. '
'(setting -1 implies an infinite retry count)'),
]
CONF = cfg.CONF
CONF.register_opts(database_opts, 'database')
def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
max_overflow=None, pool_timeout=None):
"""Set defaults for configuration variables."""
cfg.set_defaults(database_opts,
connection=sql_connection,
sqlite_db=sqlite_db)
# Update the QueuePool defaults
if max_pool_size is not None:
cfg.set_defaults(database_opts,
max_pool_size=max_pool_size)
if max_overflow is not None:
cfg.set_defaults(database_opts,
max_overflow=max_overflow)
if pool_timeout is not None:
cfg.set_defaults(database_opts,
pool_timeout=pool_timeout)
def list_opts():
"""Returns a list of oslo.config options available in the library.
The returned list includes all oslo.config options which may be registered
at runtime by the library.
Each element of the list is a tuple. The first element is the name of the
group under which the list of elements in the second element will be
registered. A group name of None corresponds to the [DEFAULT] group in
config files.
The purpose of this is to allow tools like the Oslo sample config file
generator to discover the options exposed to users by this library.
:returns: a list of (group_name, opts) tuples
"""
return [('database', copy.deepcopy(database_opts))]

View File

@ -0,0 +1,268 @@
# coding: utf-8
#
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Base on code in migrate/changeset/databases/sqlite.py which is under
# the following license:
#
# The MIT License
#
# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import re
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite
from migrate import exceptions as versioning_exceptions
from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository
import sqlalchemy
from sqlalchemy.schema import UniqueConstraint
from kite.openstack.common.db import exception
from kite.openstack.common.gettextutils import _
def _get_unique_constraints(self, table):
"""Retrieve information about existing unique constraints of the table
This feature is needed for _recreate_table() to work properly.
Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x.
"""
data = table.metadata.bind.execute(
"""SELECT sql
FROM sqlite_master
WHERE
type='table' AND
name=:table_name""",
table_name=table.name
).fetchone()[0]
UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)"
return [
UniqueConstraint(
*[getattr(table.columns, c.strip(' "')) for c in cols.split(",")],
name=name
)
for name, cols in re.findall(UNIQUE_PATTERN, data)
]
def _recreate_table(self, table, column=None, delta=None, omit_uniques=None):
"""Recreate the table properly
Unlike the corresponding original method of sqlalchemy-migrate this one
doesn't drop existing unique constraints when creating a new one.
"""
table_name = self.preparer.format_table(table)
# we remove all indexes so as not to have
# problems during copy and re-create
for index in table.indexes:
index.drop()
# reflect existing unique constraints
for uc in self._get_unique_constraints(table):
table.append_constraint(uc)
# omit given unique constraints when creating a new table if required
table.constraints = set([
cons for cons in table.constraints
if omit_uniques is None or cons.name not in omit_uniques
])
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute()
insertion_string = self._modify_table(table, column, delta)
table.create(bind=self.connection)
self.append(insertion_string % {'table_name': table_name})
self.execute()
self.append('DROP TABLE migration_tmp')
self.execute()
def _visit_migrate_unique_constraint(self, *p, **k):
"""Drop the given unique constraint
The corresponding original method of sqlalchemy-migrate just
raises NotImplemented error
"""
self.recreate_table(p[0].table, omit_uniques=[p[0].name])
def patch_migrate():
"""A workaround for SQLite's inability to alter things
SQLite abilities to alter tables are very limited (please read
http://www.sqlite.org/lang_altertable.html for more details).
E. g. one can't drop a column or a constraint in SQLite. The
workaround for this is to recreate the original table omitting
the corresponding constraint (or column).
sqlalchemy-migrate library has recreate_table() method that
implements this workaround, but it does it wrong:
- information about unique constraints of a table
is not retrieved. So if you have a table with one
unique constraint and a migration adding another one
you will end up with a table that has only the
latter unique constraint, and the former will be lost
- dropping of unique constraints is not supported at all
The proper way to fix this is to provide a pull-request to
sqlalchemy-migrate, but the project seems to be dead. So we
can go on with monkey-patching of the lib at least for now.
"""
# this patch is needed to ensure that recreate_table() doesn't drop
# existing unique constraints of the table when creating a new one
helper_cls = sqlite.SQLiteHelper
helper_cls.recreate_table = _recreate_table
helper_cls._get_unique_constraints = _get_unique_constraints
# this patch is needed to be able to drop existing unique constraints
constraint_cls = sqlite.SQLiteConstraintDropper
constraint_cls.visit_migrate_unique_constraint = \
_visit_migrate_unique_constraint
constraint_cls.__bases__ = (ansisql.ANSIColumnDropper,
sqlite.SQLiteConstraintGenerator)
def db_sync(engine, abs_path, version=None, init_version=0):
"""Upgrade or downgrade a database.
Function runs the upgrade() or downgrade() functions in change scripts.
:param engine: SQLAlchemy engine instance for a given database
:param abs_path: Absolute path to migrate repository.
:param version: Database will upgrade/downgrade until this version.
If None - database will update to the latest
available version.
:param init_version: Initial database version
"""
if version is not None:
try:
version = int(version)
except ValueError:
raise exception.DbMigrationError(
message=_("version should be an integer"))
current_version = db_version(engine, abs_path, init_version)
repository = _find_migrate_repo(abs_path)
_db_schema_sanity_check(engine)
if version is None or version > current_version:
return versioning_api.upgrade(engine, repository, version)
else:
return versioning_api.downgrade(engine, repository,
version)
def _db_schema_sanity_check(engine):
"""Ensure all database tables were created with required parameters.
:param engine: SQLAlchemy engine instance for a given database
"""
if engine.name == 'mysql':
onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION '
'from information_schema.TABLES '
'where TABLE_SCHEMA=%s and '
'TABLE_COLLATION NOT LIKE "%%utf8%%"')
table_names = [res[0] for res in engine.execute(onlyutf8_sql,
engine.url.database)]
if len(table_names) > 0:
raise ValueError(_('Tables "%s" have non utf8 collation, '
'please make sure all tables are CHARSET=utf8'
) % ','.join(table_names))
def db_version(engine, abs_path, init_version):
"""Show the current version of the repository.
:param engine: SQLAlchemy engine instance for a given database
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
try:
return versioning_api.db_version(engine, repository)
except versioning_exceptions.DatabaseNotControlledError:
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
tables = meta.tables
if len(tables) == 0 or 'alembic_version' in tables:
db_version_control(engine, abs_path, version=init_version)
return versioning_api.db_version(engine, repository)
else:
raise exception.DbMigrationError(
message=_(
"The database is not under version control, but has "
"tables. Please stamp the current version of the schema "
"manually."))
def db_version_control(engine, abs_path, version=None):
"""Mark a database as under this repository's version control.
Once a database is under version control, schema changes should
only be done via change scripts in this repository.
:param engine: SQLAlchemy engine instance for a given database
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
versioning_api.version_control(engine, repository, version)
return version
def _find_migrate_repo(abs_path):
"""Get the project's change script repository
:param abs_path: Absolute path to migrate repository
"""
if not os.path.exists(abs_path):
raise exception.DbMigrationError("Path %s not found" % abs_path)
return Repository(abs_path)

View File

@ -0,0 +1,119 @@
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Piston Cloud Computing, Inc.
# Copyright 2012 Cloudscaling Group, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy models.
"""
import six
from sqlalchemy import Column, Integer
from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper
from kite.openstack.common import timeutils
class ModelBase(six.Iterator):
"""Base class for models."""
__table_initialized__ = False
def save(self, session):
"""Save this object."""
# NOTE(boris-42): This part of code should be look like:
# session.add(self)
# session.flush()
# But there is a bug in sqlalchemy and eventlet that
# raises NoneType exception if there is no running
# transaction and rollback is called. As long as
# sqlalchemy has this bug we have to create transaction
# explicitly.
with session.begin(subtransactions=True):
session.add(self)
session.flush()
def __setitem__(self, key, value):
setattr(self, key, value)
def __getitem__(self, key):
return getattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
@property
def _extra_keys(self):
"""Specifies custom fields
Subclasses can override this property to return a list
of custom fields that should be included in their dict
representation.
For reference check tests/db/sqlalchemy/test_models.py
"""
return []
def __iter__(self):
columns = dict(object_mapper(self).columns).keys()
# NOTE(russellb): Allow models to specify other keys that can be looked
# up, beyond the actual db columns. An example would be the 'name'
# property for an Instance.
columns.extend(self._extra_keys)
self._i = iter(columns)
return self
# In Python 3, __next__() has replaced next().
def __next__(self):
n = six.advance_iterator(self._i)
return n, getattr(self, n)
def next(self):
return self.__next__()
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in six.iteritems(values):
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict.
Includes attributes from joins.
"""
local = dict(self)
joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_'])
local.update(joined)
return six.iteritems(local)
class TimestampMixin(object):
created_at = Column(DateTime, default=lambda: timeutils.utcnow())
updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow())
class SoftDeleteMixin(object):
deleted_at = Column(DateTime)
deleted = Column(Integer, default=0)
def soft_delete(self, session):
"""Mark this object as deleted."""
self.deleted = self.id
self.deleted_at = timeutils.utcnow()
self.save(session=session)

View File

@ -0,0 +1,157 @@
# Copyright 2013 Mirantis.inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provision test environment for specific DB backends"""
import argparse
import logging
import os
import random
import string
from six import moves
import sqlalchemy
from kite.openstack.common.db import exception as exc
LOG = logging.getLogger(__name__)
def get_engine(uri):
"""Engine creation
Call the function without arguments to get admin connection. Admin
connection required to create temporary user and database for each
particular test. Otherwise use existing connection to recreate connection
to the temporary database.
"""
return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
def _execute_sql(engine, sql, driver):
"""Initialize connection, execute sql query and close it."""
try:
with engine.connect() as conn:
if driver == 'postgresql':
conn.connection.set_isolation_level(0)
for s in sql:
conn.execute(s)
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
LOG.exception(msg % engine.url)
raise exc.DBConnectionError(msg % engine.url)
def create_database(engine):
"""Provide temporary user and database for each particular test."""
driver = engine.name
auth = {
'database': ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10)),
'user': engine.url.username,
'passwd': engine.url.password,
}
sqls = [
"drop database if exists %(database)s;",
"create database %(database)s;"
]
if driver == 'sqlite':
return 'sqlite:////tmp/%s' % auth['database']
elif driver in ['mysql', 'postgresql']:
sql_query = map(lambda x: x % auth, sqls)
_execute_sql(engine, sql_query, driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
params = auth.copy()
params['backend'] = driver
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
def drop_database(admin_engine, current_uri):
"""Drop temporary database and user after each particular test."""
engine = get_engine(current_uri)
driver = engine.name
auth = {'database': engine.url.database, 'user': engine.url.username}
if driver == 'sqlite':
try:
os.remove(auth['database'])
except OSError:
pass
elif driver in ['mysql', 'postgresql']:
sql = "drop database if exists %(database)s;"
_execute_sql(admin_engine, [sql % auth], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
def main():
"""Controller to handle commands
::create: Create test user and database with random names.
::drop: Drop user and database created by previous command.
"""
parser = argparse.ArgumentParser(
description='Controller to handle database creation and dropping'
' commands.',
epilog='Under normal circumstances is not used directly.'
' Used in .testr.conf to automate test database creation'
' and dropping processes.')
subparsers = parser.add_subparsers(
help='Subcommands to manipulate temporary test databases.')
create = subparsers.add_parser(
'create',
help='Create temporary test '
'databases and users.')
create.set_defaults(which='create')
create.add_argument(
'instances_count',
type=int,
help='Number of databases to create.')
drop = subparsers.add_parser(
'drop',
help='Drop temporary test databases and users.')
drop.set_defaults(which='drop')
drop.add_argument(
'instances',
nargs='+',
help='List of databases uri to be dropped.')
args = parser.parse_args()
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
'sqlite://')
engine = get_engine(connection_string)
which = args.which
if which == "create":
for i in range(int(args.instances_count)):
print(create_database(engine))
elif which == "drop":
for db in args.instances:
drop_database(engine, db)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,897 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Session Handling for SQLAlchemy backend.
Recommended ways to use sessions within this framework:
* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
`model_query()` will implicitly use a session when called without one
supplied. This is the ideal situation because it will allow queries
to be automatically retried if the database connection is interrupted.
.. note:: Automatic retry will be enabled in a future patch.
It is generally fine to issue several queries in a row like this. Even though
they may be run in separate transactions and/or separate sessions, each one
will see the data from the prior calls. If needed, undo- or rollback-like
functionality should be handled at a logical level. For an example, look at
the code around quotas and `reservation_rollback()`.
Examples:
.. code:: python
def get_foo(context, foo):
return (model_query(context, models.Foo).
filter_by(foo=foo).
first())
def update_foo(context, id, newfoo):
(model_query(context, models.Foo).
filter_by(id=id).
update({'foo': newfoo}))
def create_foo(context, values):
foo_ref = models.Foo()
foo_ref.update(values)
foo_ref.save()
return foo_ref
* Within the scope of a single method, keep all the reads and writes within
the context managed by a single session. In this way, the session's
`__exit__` handler will take care of calling `flush()` and `commit()` for
you. If using this approach, you should not explicitly call `flush()` or
`commit()`. Any error within the context of the session will cause the
session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
raised in `session`'s `__exit__` handler, and any try/except within the
context managed by `session` will not be triggered. And catching other
non-database errors in the session will not trigger the ROLLBACK, so
exception handlers should always be outside the session, unless the
developer wants to do a partial commit on purpose. If the connection is
dropped before this is possible, the database will implicitly roll back the
transaction.
.. note:: Statements in the session scope will not be automatically retried.
If you create models within the session, they need to be added, but you
do not need to call `model.save()`:
.. code:: python
def create_many_foo(context, foos):
session = sessionmaker()
with session.begin():
for foo in foos:
foo_ref = models.Foo()
foo_ref.update(foo)
session.add(foo_ref)
def update_bar(context, foo_id, newbar):
session = sessionmaker()
with session.begin():
foo_ref = (model_query(context, models.Foo, session).
filter_by(id=foo_id).
first())
(model_query(context, models.Bar, session).
filter_by(id=foo_ref['bar_id']).
update({'bar': newbar}))
.. note:: `update_bar` is a trivially simple example of using
``with session.begin``. Whereas `create_many_foo` is a good example of
when a transaction is needed, it is always best to use as few queries as
possible.
The two queries in `update_bar` can be better expressed using a single query
which avoids the need for an explicit transaction. It can be expressed like
so:
.. code:: python
def update_bar(context, foo_id, newbar):
subq = (model_query(context, models.Foo.id).
filter_by(id=foo_id).
limit(1).
subquery())
(model_query(context, models.Bar).
filter_by(id=subq.as_scalar()).
update({'bar': newbar}))
For reference, this emits approximately the following SQL statement:
.. code:: sql
UPDATE bar SET bar = ${newbar}
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
exception while using ``with session.begin``. Here create two duplicate
instances with same primary key, must catch the exception out of context
managed by a single session:
.. code:: python
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
session = sessionmaker()
try:
with session.begin():
session.add(foo1)
session.add(foo2)
except exception.DBDuplicateEntry as e:
handle_error(e)
* Passing an active session between methods. Sessions should only be passed
to private methods. The private method must use a subtransaction; otherwise
SQLAlchemy will throw an error when you call `session.begin()` on an existing
transaction. Public methods should not accept a session parameter and should
not be involved in sessions within the caller's scope.
Note that this incurs more overhead in SQLAlchemy than the above means
due to nesting transactions, and it is not possible to implicitly retry
failed database operations when using this approach.
This also makes code somewhat more difficult to read and debug, because a
single database transaction spans more than one method. Error handling
becomes less clear in this situation. When this is needed for code clarity,
it should be clearly documented.
.. code:: python
def myfunc(foo):
session = sessionmaker()
with session.begin():
# do some database things
bar = _private_func(foo, session)
return bar
def _private_func(foo, session=None):
if not session:
session = sessionmaker()
with session.begin(subtransaction=True):
# do some other database things
return bar
There are some things which it is best to avoid:
* Don't keep a transaction open any longer than necessary.
This means that your ``with session.begin()`` block should be as short
as possible, while still containing all the related calls for that
transaction.
* Avoid ``with_lockmode('UPDATE')`` when possible.
In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
any rows, it will take a gap-lock. This is a form of write-lock on the
"gap" where no rows exist, and prevents any other writes to that space.
This can effectively prevent any INSERT into a table by locking the gap
at the end of the index. Similar problems will occur if the SELECT FOR UPDATE
has an overly broad WHERE clause, or doesn't properly use an index.
One idea proposed at ODS Fall '12 was to use a normal SELECT to test the
number of rows matching a query, and if only one row is returned,
then issue the SELECT FOR UPDATE.
The better long-term solution is to use
``INSERT .. ON DUPLICATE KEY UPDATE``.
However, this can not be done until the "deleted" columns are removed and
proper UNIQUE constraints are added to the tables.
Enabling soft deletes:
* To use/enable soft-deletes, the `SoftDeleteMixin` must be added
to your model class. For example:
.. code:: python
class NovaBase(models.SoftDeleteMixin, models.ModelBase):
pass
Efficient use of soft deletes:
* There are two possible ways to mark a record as deleted:
`model.soft_delete()` and `query.soft_delete()`.
The `model.soft_delete()` method works with a single already-fetched entry.
`query.soft_delete()` makes only one db request for all entries that
correspond to the query.
* In almost all cases you should use `query.soft_delete()`. Some examples:
.. code:: python
def soft_delete_bar():
count = model_query(BarModel).find(some_condition).soft_delete()
if count == 0:
raise Exception("0 entries were soft deleted")
def complex_soft_delete_with_synchronization_bar(session=None):
if session is None:
session = sessionmaker()
with session.begin(subtransactions=True):
count = (model_query(BarModel).
find(some_condition).
soft_delete(synchronize_session=True))
# Here synchronize_session is required, because we
# don't know what is going on in outer session.
if count == 0:
raise Exception("0 entries were soft deleted")
* There is only one situation where `model.soft_delete()` is appropriate: when
you fetch a single record, work with it, and mark it as deleted in the same
transaction.
.. code:: python
def soft_delete_bar_model():
session = sessionmaker()
with session.begin():
bar_ref = model_query(BarModel).find(some_condition).first()
# Work with bar_ref
bar_ref.soft_delete(session=session)
However, if you need to work with all entries that correspond to query and
then soft delete them you should use the `query.soft_delete()` method:
.. code:: python
def soft_delete_multi_models():
session = sessionmaker()
with session.begin():
query = (model_query(BarModel, session=session).
find(some_condition))
model_refs = query.all()
# Work with model_refs
query.soft_delete(synchronize_session=False)
# synchronize_session=False should be set if there is no outer
# session and these entries are not used after this.
When working with many rows, it is very important to use query.soft_delete,
which issues a single query. Using `model.soft_delete()`, as in the following
example, is very inefficient.
.. code:: python
for bar_ref in bar_refs:
bar_ref.soft_delete(session=session)
# This will produce count(bar_refs) db requests.
"""
import functools
import logging
import re
import time
import six
from sqlalchemy import exc as sqla_exc
from sqlalchemy.interfaces import PoolListener
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
from kite.openstack.common.db import exception
from kite.openstack.common.gettextutils import _LE, _LW
from kite.openstack.common import timeutils
LOG = logging.getLogger(__name__)
class SqliteForeignKeysListener(PoolListener):
"""Ensures that the foreign key constraints are enforced in SQLite.
The foreign key constraints are disabled by default in SQLite,
so the foreign key constraints will be enabled here for every
database connection
"""
def connect(self, dbapi_con, con_record):
dbapi_con.execute('pragma foreign_keys=ON')
# note(boris-42): In current versions of DB backends unique constraint
# violation messages follow the structure:
#
# sqlite:
# 1 column - (IntegrityError) column c1 is not unique
# N columns - (IntegrityError) column c1, c2, ..., N are not unique
#
# sqlite since 3.7.16:
# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1
#
# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2
#
# postgres:
# 1 column - (IntegrityError) duplicate key value violates unique
# constraint "users_c1_key"
# N columns - (IntegrityError) duplicate key value violates unique
# constraint "name_of_our_constraint"
#
# mysql:
# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key
# 'c1'")
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
# with -' for key 'name_of_our_constraint'")
#
# ibm_db_sa:
# N columns - (IntegrityError) SQL0803N One or more values in the INSERT
# statement, UPDATE statement, or foreign key update caused by a
# DELETE statement are not valid because the primary key, unique
# constraint or unique index identified by "2" constrains table
# "NOVA.KEY_PAIRS" from having duplicate values for the index
# key.
_DUP_KEY_RE_DB = {
"sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
"postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),),
"ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),),
}
def _raise_if_duplicate_entry_error(integrity_error, engine_name):
"""Raise exception if two entries are duplicated.
In this function will be raised DBDuplicateEntry exception if integrity
error wrap unique constraint violation.
"""
def get_columns_from_uniq_cons_or_name(columns):
# note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2"
# where `t` it is table name and columns `c1`, `c2`
# are in UniqueConstraint.
uniqbase = "uniq_"
if not columns.startswith(uniqbase):
if engine_name == "postgresql":
return [columns[columns.index("_") + 1:columns.rindex("_")]]
return [columns]
return columns[len(uniqbase):].split("0")[1:]
if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]:
return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
for pattern in _DUP_KEY_RE_DB[engine_name]:
match = pattern.match(integrity_error.message)
if match:
break
else:
return
# NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the
# columns so we have to omit that from the DBDuplicateEntry error.
columns = ''
if engine_name != 'ibm_db_sa':
columns = match.group(1)
if engine_name == "sqlite":
columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
else:
columns = get_columns_from_uniq_cons_or_name(columns)
raise exception.DBDuplicateEntry(columns, integrity_error)
# NOTE(comstud): In current versions of DB backends, Deadlock violation
# messages follow the structure:
#
# mysql:
# (OperationalError) (1213, 'Deadlock found when trying to get lock; try '
# 'restarting transaction') <query_str> <query_args>
_DEADLOCK_RE_DB = {
"mysql": re.compile(r"^.*\(1213, 'Deadlock.*")
}
def _raise_if_deadlock_error(operational_error, engine_name):
"""Raise exception on deadlock condition.
Raise DBDeadlock exception if OperationalError contains a Deadlock
condition.
"""
re = _DEADLOCK_RE_DB.get(engine_name)
if re is None:
return
# FIXME(johannes): The usage of the .message attribute has been
# deprecated since Python 2.6. However, the exceptions raised by
# SQLAlchemy can differ when using unicode() and accessing .message.
# An audit across all three supported engines will be necessary to
# ensure there are no regressions.
m = re.match(operational_error.message)
if not m:
return
raise exception.DBDeadlock(operational_error)
def _wrap_db_error(f):
@functools.wraps(f)
def _wrap(self, *args, **kwargs):
try:
assert issubclass(
self.__class__, sqlalchemy.orm.session.Session
), ('_wrap_db_error() can only be applied to methods of '
'subclasses of sqlalchemy.orm.session.Session.')
return f(self, *args, **kwargs)
except UnicodeEncodeError:
raise exception.DBInvalidUnicodeParameter()
except sqla_exc.OperationalError as e:
_raise_if_db_connection_lost(e, self.bind)
_raise_if_deadlock_error(e, self.bind.dialect.name)
# NOTE(comstud): A lot of code is checking for OperationalError
# so let's not wrap it for now.
raise
# note(boris-42): We should catch unique constraint violation and
# wrap it by our own DBDuplicateEntry exception. Unique constraint
# violation is wrapped by IntegrityError.
except sqla_exc.IntegrityError as e:
# note(boris-42): SqlAlchemy doesn't unify errors from different
# DBs so we must do this. Also in some tables (for example
# instance_types) there are more than one unique constraint. This
# means we should get names of columns, which values violate
# unique constraint, from error message.
_raise_if_duplicate_entry_error(e, self.bind.dialect.name)
raise exception.DBError(e)
except Exception as e:
LOG.exception(_LE('DB exception wrapped.'))
raise exception.DBError(e)
return _wrap
def _synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode."""
dbapi_conn.execute("PRAGMA synchronous = OFF")
def _add_regexp_listener(dbapi_con, con_record):
"""Add REGEXP function to sqlite connections."""
def regexp(expr, item):
reg = re.compile(expr)
return reg.search(six.text_type(item)) is not None
dbapi_con.create_function('regexp', 2, regexp)
def _thread_yield(dbapi_con, con_record):
"""Ensure other greenthreads get a chance to be executed.
If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
execute instead of time.sleep(0).
Force a context switch. With common database backends (eg MySQLdb and
sqlite), there is no implicit yield caused by network I/O since they are
implemented by C libraries that eventlet cannot monkey patch.
"""
time.sleep(0)
def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL and DB2 connections are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
cursor = dbapi_conn.cursor()
try:
ping_sql = 'select 1'
if engine.name == 'ibm_db_sa':
# DB2 requires a table expression
ping_sql = 'select 1 from (values (1)) AS t1'
cursor.execute(ping_sql)
except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _LW('Database server has gone away: %s') % ex
LOG.warning(msg)
raise sqla_exc.DisconnectionError(msg)
else:
raise
def _set_session_sql_mode(dbapi_con, connection_rec,
connection_proxy, sql_mode=None):
"""Set the sql_mode session variable.
MySQL supports several server modes. The default is None, but sessions
may choose to enable server modes like TRADITIONAL, ANSI,
several STRICT_* modes and others.
Note: passing in '' (empty string) for sql_mode clears
the SQL mode for the session, overriding a potentially set
server default.
"""
cursor = dbapi_con.cursor()
cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
def _mysql_get_effective_sql_mode(engine):
"""Returns the effective SQL mode for connections from the engine pool.
Returns ``None`` if the mode isn't available, otherwise returns the mode.
"""
# Get the real effective SQL mode. Even when unset by
# our own config, the server may still be operating in a specific
# SQL mode as set by the server configuration.
# Also note that the checkout listener will be called on execute to
# set the mode if it's registered.
row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
if row is None:
return
return row[1]
def _mysql_check_effective_sql_mode(engine):
"""Logs a message based on the effective SQL mode for MySQL connections."""
realmode = _mysql_get_effective_sql_mode(engine)
if realmode is None:
LOG.warning(_LW('Unable to detect effective SQL mode'))
return
LOG.debug('MySQL server mode set to %s', realmode)
# 'TRADITIONAL' mode enables several other modes, so
# we need a substring match here
if not ('TRADITIONAL' in realmode.upper() or
'STRICT_ALL_TABLES' in realmode.upper()):
LOG.warning(_LW("MySQL SQL mode is '%s', "
"consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
realmode)
def _mysql_set_mode_callback(engine, sql_mode):
if sql_mode is not None:
mode_callback = functools.partial(_set_session_sql_mode,
sql_mode=sql_mode)
sqlalchemy.event.listen(engine, 'checkout', mode_callback)
_mysql_check_effective_sql_mode(engine)
def _is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others.
# For the db2, the error code is -30081 since the db2 is still not ready
conn_err_codes = ('2002', '2003', '2006', '2013', '-30081')
for err_code in conn_err_codes:
if args.find(err_code) != -1:
return True
return False
def _raise_if_db_connection_lost(error, engine):
# NOTE(vsergeyev): Function is_disconnect(e, connection, cursor)
# requires connection and cursor in incoming parameters,
# but we have no possibility to create connection if DB
# is not available, so in such case reconnect fails.
# But is_disconnect() ignores these parameters, so it
# makes sense to pass to function None as placeholder
# instead of connection and cursor.
if engine.dialect.is_disconnect(error, None, None):
raise exception.DBConnectionError(error)
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
connection_trace=False, max_retries=10, retry_interval=10):
"""Return a new SQLAlchemy engine."""
connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = {
"pool_recycle": idle_timeout,
'convert_unicode': True,
}
logger = logging.getLogger('sqlalchemy.engine')
# Map SQL debug level to Python log level
if connection_debug >= 100:
logger.setLevel(logging.DEBUG)
elif connection_debug >= 50:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.WARNING)
if "sqlite" in connection_dict.drivername:
if sqlite_fk:
engine_args["listeners"] = [SqliteForeignKeysListener()]
engine_args["poolclass"] = NullPool
if sql_connection == "sqlite://":
engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False}
else:
if max_pool_size is not None:
engine_args['pool_size'] = max_pool_size
if max_overflow is not None:
engine_args['max_overflow'] = max_overflow
if pool_timeout is not None:
engine_args['pool_timeout'] = pool_timeout
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if engine.name in ['mysql', 'ibm_db_sa']:
ping_callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', ping_callback)
if engine.name == 'mysql':
if mysql_sql_mode:
_mysql_set_mode_callback(engine, mysql_sql_mode)
elif 'sqlite' in connection_dict.drivername:
if not sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
_synchronous_switch_listener)
sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb':
_patch_mysqldb_with_stacktrace_comments()
try:
engine.connect()
except sqla_exc.OperationalError as e:
if not _is_db_connection_error(e.args[0]):
raise
remaining = max_retries
if remaining == -1:
remaining = 'infinite'
while True:
msg = _LW('SQL connection failed. %s attempts left.')
LOG.warning(msg % remaining)
if remaining != 'infinite':
remaining -= 1
time.sleep(retry_interval)
try:
engine.connect()
break
except sqla_exc.OperationalError as e:
if (remaining != 'infinite' and remaining == 0) or \
not _is_db_connection_error(e.args[0]):
raise
return engine
class Query(sqlalchemy.orm.query.Query):
"""Subclass of sqlalchemy.query with soft_delete() method."""
def soft_delete(self, synchronize_session='evaluate'):
return self.update({'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()},
synchronize_session=synchronize_session)
class Session(sqlalchemy.orm.session.Session):
"""Custom Session class to avoid SqlAlchemy Session monkey patching."""
@_wrap_db_error
def query(self, *args, **kwargs):
return super(Session, self).query(*args, **kwargs)
@_wrap_db_error
def flush(self, *args, **kwargs):
return super(Session, self).flush(*args, **kwargs)
@_wrap_db_error
def execute(self, *args, **kwargs):
return super(Session, self).execute(*args, **kwargs)
def get_maker(engine, autocommit=True, expire_on_commit=False):
"""Return a SQLAlchemy sessionmaker using the given engine."""
return sqlalchemy.orm.sessionmaker(bind=engine,
class_=Session,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
query_cls=Query)
def _patch_mysqldb_with_stacktrace_comments():
"""Adds current stack trace as a comment in queries.
Patches MySQLdb.cursors.BaseCursor._do_query.
"""
import MySQLdb.cursors
import traceback
old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query
def _do_query(self, q):
stack = ''
for filename, line, method, function in traceback.extract_stack():
# exclude various common things from trace
if filename.endswith('session.py') and method == '_do_query':
continue
if filename.endswith('api.py') and method == 'wrapper':
continue
if filename.endswith('utils.py') and method == '_inner':
continue
if filename.endswith('exception.py') and method == '_wrap':
continue
# db/api is just a wrapper around db/sqlalchemy/api
if filename.endswith('db/api.py'):
continue
# only trace inside kite
index = filename.rfind('kite')
if index == -1:
continue
stack += "File:%s:%s Method:%s() Line:%s | " \
% (filename[index:], line, method, function)
# strip trailing " | " from stack
if stack:
stack = stack[:-3]
qq = "%s /* %s */" % (q, stack)
else:
qq = q
old_mysql_do_query(self, qq)
setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
class EngineFacade(object):
"""A helper class for removing of global engine instances from kite.db.
As a library, kite.db can't decide where to store/when to create engine
and sessionmaker instances, so this must be left for a target application.
On the other hand, in order to simplify the adoption of kite.db changes,
we'll provide a helper class, which creates engine and sessionmaker
on its instantiation and provides get_engine()/get_session() methods
that are compatible with corresponding utility functions that currently
exist in target projects, e.g. in Nova.
engine/sessionmaker instances will still be global (and they are meant to
be global), but they will be stored in the app context, rather that in the
kite.db context.
Note: using of this helper is completely optional and you are encouraged to
integrate engine/sessionmaker instances into your apps any way you like
(e.g. one might want to bind a session to a request context). Two important
things to remember:
1. An Engine instance is effectively a pool of DB connections, so it's
meant to be shared (and it's thread-safe).
2. A Session instance is not meant to be shared and represents a DB
transactional context (i.e. it's not thread-safe). sessionmaker is
a factory of sessions.
"""
def __init__(self, sql_connection,
sqlite_fk=False, autocommit=True,
expire_on_commit=False, **kwargs):
"""Initialize engine and sessionmaker instances.
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
Keyword arguments:
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.
0=None, 100=Everything (defaults to 0)
:keyword max_pool_size: maximum number of SQL connections to keep open
in a pool (defaults to SQLAlchemy settings)
:keyword max_overflow: if set, use this value for max_overflow with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword pool_timeout: if set, use this value for pool_timeout with
sqlalchemy (defaults to SQLAlchemy settings)
:keyword sqlite_synchronous: if True, SQLite uses synchronous mode
(defaults to True)
:keyword connection_trace: add python stack traces to SQL as comment
strings (defaults to False)
:keyword max_retries: maximum db connection retries during startup.
(setting -1 implies an infinite retry count)
(defaults to 10)
:keyword retry_interval: interval between retries of opening a sql
connection (defaults to 10)
"""
super(EngineFacade, self).__init__()
self._engine = create_engine(
sql_connection=sql_connection,
sqlite_fk=sqlite_fk,
mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
idle_timeout=kwargs.get('idle_timeout', 3600),
connection_debug=kwargs.get('connection_debug', 0),
max_pool_size=kwargs.get('max_pool_size'),
max_overflow=kwargs.get('max_overflow'),
pool_timeout=kwargs.get('pool_timeout'),
sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
connection_trace=kwargs.get('connection_trace', False),
max_retries=kwargs.get('max_retries', 10),
retry_interval=kwargs.get('retry_interval', 10))
self._session_maker = get_maker(
engine=self._engine,
autocommit=autocommit,
expire_on_commit=expire_on_commit)
def get_engine(self):
"""Get the engine instance (note, that it's shared)."""
return self._engine
def get_session(self, **kwargs):
"""Get a Session instance.
If passed, keyword arguments values override the ones used when the
sessionmaker instance was created.
:keyword autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:keyword expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
for arg in kwargs:
if arg not in ('autocommit', 'expire_on_commit'):
del kwargs[arg]
return self._session_maker(**kwargs)
@classmethod
def from_config(cls, connection_string, conf,
sqlite_fk=False, autocommit=True, expire_on_commit=False):
"""Initialize EngineFacade using oslo.config config instance options.
:param connection_string: SQLAlchemy connection string
:type connection_string: string
:param conf: oslo.config config instance
:type conf: oslo.config.cfg.ConfigOpts
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
:param autocommit: use autocommit mode for created Session instances
:type autocommit: bool
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
"""
return cls(sql_connection=connection_string,
sqlite_fk=sqlite_fk,
autocommit=autocommit,
expire_on_commit=expire_on_commit,
**dict(conf.database.items()))

View File

@ -0,0 +1,153 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import functools
import os
import fixtures
import six
from kite.openstack.common.db.sqlalchemy import session
from kite.openstack.common.db.sqlalchemy import utils
from kite.openstack.common.fixture import lockutils
from kite.openstack.common import test
class DbFixture(fixtures.Fixture):
"""Basic database fixture.
Allows to run tests on various db backends, such as SQLite, MySQL and
PostgreSQL. By default use sqlite backend. To override default backend
uri set env variable OS_TEST_DBAPI_CONNECTION with database admin
credentials for specific backend.
"""
def _get_uri(self):
return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
def __init__(self, test):
super(DbFixture, self).__init__()
self.test = test
def setUp(self):
super(DbFixture, self).setUp()
self.test.engine = session.create_engine(self._get_uri())
self.test.sessionmaker = session.get_maker(self.test.engine)
self.addCleanup(self.test.engine.dispose)
class DbTestCase(test.BaseTestCase):
"""Base class for testing of DB code.
Using `DbFixture`. Intended to be the main database test case to use all
the tests on a given backend with user defined uri. Backend specific
tests should be decorated with `backend_specific` decorator.
"""
FIXTURE = DbFixture
def setUp(self):
super(DbTestCase, self).setUp()
self.useFixture(self.FIXTURE(self))
ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
def backend_specific(*dialects):
"""Decorator to skip backend specific tests on inappropriate engines.
::dialects: list of dialects names under which the test will be launched.
"""
def wrap(f):
@functools.wraps(f)
def ins_wrap(self):
if not set(dialects).issubset(ALLOWED_DIALECTS):
raise ValueError(
"Please use allowed dialects: %s" % ALLOWED_DIALECTS)
if self.engine.name not in dialects:
msg = ('The test "%s" can be run '
'only on %s. Current engine is %s.')
args = (f.__name__, ' '.join(dialects), self.engine.name)
self.skip(msg % args)
else:
return f(self)
return ins_wrap
return wrap
@six.add_metaclass(abc.ABCMeta)
class OpportunisticFixture(DbFixture):
"""Base fixture to use default CI databases.
The databases exist in OpenStack CI infrastructure. But for the
correct functioning in local environment the databases must be
created manually.
"""
DRIVER = abc.abstractproperty(lambda: None)
DBNAME = PASSWORD = USERNAME = 'openstack_citest'
def _get_uri(self):
return utils.get_connect_string(backend=self.DRIVER,
user=self.USERNAME,
passwd=self.PASSWORD,
database=self.DBNAME)
@six.add_metaclass(abc.ABCMeta)
class OpportunisticTestCase(DbTestCase):
"""Base test case to use default CI databases.
The subclasses of the test case are running only when openstack_citest
database is available otherwise a tests will be skipped.
"""
FIXTURE = abc.abstractproperty(lambda: None)
def setUp(self):
# TODO(bnemec): Remove this once infra is ready for
# https://review.openstack.org/#/c/74963/ to merge.
self.useFixture(lockutils.LockFixture('opportunistic-db'))
credentials = {
'backend': self.FIXTURE.DRIVER,
'user': self.FIXTURE.USERNAME,
'passwd': self.FIXTURE.PASSWORD,
'database': self.FIXTURE.DBNAME}
if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials):
msg = '%s backend is not available.' % self.FIXTURE.DRIVER
return self.skip(msg)
super(OpportunisticTestCase, self).setUp()
class MySQLOpportunisticFixture(OpportunisticFixture):
DRIVER = 'mysql'
class PostgreSQLOpportunisticFixture(OpportunisticFixture):
DRIVER = 'postgresql'
class MySQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = MySQLOpportunisticFixture
class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):
FIXTURE = PostgreSQLOpportunisticFixture

View File

@ -0,0 +1,269 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright 2012-2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
import os
import subprocess
import lockfile
from six import moves
from six.moves.urllib import parse
import sqlalchemy
import sqlalchemy.exc
from kite.openstack.common.db.sqlalchemy import utils
from kite.openstack.common.gettextutils import _LE
from kite.openstack.common import test
LOG = logging.getLogger(__name__)
def _have_mysql(user, passwd, database):
present = os.environ.get('TEST_MYSQL_PRESENT')
if present is None:
return utils.is_backend_avail(backend='mysql',
user=user,
passwd=passwd,
database=database)
return present.lower() in ('', 'true')
def _have_postgresql(user, passwd, database):
present = os.environ.get('TEST_POSTGRESQL_PRESENT')
if present is None:
return utils.is_backend_avail(backend='postgres',
user=user,
passwd=passwd,
database=database)
return present.lower() in ('', 'true')
def _set_db_lock(lock_path=None, lock_prefix=None):
def decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
try:
path = lock_path or os.environ.get("KITE_LOCK_PATH")
lock = lockfile.FileLock(os.path.join(path, lock_prefix))
with lock:
LOG.debug('Got lock "%s"' % f.__name__)
return f(*args, **kwargs)
finally:
LOG.debug('Lock released "%s"' % f.__name__)
return wrapper
return decorator
class BaseMigrationTestCase(test.BaseTestCase):
"""Base class fort testing of migration utils."""
def __init__(self, *args, **kwargs):
super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
'test_migrations.conf')
# Test machines can set the TEST_MIGRATIONS_CONF variable
# to override the location of the config file for migration testing
self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
self.DEFAULT_CONFIG_FILE)
self.test_databases = {}
self.migration_api = None
def setUp(self):
super(BaseMigrationTestCase, self).setUp()
# Load test databases from the config file. Only do this
# once. No need to re-run this on each test...
LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
if os.path.exists(self.CONFIG_FILE_PATH):
cp = moves.configparser.RawConfigParser()
try:
cp.read(self.CONFIG_FILE_PATH)
defaults = cp.defaults()
for key, value in defaults.items():
self.test_databases[key] = value
except moves.configparser.ParsingError as e:
self.fail("Failed to read test_migrations.conf config "
"file. Got error: %s" % e)
else:
self.fail("Failed to find test_migrations.conf config "
"file.")
self.engines = {}
for key, value in self.test_databases.items():
self.engines[key] = sqlalchemy.create_engine(value)
# We start each test case with a completely blank slate.
self._reset_databases()
def tearDown(self):
# We destroy the test data store between each test case,
# and recreate it, which ensures that we have no side-effects
# from the tests
self._reset_databases()
super(BaseMigrationTestCase, self).tearDown()
def execute_cmd(self, cmd=None):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = process.communicate()[0]
LOG.debug(output)
self.assertEqual(0, process.returncode,
"Failed to run: %s\n%s" % (cmd, output))
def _reset_pg(self, conn_pieces):
(user,
password,
database,
host) = utils.get_db_connection_info(conn_pieces)
os.environ['PGPASSWORD'] = password
os.environ['PGUSER'] = user
# note(boris-42): We must create and drop database, we can't
# drop database which we have connected to, so for such
# operations there is a special database template1.
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
" '%(sql)s' -d template1")
sql = ("drop database if exists %s;") % database
droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(droptable)
sql = ("create database %s;") % database
createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(createtable)
os.unsetenv('PGPASSWORD')
os.unsetenv('PGUSER')
@_set_db_lock(lock_prefix='migration_tests-')
def _reset_databases(self):
for key, engine in self.engines.items():
conn_string = self.test_databases[key]
conn_pieces = parse.urlparse(conn_string)
engine.dispose()
if conn_string.startswith('sqlite'):
# We can just delete the SQLite database, which is
# the easiest and cleanest solution
db_path = conn_pieces.path.strip('/')
if os.path.exists(db_path):
os.unlink(db_path)
# No need to recreate the SQLite DB. SQLite will
# create it for us if it's not there...
elif conn_string.startswith('mysql'):
# We can execute the MySQL client to destroy and re-create
# the MYSQL database, which is easier and less error-prone
# than using SQLAlchemy to do this via MetaData...trust me.
(user, password, database, host) = \
utils.get_db_connection_info(conn_pieces)
sql = ("drop database if exists %(db)s; "
"create database %(db)s;") % {'db': database}
cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
"-e \"%(sql)s\"") % {'user': user, 'password': password,
'host': host, 'sql': sql}
self.execute_cmd(cmd)
elif conn_string.startswith('postgresql'):
self._reset_pg(conn_pieces)
class WalkVersionsMixin(object):
def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
# Determine latest version script from the repo, then
# upgrade from 1 through to the latest, with no data
# in the databases. This just checks that the schema itself
# upgrades successfully.
# Place the database under version control
self.migration_api.version_control(engine, self.REPOSITORY,
self.INIT_VERSION)
self.assertEqual(self.INIT_VERSION,
self.migration_api.db_version(engine,
self.REPOSITORY))
LOG.debug('latest version is %s' % self.REPOSITORY.latest)
versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
for version in versions:
# upgrade -> downgrade -> upgrade
self._migrate_up(engine, version, with_data=True)
if snake_walk:
downgraded = self._migrate_down(
engine, version - 1, with_data=True)
if downgraded:
self._migrate_up(engine, version)
if downgrade:
# Now walk it back down to 0 from the latest, testing
# the downgrade paths.
for version in reversed(versions):
# downgrade -> upgrade -> downgrade
downgraded = self._migrate_down(engine, version - 1)
if snake_walk and downgraded:
self._migrate_up(engine, version)
self._migrate_down(engine, version - 1)
def _migrate_down(self, engine, version, with_data=False):
try:
self.migration_api.downgrade(engine, self.REPOSITORY, version)
except NotImplementedError:
# NOTE(sirp): some migrations, namely release-level
# migrations, don't support a downgrade.
return False
self.assertEqual(
version, self.migration_api.db_version(engine, self.REPOSITORY))
# NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target'
# version). So if we have any downgrade checks, they need to be run for
# the previous (higher numbered) migration.
if with_data:
post_downgrade = getattr(
self, "_post_downgrade_%03d" % (version + 1), None)
if post_downgrade:
post_downgrade(engine)
return True
def _migrate_up(self, engine, version, with_data=False):
"""migrate up to a new version of the db.
We allow for data insertion and post checks at every
migration version with special _pre_upgrade_### and
_check_### functions in the main test.
"""
# NOTE(sdague): try block is here because it's impossible to debug
# where a failed data migration happens otherwise
try:
if with_data:
data = None
pre_upgrade = getattr(
self, "_pre_upgrade_%03d" % version, None)
if pre_upgrade:
data = pre_upgrade(engine)
self.migration_api.upgrade(engine, self.REPOSITORY, version)
self.assertEqual(version,
self.migration_api.db_version(engine,
self.REPOSITORY))
if with_data:
check = getattr(self, "_check_%03d" % version, None)
if check:
check(engine, data)
except Exception:
LOG.error(_LE("Failed to migrate to version %s on engine %s") %
(version, engine))
raise

View File

@ -0,0 +1,647 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2010-2011 OpenStack Foundation.
# Copyright 2012 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import re
import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
from sqlalchemy.engine import reflection
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
from kite.openstack.common import context as request_context
from kite.openstack.common.db.sqlalchemy import models
from kite.openstack.common.gettextutils import _, _LI, _LW
from kite.openstack.common import timeutils
LOG = logging.getLogger(__name__)
_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+")
def sanitize_db_url(url):
match = _DBURL_REGEX.match(url)
if match:
return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):])
return url
class InvalidSortKey(Exception):
message = _("Sort key supplied was not valid.")
# copy from glance/db/sqlalchemy/api.py
def paginate_query(query, model, limit, sort_keys, marker=None,
sort_dir=None, sort_dirs=None):
"""Returns a query with sorting / pagination criteria added.
Pagination works by requiring a unique sort_key, specified by sort_keys.
(If sort_keys is not unique, then we risk looping through values.)
We use the last row in the previous page as the 'marker' for pagination.
So we must return values that follow the passed marker in the order.
With a single-valued sort_key, this would be easy: sort_key > X.
With a compound-values sort_key, (k1, k2, k3) we must do this to repeat
the lexicographical ordering:
(k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3)
We also have to cope with different sort_directions.
Typically, the id of the last row is used as the client-facing pagination
marker, then the actual marker object must be fetched from the db and
passed in to us as marker.
:param query: the query object to which we should add paging/sorting
:param model: the ORM model class
:param limit: maximum number of items to return
:param sort_keys: array of attributes by which results should be sorted
:param marker: the last item of the previous page; we returns the next
results after this value.
:param sort_dir: direction in which results should be sorted (asc, desc)
:param sort_dirs: per-column array of sort_dirs, corresponding to sort_keys
:rtype: sqlalchemy.orm.query.Query
:return: The query with sorting/pagination added.
"""
if 'id' not in sort_keys:
# TODO(justinsb): If this ever gives a false-positive, check
# the actual primary key, rather than assuming its id
LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))
assert(not (sort_dir and sort_dirs))
# Default the sort direction to ascending
if sort_dirs is None and sort_dir is None:
sort_dir = 'asc'
# Ensure a per-column sort direction
if sort_dirs is None:
sort_dirs = [sort_dir for _sort_key in sort_keys]
assert(len(sort_dirs) == len(sort_keys))
# Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
try:
sort_dir_func = {
'asc': sqlalchemy.asc,
'desc': sqlalchemy.desc,
}[current_sort_dir]
except KeyError:
raise ValueError(_("Unknown sort direction, "
"must be 'desc' or 'asc'"))
try:
sort_key_attr = getattr(model, current_sort_key)
except AttributeError:
raise InvalidSortKey()
query = query.order_by(sort_dir_func(sort_key_attr))
# Add pagination
if marker is not None:
marker_values = []
for sort_key in sort_keys:
v = getattr(marker, sort_key)
marker_values.append(v)
# Build up an array of sort criteria as in the docstring
criteria_list = []
for i in range(len(sort_keys)):
crit_attrs = []
for j in range(i):
model_attr = getattr(model, sort_keys[j])
crit_attrs.append((model_attr == marker_values[j]))
model_attr = getattr(model, sort_keys[i])
if sort_dirs[i] == 'desc':
crit_attrs.append((model_attr < marker_values[i]))
else:
crit_attrs.append((model_attr > marker_values[i]))
criteria = sqlalchemy.sql.and_(*crit_attrs)
criteria_list.append(criteria)
f = sqlalchemy.sql.or_(*criteria_list)
query = query.filter(f)
if limit is not None:
query = query.limit(limit)
return query
def _read_deleted_filter(query, db_model, read_deleted):
if 'deleted' not in db_model.__table__.columns:
raise ValueError(_("There is no `deleted` column in `%s` table. "
"Project doesn't use soft-deleted feature.")
% db_model.__name__)
default_deleted_value = db_model.__table__.c.deleted.default.arg
if read_deleted == 'no':
query = query.filter(db_model.deleted == default_deleted_value)
elif read_deleted == 'yes':
pass # omit the filter to include deleted and active
elif read_deleted == 'only':
query = query.filter(db_model.deleted != default_deleted_value)
else:
raise ValueError(_("Unrecognized read_deleted value '%s'")
% read_deleted)
return query
def _project_filter(query, db_model, context, project_only):
if project_only and 'project_id' not in db_model.__table__.columns:
raise ValueError(_("There is no `project_id` column in `%s` table.")
% db_model.__name__)
if request_context.is_user_context(context) and project_only:
if project_only == 'allow_none':
is_none = None
query = query.filter(or_(db_model.project_id == context.project_id,
db_model.project_id == is_none))
else:
query = query.filter(db_model.project_id == context.project_id)
return query
def model_query(context, model, session, args=None, project_only=False,
read_deleted=None):
"""Query helper that accounts for context's `read_deleted` field.
:param context: context to query under
:param model: Model to query. Must be a subclass of ModelBase.
:type model: models.ModelBase
:param session: The session to use.
:type session: sqlalchemy.orm.session.Session
:param args: Arguments to query. If None - model is used.
:type args: tuple
:param project_only: If present and context is user-type, then restrict
query to match the context's project_id. If set to
'allow_none', restriction includes project_id = None.
:type project_only: bool
:param read_deleted: If present, overrides context's read_deleted field.
:type read_deleted: bool
Usage:
..code:: python
result = (utils.model_query(context, models.Instance, session=session)
.filter_by(uuid=instance_uuid)
.all())
query = utils.model_query(
context, Node,
session=session,
args=(func.count(Node.id), func.sum(Node.ram))
).filter_by(project_id=project_id)
"""
if not read_deleted:
if hasattr(context, 'read_deleted'):
# NOTE(viktors): some projects use `read_deleted` attribute in
# their contexts instead of `show_deleted`.
read_deleted = context.read_deleted
else:
read_deleted = context.show_deleted
if not issubclass(model, models.ModelBase):
raise TypeError(_("model should be a subclass of ModelBase"))
query = session.query(model) if not args else session.query(*args)
query = _read_deleted_filter(query, model, read_deleted)
query = _project_filter(query, model, context, project_only)
return query
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
Needed because the models don't work for us in migrations
as models will be far out of sync with the current data.
"""
metadata = MetaData()
metadata.bind = engine
return Table(name, metadata, autoload=True)
class InsertFromSelect(UpdateBase):
"""Form the base for `INSERT INTO table (SELECT ... )` statement."""
def __init__(self, table, select):
self.table = table
self.select = select
@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
"""Form the `INSERT INTO table (SELECT ... )` statement."""
return "INSERT INTO %s %s" % (
compiler.process(element.table, asfrom=True),
compiler.process(element.select))
class ColumnError(Exception):
"""Error raised when no column or an invalid column is found."""
def _get_not_supported_column(col_name_col_instance, column_name):
try:
column = col_name_col_instance[column_name]
except KeyError:
msg = _("Please specify column %s in col_name_col_instance "
"param. It is required because column has unsupported "
"type by sqlite).")
raise ColumnError(msg % column_name)
if not isinstance(column, Column):
msg = _("col_name_col_instance param has wrong type of "
"column instance for column %s It should be instance "
"of sqlalchemy.Column.")
raise ColumnError(msg % column_name)
return column
def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
**col_name_col_instance):
"""Drop unique constraint from table.
DEPRECATED: this function is deprecated and will be removed from kite.db
in a few releases. Please use UniqueConstraint.drop() method directly for
sqlalchemy-migrate migration scripts.
This method drops UC from table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their
type with NullType in metadata. We process these columns and replace
NullType with the correct column type.
:param migrate_engine: sqlalchemy engine
:param table_name: name of table that contains uniq constraint.
:param uc_name: name of uniq constraint that will be dropped.
:param columns: columns that are in uniq constraint.
:param col_name_col_instance: contains pair column_name=column_instance.
column_instance is instance of Column. These params
are required only for columns that have unsupported
types by sqlite. For example BigInteger.
"""
from migrate.changeset import UniqueConstraint
meta = MetaData()
meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True)
if migrate_engine.name == "sqlite":
override_cols = [
_get_not_supported_column(col_name_col_instance, col.name)
for col in t.columns
if isinstance(col.type, NullType)
]
for col in override_cols:
t.columns.replace(col)
uc = UniqueConstraint(*columns, table=t, name=uc_name)
uc.drop()
def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
use_soft_delete, *uc_column_names):
"""Drop all old rows having the same values for columns in uc_columns.
This method drop (or mark ad `deleted` if use_soft_delete is True) old
duplicate rows form table with name `table_name`.
:param migrate_engine: Sqlalchemy engine
:param table_name: Table with duplicates
:param use_soft_delete: If True - values will be marked as `deleted`,
if False - values will be removed from table
:param uc_column_names: Unique constraint columns
"""
meta = MetaData()
meta.bind = migrate_engine
table = Table(table_name, meta, autoload=True)
columns_for_group_by = [table.c[name] for name in uc_column_names]
columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by)
duplicated_rows_select = select(columns_for_select,
group_by=columns_for_group_by,
having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select):
# NOTE(boris-42): Do not remove row that has the biggest ID.
delete_condition = table.c.id != row[0]
is_none = None # workaround for pyflakes
delete_condition &= table.c.deleted_at == is_none
for name in uc_column_names:
delete_condition &= table.c[name] == row[name]
rows_to_delete_select = select([table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
"%(table)s") % dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
where(delete_condition).\
values({
'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()
})
else:
delete_statement = table.delete().where(delete_condition)
migrate_engine.execute(delete_statement)
def _get_default_deleted_value(table):
if isinstance(table.c.id.type, Integer):
return 0
if isinstance(table.c.id.type, String):
return ""
raise ColumnError(_("Unsupported id columns type"))
def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
table = get_table(migrate_engine, table_name)
insp = reflection.Inspector.from_engine(migrate_engine)
real_indexes = insp.get_indexes(table_name)
existing_index_names = dict(
[(index['name'], index['column_names']) for index in real_indexes])
# NOTE(boris-42): Restore indexes on `deleted` column
for index in indexes:
if 'deleted' not in index['column_names']:
continue
name = index['name']
if name in existing_index_names:
column_names = [table.c[c] for c in existing_index_names[name]]
old_index = Index(name, *column_names, unique=index["unique"])
old_index.drop(migrate_engine)
column_names = [table.c[c] for c in index['column_names']]
new_index = Index(index["name"], *column_names, unique=index["unique"])
new_index.create(migrate_engine)
def change_deleted_column_type_to_boolean(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_boolean_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
old_deleted = Column('old_deleted', Boolean, default=False)
old_deleted.create(table, populate_default=False)
table.update().\
where(table.c.deleted == table.c.id).\
values(old_deleted=True).\
execute()
table.c.deleted.drop()
table.c.old_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
**col_name_col_instance):
insp = reflection.Inspector.from_engine(migrate_engine)
table = get_table(migrate_engine, table_name)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', Boolean, default=0)
columns.append(column_copy)
constraints = [constraint.copy() for constraint in table.constraints]
meta = table.metadata
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
c_select = []
for c in table.c:
if c.name != "deleted":
c_select.append(c)
else:
c_select.append(table.c.deleted == table.c.id)
ins = InsertFromSelect(new_table, select(c_select))
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
new_table.update().\
where(new_table.c.deleted == new_table.c.id).\
values(deleted=True).\
execute()
def change_deleted_column_type_to_id_type(migrate_engine, table_name,
**col_name_col_instance):
if migrate_engine.name == "sqlite":
return _change_deleted_column_type_to_id_type_sqlite(
migrate_engine, table_name, **col_name_col_instance)
insp = reflection.Inspector.from_engine(migrate_engine)
indexes = insp.get_indexes(table_name)
table = get_table(migrate_engine, table_name)
new_deleted = Column('new_deleted', table.c.id.type,
default=_get_default_deleted_value(table))
new_deleted.create(table, populate_default=True)
deleted = True # workaround for pyflakes
table.update().\
where(table.c.deleted == deleted).\
values(new_deleted=table.c.id).\
execute()
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
**col_name_col_instance):
# NOTE(boris-42): sqlaclhemy-migrate can't drop column with check
# constraints in sqlite DB and our `deleted` column has
# 2 check constraints. So there is only one way to remove
# these constraints:
# 1) Create new table with the same columns, constraints
# and indexes. (except deleted column).
# 2) Copy all data from old to new table.
# 3) Drop old table.
# 4) Rename new table to old table name.
insp = reflection.Inspector.from_engine(migrate_engine)
meta = MetaData(bind=migrate_engine)
table = Table(table_name, meta, autoload=True)
default_deleted_value = _get_default_deleted_value(table)
columns = []
for column in table.columns:
column_copy = None
if column.name != "deleted":
if isinstance(column.type, NullType):
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
column_copy = Column('deleted', table.c.id.type,
default=default_deleted_value)
columns.append(column_copy)
def is_deleted_column_constraint(constraint):
# NOTE(boris-42): There is no other way to check is CheckConstraint
# associated with deleted column.
if not isinstance(constraint, CheckConstraint):
return False
sqltext = str(constraint.sqltext)
return (sqltext.endswith("deleted in (0, 1)") or
sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))
constraints = []
for constraint in table.constraints:
if not is_deleted_column_constraint(constraint):
constraints.append(constraint.copy())
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
for index in insp.get_indexes(table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
ins = InsertFromSelect(new_table, table.select())
migrate_engine.execute(ins)
table.drop()
[index.create(migrate_engine) for index in indexes]
new_table.rename(table_name)
deleted = True # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=new_table.c.id).\
execute()
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
new_table.update().\
where(new_table.c.deleted == deleted).\
values(deleted=default_deleted_value).\
execute()
def get_connect_string(backend, database, user=None, passwd=None):
"""Get database connection
Try to get a connection with a very specific set of values, if we get
these then we'll run the tests, otherwise they are skipped
"""
args = {'backend': backend,
'user': user,
'passwd': passwd,
'database': database}
if backend == 'sqlite':
template = '%(backend)s:///%(database)s'
else:
template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
return template % args
def is_backend_avail(backend, database, user=None, passwd=None):
try:
connect_uri = get_connect_string(backend=backend,
database=database,
user=user,
passwd=passwd)
engine = sqlalchemy.create_engine(connect_uri)
connection = engine.connect()
except Exception:
# intentionally catch all to handle exceptions even if we don't
# have any backend code loaded.
return False
else:
connection.close()
engine.dispose()
return True
def get_db_connection_info(conn_pieces):
database = conn_pieces.path.strip('/')
loc_pieces = conn_pieces.netloc.split('@')
host = loc_pieces[1]
auth_pieces = loc_pieces[0].split(':')
user = auth_pieces[0]
password = ""
if len(auth_pieces) > 1:
password = auth_pieces[1].strip()
return (user, password, database, host)

View File

@ -0,0 +1,99 @@
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##############################################################################
##############################################################################
##
## DO NOT MODIFY THIS FILE
##
## This file is being graduated to the kitetest library. Please make all
## changes there, and only backport critical fixes here. - dhellmann
##
##############################################################################
##############################################################################
"""Common utilities used in testing"""
import logging
import os
import tempfile
import fixtures
import testtools
_TRUE_VALUES = ('True', 'true', '1', 'yes')
_LOG_FORMAT = "%(levelname)8s [%(name)s] %(message)s"
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self._set_timeout()
self._fake_output()
self._fake_logs()
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
self.tempdirs = []
def _set_timeout(self):
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
def _fake_output(self):
if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
def _fake_logs(self):
if os.environ.get('OS_DEBUG') in _TRUE_VALUES:
level = logging.DEBUG
else:
level = logging.INFO
capture_logs = os.environ.get('OS_LOG_CAPTURE') in _TRUE_VALUES
if capture_logs:
self.useFixture(
fixtures.FakeLogger(
format=_LOG_FORMAT,
level=level,
nuke_handlers=capture_logs,
)
)
else:
logging.basicConfig(format=_LOG_FORMAT, level=level)
def create_tempfiles(self, files, ext='.conf'):
tempfiles = []
for (basename, contents) in files:
if not os.path.isabs(basename):
(fd, path) = tempfile.mkstemp(prefix=basename, suffix=ext)
else:
path = basename + ext
fd = os.open(path, os.O_CREAT | os.O_WRONLY)
tempfiles.append(path)
try:
os.write(fd, contents)
finally:
os.close(fd)
return tempfiles

View File

@ -0,0 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kite.openstack.common import gettextutils
gettextutils.install('kite', lazy=True)

View File

@ -22,4 +22,8 @@ class BaseTestCase(base.BaseTestCase):
super(BaseTestCase, self).setUp()
self.config_fixture = self.useFixture(config.Config())
self.CONF = self.config_fixture.conf
service.parse_args(args=[])
def config(self, *args, **kwargs):
self.config_fixture.config(*args, **kwargs)

View File

44
kite/tests/db/base.py Normal file
View File

@ -0,0 +1,44 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from kite.db.kvs import api as kvs_api
from kite.db.sqlalchemy import api as sql_api
from kite.db.sqlalchemy import migration
from kite.tests import base
connection = os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
# TODO(jamielennox): there is some support in OSLO for migration tests and
# opportunistic tests. We should do more extensive real testing.
class BaseTestCase(base.BaseTestCase):
scenarios = [('sqlitedb', {'connection': connection,
'backend': sql_api}),
('kvsdb', {'connection': None,
'backend': kvs_api})
]
def setUp(self):
super(BaseTestCase, self).setUp()
self.backend.reset()
if self.connection:
self.config_fixture.config(connection=self.connection,
group='database')
migration.upgrade()
self.addCleanup(migration.downgrade)
self.DB = self.backend.get_backend()

View File

@ -0,0 +1,136 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
from kite.common import exception
from kite.tests.db import base
TEST_NAME = 'test-name'
TEST_SIG = 'test-sig'
TEST_KEY = 'test-enc'
class KeyDbTestCase(base.BaseTestCase):
def test_retrieve(self):
# Set a key and expect to get the same key back.
generation = self.DB.set_key(name=TEST_NAME,
signature=TEST_SIG,
key=TEST_KEY,
group=False)
key = self.DB.get_key(TEST_NAME)
self.assertEqual(key['name'], TEST_NAME)
self.assertEqual(key['key'], TEST_KEY)
self.assertEqual(key['signature'], TEST_SIG)
self.assertEqual(key['generation'], generation)
self.assertIs(key['group'], False)
self.assertIsNone(key['expiration'])
def test_no_key(self):
# return None if a key is not in the database
self.assertIsNone(self.DB.get_key(TEST_NAME))
def test_generations(self):
another_key = 'another-key'
# set a key and make sure that the generation is set and returned
gen1 = self.DB.set_key(name=TEST_NAME,
signature=TEST_SIG,
key=TEST_KEY,
group=False)
key1 = self.DB.get_key(TEST_NAME)
self.assertEqual(key1['key'], TEST_KEY)
self.assertEqual(key1['generation'], gen1)
# set a new key for the same name and make sure that the generation is
# updated
gen2 = self.DB.set_key(name=TEST_NAME,
signature='another-sig',
key=another_key,
group=False)
key2 = self.DB.get_key(TEST_NAME)
self.assertEqual(key2['generation'], gen2)
self.assertEqual(key2['key'], another_key)
# Check that if we ask specifically for the first key we get it back
key3 = self.DB.get_key(TEST_NAME, gen1)
self.assertEqual(key3['key'], TEST_KEY)
self.assertEqual(key3['generation'], gen1)
def test_no_group_filter(self):
# install a non group key
generation = self.DB.set_key(name=TEST_NAME,
signature=TEST_SIG,
key=TEST_KEY,
group=False)
# test that if i can retrieve and specify a non-group key
key1 = self.DB.get_key(TEST_NAME)
self.assertEqual(key1['key'], TEST_KEY)
self.assertEqual(key1['generation'], generation)
key2 = self.DB.get_key(TEST_NAME, group=False)
self.assertEqual(key2['key'], TEST_KEY)
self.assertEqual(key2['generation'], generation)
# if i ask for a group key of that name then it should fail
key3 = self.DB.get_key(TEST_NAME, group=True)
self.assertIsNone(key3)
def test_with_group_filter(self):
# install a group key
generation = self.DB.set_key(name=TEST_NAME,
signature=TEST_SIG,
key=TEST_KEY,
group=True)
# i should be able to ask for and retrieve a group key
key1 = self.DB.get_key(TEST_NAME)
self.assertEqual(key1['key'], TEST_KEY)
self.assertEqual(key1['generation'], generation)
key2 = self.DB.get_key(TEST_NAME, group=True)
self.assertEqual(key2['key'], TEST_KEY)
self.assertEqual(key2['generation'], generation)
# if i ask for that key but not a group key it will fail
key3 = self.DB.get_key(TEST_NAME, group=False)
self.assertIsNone(key3)
def test_cant_change_group_status(self):
group_key_name = 'name1'
host_key_name = 'name2'
# install a host and group key
self.DB.set_key(name=group_key_name,
signature=TEST_SIG,
key=TEST_KEY,
group=True)
self.DB.set_key(name=host_key_name,
signature=TEST_SIG,
key=TEST_KEY,
group=False)
# should not be able to change a group key to a host key
self.assertRaises(exception.IntegrityError, self.DB.set_key,
name=group_key_name, signature='xxx', key='xxx',
group=False)
# should not be able to change a host key to a group key
self.assertRaises(exception.IntegrityError, self.DB.set_key,
name=host_key_name, signature='xxx', key='xxx',
group=True)

View File

@ -1,6 +1,9 @@
[DEFAULT]
# The list of modules to copy from oslo-incubator.git
module=config
module=db
module=db.sqlalchemy
module=fixture
module=jsonutils
module=timeutils

View File

@ -1,3 +1,4 @@
alembic>=0.4.1
Babel>=1.3
iso8601>=0.1.9
oslo.config>=1.2.0

View File

@ -48,3 +48,4 @@ output_file = kite/locale/kite.pot
[entry_points]
console_scripts =
kite-api = kite.cli.api:main
kite-manage = kite.cli.manage:main

View File

@ -2,6 +2,8 @@ coverage>=3.6
discover
fixtures>=0.3.14
hacking>=0.8.0,<0.9
lockfile>=0.8
posix_ipc
python-subunit>=0.0.18
sphinx>=1.1.2,<1.2
oslo.sphinx

View File

25
tools/config/check_uptodate.sh Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
PROJECT_NAME=${PROJECT_NAME:-kite}
CFGFILE_NAME=${PROJECT_NAME}.conf.sample
if [ -e etc/${PROJECT_NAME}/${CFGFILE_NAME} ]; then
CFGFILE=etc/${PROJECT_NAME}/${CFGFILE_NAME}
elif [ -e etc/${CFGFILE_NAME} ]; then
CFGFILE=etc/${CFGFILE_NAME}
else
echo "${0##*/}: can not find config file"
exit 1
fi
TEMPDIR=`mktemp -d /tmp/${PROJECT_NAME}.XXXXXX`
trap "rm -rf $TEMPDIR" EXIT
tools/config/generate_sample.sh -b ./ -p ${PROJECT_NAME} -o ${TEMPDIR}
if ! diff -u ${TEMPDIR}/${CFGFILE_NAME} ${CFGFILE}
then
echo "${0##*/}: ${PROJECT_NAME}.conf.sample is not up to date."
echo "${0##*/}: Please run ${0%%${0##*/}}generate_sample.sh."
exit 1
fi

119
tools/config/generate_sample.sh Executable file
View File

@ -0,0 +1,119 @@
#!/usr/bin/env bash
print_hint() {
echo "Try \`${0##*/} --help' for more information." >&2
}
PARSED_OPTIONS=$(getopt -n "${0##*/}" -o hb:p:m:l:o: \
--long help,base-dir:,package-name:,output-dir:,module:,library: -- "$@")
if [ $? != 0 ] ; then print_hint ; exit 1 ; fi
eval set -- "$PARSED_OPTIONS"
while true; do
case "$1" in
-h|--help)
echo "${0##*/} [options]"
echo ""
echo "options:"
echo "-h, --help show brief help"
echo "-b, --base-dir=DIR project base directory"
echo "-p, --package-name=NAME project package name"
echo "-o, --output-dir=DIR file output directory"
echo "-m, --module=MOD extra python module to interrogate for options"
echo "-l, --library=LIB extra library that registers options for discovery"
exit 0
;;
-b|--base-dir)
shift
BASEDIR=`echo $1 | sed -e 's/\/*$//g'`
shift
;;
-p|--package-name)
shift
PACKAGENAME=`echo $1`
shift
;;
-o|--output-dir)
shift
OUTPUTDIR=`echo $1 | sed -e 's/\/*$//g'`
shift
;;
-m|--module)
shift
MODULES="$MODULES -m $1"
shift
;;
-l|--library)
shift
LIBRARIES="$LIBRARIES -l $1"
shift
;;
--)
break
;;
esac
done
BASEDIR=${BASEDIR:-`pwd`}
if ! [ -d $BASEDIR ]
then
echo "${0##*/}: missing project base directory" >&2 ; print_hint ; exit 1
elif [[ $BASEDIR != /* ]]
then
BASEDIR=$(cd "$BASEDIR" && pwd)
fi
PACKAGENAME=${PACKAGENAME:-$(python setup.py --name)}
TARGETDIR=$BASEDIR/$PACKAGENAME
if ! [ -d $TARGETDIR ]
then
echo "${0##*/}: invalid project package name" >&2 ; print_hint ; exit 1
fi
OUTPUTDIR=${OUTPUTDIR:-$BASEDIR/etc}
# NOTE(bnemec): Some projects put their sample config in etc/,
# some in etc/$PACKAGENAME/
if [ -d $OUTPUTDIR/$PACKAGENAME ]
then
OUTPUTDIR=$OUTPUTDIR/$PACKAGENAME
elif ! [ -d $OUTPUTDIR ]
then
echo "${0##*/}: cannot access \`$OUTPUTDIR': No such file or directory" >&2
exit 1
fi
BASEDIRESC=`echo $BASEDIR | sed -e 's/\//\\\\\//g'`
find $TARGETDIR -type f -name "*.pyc" -delete
FILES=$(find $TARGETDIR -type f -name "*.py" ! -path "*/tests/*" \
-exec grep -l "Opt(" {} + | sed -e "s/^$BASEDIRESC\///g" | sort -u)
RC_FILE="`dirname $0`/oslo.config.generator.rc"
if test -r "$RC_FILE"
then
source "$RC_FILE"
fi
for mod in ${KITE_CONFIG_GENERATOR_EXTRA_MODULES}; do
MODULES="$MODULES -m $mod"
done
for lib in ${KITE_CONFIG_GENERATOR_EXTRA_LIBRARIES}; do
LIBRARIES="$LIBRARIES -l $lib"
done
export EVENTLET_NO_GREENDNS=yes
OS_VARS=$(set | sed -n '/^OS_/s/=[^=]*$//gp' | xargs)
[ "$OS_VARS" ] && eval "unset \$OS_VARS"
DEFAULT_MODULEPATH=kite.openstack.common.config.generator
MODULEPATH=${MODULEPATH:-$DEFAULT_MODULEPATH}
OUTPUTFILE=$OUTPUTDIR/$PACKAGENAME.conf.sample
python -m $MODULEPATH $MODULES $LIBRARIES $FILES > $OUTPUTFILE
# Hook to allow projects to append custom config file snippets
CONCAT_FILES=$(ls $BASEDIR/tools/config/*.conf.sample 2>/dev/null)
for CONCAT_FILE in $CONCAT_FILES; do
cat $CONCAT_FILE >> $OUTPUTFILE
done