Attempt to make backends more driver based like in Ceilometer

This commit is contained in:
Endre Karlson 2012-10-28 12:24:43 +01:00
parent 998a3ef376
commit d1a5e56a50
23 changed files with 568 additions and 247 deletions

View File

@ -16,7 +16,7 @@
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
from moniker.openstack.common.rpc import service as rpc_service
from moniker import database
from moniker import storage
from moniker import utils
from moniker import policy
from moniker.agent import api as agent_api
@ -36,44 +36,43 @@ class Service(rpc_service.Service):
super(Service, self).__init__(*args, **kwargs)
self.init_database()
def init_database(self):
self.database = database.get_driver()
engine = storage.get_engine(cfg.CONF)
self.storage_driver = engine
self.storage_conn = engine.get_connection(cfg.CONF)
# Server Methods
def create_server(self, context, values):
server = self.database.create_server(context, values)
server = self.storage_conn.create_server(context, values)
utils.notify(context, 'api', 'server.create', server)
return server
def get_servers(self, context):
return self.database.get_servers(context)
return self.storage_conn.get_servers(context)
def get_server(self, context, server_id):
return self.database.get_server(context, server_id)
return self.storage_conn.get_server(context, server_id)
def update_server(self, context, server_id, values):
server = self.database.update_server(context, server_id, values)
server = self.storage_conn.update_server(context, server_id, values)
utils.notify(context, 'api', 'server.update', server)
return server
def delete_server(self, context, server_id):
server = self.database.get_server(context, server_id)
server = self.storage_conn.get_server(context, server_id)
utils.notify(context, 'api', 'server.delete', server)
return self.database.delete_server(context, server_id)
return self.storage_conn.delete_server(context, server_id)
# Domain Methods
def create_domain(self, context, values):
values['tenant_id'] = context.tenant
domain = self.database.create_domain(context, values)
domain = self.storage_conn.create_domain(context, values)
agent_api.create_domain(context, domain)
utils.notify(context, 'api', 'domain.create', domain)
@ -81,13 +80,13 @@ class Service(rpc_service.Service):
return domain
def get_domains(self, context):
return self.database.get_domains(context)
return self.storage_conn.get_domains(context)
def get_domain(self, context, domain_id):
return self.database.get_domain(context, domain_id)
return self.storage_conn.get_domain(context, domain_id)
def update_domain(self, context, domain_id, values):
domain = self.database.update_domain(context, domain_id, values)
domain = self.storage_conn.update_domain(context, domain_id, values)
agent_api.update_domain(context, domain)
utils.notify(context, 'api', 'domain.update', domain)
@ -95,18 +94,18 @@ class Service(rpc_service.Service):
return domain
def delete_domain(self, context, domain_id):
domain = self.database.get_domain(context, domain_id)
domain = self.storage_conn.get_domain(context, domain_id)
agent_api.delete_domain(context, domain)
utils.notify(context, 'api', 'domain.delete', domain)
return self.database.delete_domain(context, domain_id)
return self.storage_conn.delete_domain(context, domain_id)
# Record Methods
def create_record(self, context, domain_id, values):
record = self.database.create_record(context, domain_id, values)
record = self.storage_conn.create_record(context, domain_id, values)
domain = self.database.get_domain(context, domain_id)
domain = self.storage_conn.get_domain(context, domain_id)
agent_api.create_record(context, domain, record)
utils.notify(context, 'api', 'record.create', record)
@ -114,15 +113,15 @@ class Service(rpc_service.Service):
return record
def get_records(self, context, domain_id):
return self.database.get_records(context, domain_id)
return self.storage_conn.get_records(context, domain_id)
def get_record(self, context, domain_id, record_id):
return self.database.get_record(context, record_id)
return self.storage_conn.get_record(context, record_id)
def update_record(self, context, domain_id, record_id, values):
record = self.database.update_record(context, record_id, values)
record = self.storage_conn.update_record(context, record_id, values)
domain = self.database.get_domain(context, domain_id)
domain = self.storage_conn.get_domain(context, domain_id)
agent_api.update_record(context, domain, record)
utils.notify(context, 'api', 'record.update', record)
@ -130,11 +129,11 @@ class Service(rpc_service.Service):
return record
def delete_record(self, context, domain_id, record_id):
record = self.database.get_record(context, record_id)
record = self.storage_conn.get_record(context, record_id)
domain = self.database.get_domain(context, domain_id)
domain = self.storage_conn.get_domain(context, domain_id)
agent_api.delete_record(context, domain, record)
utils.notify(context, 'api', 'record.delete', record)
return self.database.delete_record(context, record_id)
return self.storage_conn.delete_record(context, record_id)

View File

@ -1,83 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.openstack.common import cfg
cfg.CONF.register_opts([
cfg.StrOpt('database-driver', default='sqlalchemy',
help='The database driver to use'),
cfg.StrOpt('database-connection', default='sqlite:///test.sqlite',
help='The database connection string'),
])
class BaseDatabase(object):
def create_server(self, context, values):
raise NotImplementedError()
def get_servers(self, context):
raise NotImplementedError()
def get_server(self, context, server_id):
raise NotImplementedError()
def update_server(self, context, server_id, values):
raise NotImplementedError()
def delete_server(self, context, server_id):
raise NotImplementedError()
def create_domain(self, context, values):
raise NotImplementedError()
def get_domains(self, context):
raise NotImplementedError()
def get_domain(self, context, domain_id):
raise NotImplementedError()
def update_domain(self, context, domain_id, values):
raise NotImplementedError()
def delete_domain(self, context, domain_id):
raise NotImplementedError()
def create_record(self, context, domain_id, values):
raise NotImplementedError()
def get_records(self, context, domain_id):
raise NotImplementedError()
def get_record(self, context, record_id):
raise NotImplementedError()
def update_record(self, context, record_id, values):
raise NotImplementedError()
def delete_record(self, context, record_id):
raise NotImplementedError()
def get_driver(*args, **kwargs):
# TODO: Switch to the config var + entry point loading
from moniker.database.sqlalchemy import Sqlalchemy
return Sqlalchemy(*args, **kwargs)
def reinitialize(*args, **kwargs):
""" Reset the DB to default - Used for testing purposes """
from moniker.database.sqlalchemy.session import reset_session
reset_session(*args, **kwargs)

View File

@ -1,61 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
_ENGINE = None
_SESSION = None
def get_session():
global _ENGINE, _SESSION
if _ENGINE is None:
_ENGINE = get_engine()
if _SESSION is None:
Session = sessionmaker(bind=_ENGINE, autocommit=True,
expire_on_commit=False)
_SESSION = scoped_session(Session)
return _SESSION
def get_engine():
url = cfg.CONF.database_connection
engine_args = {
'echo': False,
'convert_unicode': True,
}
if cfg.CONF.verbose or cfg.CONF.debug:
engine_args['echo'] = True
engine = create_engine(url, **engine_args)
engine.connect()
return engine
def reset_session():
global _ENGINE, _SESSION
_ENGINE = None
_SESSION = None

View File

@ -0,0 +1,60 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from urlparse import urlparse
from stevedore import driver
from moniker.openstack.common import cfg
from moniker.openstack.common import log
LOG = log.getLogger(__name__)
DRIVER_NAMESPACE = 'moniker.storage'
cfg.CONF.register_opts([
cfg.StrOpt(
'database_connection',
default='sqlite:///moniker.db',
help='The database driver to use')
])
def register_opts(conf):
engine = get_engine(conf)
engine.register_opts(conf)
def get_engine(conf):
engine_name = urlparse(conf.database_connection).scheme
LOG.debug('looking for %r engine in %r', engine_name, DRIVER_NAMESPACE)
mgr = driver.DriverManager(
DRIVER_NAMESPACE,
engine_name,
invoke_on_load=True)
return mgr.driver
def get_connection(conf):
engine = get_engine(conf)
engine.register_opts(conf)
return engine.get_connection(conf)
def reset_data(*args, **kwargs):
""" Reset the DB to default - Used for testing purposes """
from moniker.storage.sqla.models import Base
conn = get_connection(cfg.CONF)
Base.metadata.drop_all(conn.session.bind)

169
moniker/storage/base.py Normal file
View File

@ -0,0 +1,169 @@
import abc
class StorageEngine(object):
"""
Base class for storage engines
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def register_opts(self, conf):
"""
Register any configuration options used by this engine.
"""
@abc.abstractmethod
def get_connection(self, conf):
"""
Return a Connection instance based on the configuration settings.
"""
class Connection(object):
"""
A Connection
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self, conf):
"""
Constructor...
"""
@abc.abstractmethod
def create_server(self, context):
"""
Create a Server.
:param context: RPC Context.
"""
@abc.abstractmethod
def get_servers(self, context):
"""
Get Servers.
:param context: RPC Context.
"""
@abc.abstractmethod
def get_server(self, context, server_id):
"""
Get a Server via ID.
:param context: RPC Context.
:param server_id: Server ID to get.
"""
@abc.abstractmethod
def update_server(self, context, server_id, values):
"""
Update a Server via ID
:param context: RPC Context.
:param server_id: Server ID to update.
:param values: Values to update the Server from
"""
@abc.abstractmethod
def delete_server(self, context, server_id):
"""
Delete a Server via ID.
:param context: RPC Context.
:param server_id: Delete a Server via ID
"""
@abc.abstractmethod
def create_domain(self, context, values):
"""
Create a new Domain.
:param context: RPC Context.
:param values: Values to create the new Domain from.
"""
@abc.abstractmethod
def get_domains(self, context):
"""
Get all Domains.
:param context: RPC Context.
"""
@abc.abstractmethod
def get_domain(self, context, domain_id):
"""
Get a Domain via its ID.
:param context: RPC Context.
:param domain_id: ID of the Domain.
"""
@abc.abstractmethod
def update_domain(self, context, domain_id, values):
"""
Update a Domain via ID.
:param context: RPC Context.
:param domain_id: Values to update the Domain with
:param values: Values to update the Domain from.
"""
@abc.abstractmethod
def delete_domain(self, context, domain_id):
"""
Delete a Domain
:param context: RPC Context.
:param domain_id: Domain ID to delete.
"""
@abc.abstractmethod
def create_record(self, context, domain_id, values):
"""
Create a record on a given Domain ID
:param context: RPC Context.
:param domain_id: Domain ID to create the record in.
:param values: Values to create the new Record from.
"""
@abc.abstractmethod
def get_records(self, context, domain_id):
"""
Get a list of records via a Domain's ID
:param context: RPC Context.
:param domain_id: Domain ID where the records recide.
"""
@abc.abstractmethod
def get_record(self, context, record_id):
"""
Get a record via ID
:param context: RPC Context.
:param record_id: Record ID to get
"""
@abc.abstractmethod
def update_record(self, context, record_id, values):
"""
Update a record via ID
:param context: RPC Context
:param record_id: Record ID to update
"""
@abc.abstractmethod
def delete_record(self, context, record_id):
"""
Delete a record
:param context: RPC Context
:param record_id: Record ID to delete
"""

View File

@ -13,23 +13,45 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import exc
from moniker.openstack.common import log as logging
from moniker import exceptions
from moniker.database import BaseDatabase
from moniker.database.sqlalchemy import models
from moniker.database.sqlalchemy.session import get_session
from moniker.storage import base
from moniker.storage.sqla import models
from moniker.storage.sqla.session import get_session
LOG = logging.getLogger(__name__)
class Sqlalchemy(BaseDatabase):
def __init__(self):
self.session = get_session()
self._initialize_database() # HACK: Remove me
class SQLAlchemyStorage(base.StorageEngine):
OPTIONS = []
def _initialize_database(self):
def register_opts(self, conf):
conf.register_opts(self.OPTIONS)
def get_connection(self, conf):
return Connection(conf)
class Connection(base.Connection):
"""
SQLAlchemy connection
"""
def __init__(self, conf):
LOG.info('connecting to %s', conf.database_connection)
self.session = self._get_connection(conf)
# NOTE: Need to fix this properly...
self.register_models()
def _get_connection(self, conf):
"""
Return a connection to the database.
"""
return get_session()
def register_models(self):
""" Semi-Private Method to create the database schema """
models.Base.metadata.create_all(self.session.bind)
@ -40,7 +62,7 @@ class Sqlalchemy(BaseDatabase):
server.update(values)
try:
server.save()
server.save(self.session)
except exceptions.Duplicate:
raise exceptions.DuplicateServer()
@ -51,7 +73,7 @@ class Sqlalchemy(BaseDatabase):
try:
result = query.all()
except NoResultFound:
except exc.NoResultFound:
LOG.debug('No results found')
return []
else:
@ -62,7 +84,7 @@ class Sqlalchemy(BaseDatabase):
try:
server = query.filter(models.Server.id == server_id).one()
except NoResultFound:
except exc.NoResultFound:
raise exceptions.ServerNotFound(server_id)
else:
return server
@ -78,7 +100,7 @@ class Sqlalchemy(BaseDatabase):
server.update(values)
try:
server.save()
server.save(self.session)
except exceptions.Duplicate:
raise exceptions.DuplicateServer()
@ -87,7 +109,7 @@ class Sqlalchemy(BaseDatabase):
def delete_server(self, context, server_id):
server = self._get_server(context, server_id)
server.delete()
server.delete(self.session)
# Domain Methods
def create_domain(self, context, values):
@ -96,7 +118,7 @@ class Sqlalchemy(BaseDatabase):
domain.update(values)
try:
domain.save()
domain.save(self.session)
except exceptions.Duplicate:
raise exceptions.DuplicateDomain()
@ -107,7 +129,7 @@ class Sqlalchemy(BaseDatabase):
try:
result = query.all()
except NoResultFound:
except exc.NoResultFound:
LOG.debug('No results found')
return []
else:
@ -118,7 +140,7 @@ class Sqlalchemy(BaseDatabase):
try:
domain = query.filter(models.Domain.id == domain_id).one()
except NoResultFound:
except exc.NoResultFound:
raise exceptions.DomainNotFound(domain_id)
else:
return domain
@ -134,7 +156,7 @@ class Sqlalchemy(BaseDatabase):
domain.update(values)
try:
domain.save()
domain.save(self.session)
except exceptions.Duplicate:
raise exceptions.DuplicateDomain()
@ -143,7 +165,7 @@ class Sqlalchemy(BaseDatabase):
def delete_domain(self, context, domain_id):
domain = self._get_domain(context, domain_id)
domain.delete()
domain.delete(self.session)
# Record Methods
def create_record(self, context, domain_id, values):
@ -154,7 +176,7 @@ class Sqlalchemy(BaseDatabase):
domain.records.append(record)
domain.save()
domain.save(self.session)
return dict(record)
@ -168,7 +190,7 @@ class Sqlalchemy(BaseDatabase):
try:
record = query.filter(models.Record.id == record_id).one()
except NoResultFound:
except exc.NoResultFound:
raise exceptions.RecordNotFound(record_id)
else:
return record
@ -183,11 +205,11 @@ class Sqlalchemy(BaseDatabase):
record.update(values)
record.save()
record.save(self.session)
return dict(record)
def delete_record(self, context, record_id):
record = self._get_record(context, record_id)
record.delete()
record.delete(self.session)

View File

@ -22,10 +22,11 @@ Various conveniences used for migration scripts
"""
import sqlalchemy.types
from sqlalchemy import (DateTime, Boolean, String, Text, Integer, Enum)
from moniker.openstack.common import log as logging
from moniker.database.sqlalchemy.types import UUID, Inet
logger = logging.getLogger('moniker.database.migrate_repo.schema')
from moniker.openstack.common import log as logging
from moniker.storage.sqla.types import UUID, Inet
logger = logging.getLogger('moniker.storage.sqla.migrate_repo.schema')
String = lambda length: sqlalchemy.types.String(
@ -50,10 +51,10 @@ DateTime = lambda: sqlalchemy.types.DateTime(timezone=False)
Integer = lambda: sqlalchemy.types.Integer()
UUID = lambda: moniker.database.sqlalchemy.types.UUID()
UUID = lambda: moniker.storage.sqla.types.UUID()
Inet = lambda: moniker.database.sqlalchemy.types.Inet()
Inet = lambda: moniker.storage.sqla.types.Inet()
RECORD_TYPES = ['A', 'AAAA', 'CNAME', 'MX', 'SRV', 'TXT', 'NS']

View File

@ -19,7 +19,7 @@
from migrate import *
from sqlalchemy.schema import (Column, MetaData, Table)
from moniker.database.sqlalchemy.migrate_repo.schema import (
from moniker.storage.sqla.migrate_repo.schema import (
Integer, String, Text, create_tables,
drop_tables, RECORD_TYPES)

View File

@ -16,20 +16,37 @@
# License for the specific language governing permissions and limitations
# under the License.
from uuid import uuid4
from urlparse import urlparse
from sqlalchemy import (Column, DateTime, String, Text, Integer, ForeignKey,
Enum)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import relationship, backref, object_mapper
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property
from moniker import exceptions
from moniker.openstack.common import timeutils
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
from moniker.database.sqlalchemy.session import get_session
from moniker.database.sqlalchemy.types import UUID, Inet
from moniker.openstack.common import timeutils
from moniker.storage.sqla.session import get_session
from moniker.storage.sqla.types import UUID, Inet
LOG = logging.getLogger(__name__)
sql_opts = [
cfg.IntOpt('mysql_engine', default='InnoDB', help='MySQL engine')
]
cfg.CONF.register_opts(sql_opts)
def table_args():
engine_name = urlparse(cfg.CONF.database_connection).scheme
if engine_name == 'mysql':
return {'mysql_engine': cfg.CONF.mysql_engine}
return None
class Base(object):
__abstract__ = True
@ -44,11 +61,11 @@ class Base(object):
'version_id_col': version
}
def save(self, session=None):
""" Save this object """
if not session:
session = get_session()
__table_args__ = table_args()
__table_initialized__ = False
def save(self, session):
""" Save this object """
session.add(self)
try:
@ -59,7 +76,7 @@ class Base(object):
else:
raise
def delete(self, session=None):
def delete(self, session):
""" Delete this object """
if not session:
session = get_session()

View File

@ -0,0 +1,188 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Session Handling for SQLAlchemy backend."""
import re
import time
import sqlalchemy
from sqlalchemy.exc import DisconnectionError, OperationalError
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
_MAKER = None
_ENGINE = None
sql_opts = [
cfg.IntOpt('sql_connection_debug', default=100,
help='Verbosity of SQL debugging information. 0=None,'
' 100=Everything'),
cfg.BoolOpt('sql_connection_trace', default=False,
help='Add python stack traces to SQL as comment strings'),
cfg.BoolOpt('sqlite_synchronous', default=True,
help='If passed, use synchronous mode for sqlite'),
cfg.IntOpt('sql_idle_timeout', default=3600,
help='timeout before idle sql connections are reaped'),
cfg.IntOpt('sql_max_retries', default=10,
help='maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('sql_retry_interval', default=10,
help='interval between retries of opening a sql connection')
]
cfg.CONF.register_opts(sql_opts)
def get_session(autocommit=True, expire_on_commit=False, autoflush=True):
"""Return a SQLAlchemy session."""
global _MAKER
if _MAKER is None:
engine = get_engine()
_MAKER = get_maker(engine, autocommit, expire_on_commit, autoflush)
session = _MAKER()
return session
def synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode"""
dbapi_conn.execute("PRAGMA synchronous = OFF")
def add_regexp_listener(dbapi_con, con_record):
"""Add REGEXP function to sqlite connections."""
def regexp(expr, item):
reg = re.compile(expr)
return reg.search(unicode(item)) is not None
dbapi_con.create_function('regexp', 2, regexp)
def ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""
Ensures that MySQL connections checked out of the
pool are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError, ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warn('Got mysql server has gone away: %s', ex)
raise DisconnectionError("Database server went away")
else:
raise
def is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others.
conn_err_codes = ('2002', '2003', '2006')
for err_code in conn_err_codes:
if args.find(err_code) != -1:
return True
return False
def get_engine():
"""Return a SQLAlchemy engine."""
global _ENGINE
if _ENGINE is None:
connection_dict = sqlalchemy.engine.url.make_url(
cfg.CONF.database_connection)
engine_args = {
"pool_recycle": cfg.CONF.sql_idle_timeout,
"echo": False,
'convert_unicode': True,
}
# Map our SQL debug level to SQLAlchemy's options
if cfg.CONF.sql_connection_debug >= 100:
engine_args['echo'] = 'debug'
elif cfg.CONF.sql_connection_debug >= 50:
engine_args['echo'] = True
if "sqlite" in connection_dict.drivername:
engine_args["poolclass"] = NullPool
if cfg.CONF.database_connection == "sqlite://":
engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False}
_ENGINE = sqlalchemy.create_engine(cfg.CONF.database_connection,
**engine_args)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
elif "sqlite" in connection_dict.drivername:
if not cfg.CONF.sqlite_synchronous:
sqlalchemy.event.listen(_ENGINE, 'connect',
synchronous_switch_listener)
sqlalchemy.event.listen(_ENGINE, 'connect', add_regexp_listener)
if (cfg.CONF.sql_connection_trace and
_ENGINE.dialect.dbapi.__name__ == 'MySQLdb'):
import MySQLdb.cursors
_do_query = debug_mysql_do_query()
setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
try:
_ENGINE.connect()
except OperationalError, e:
if not is_db_connection_error(e.args[0]):
raise
remaining = cfg.CONF.sql_max_retries
if remaining == -1:
remaining = 'infinite'
while True:
msg = _('SQL connection failed. %s attempts left.')
LOG.warn(msg % remaining)
if remaining != 'infinite':
remaining -= 1
time.sleep(cfg.CONF.sql_retry_interval)
try:
_ENGINE.connect()
break
except OperationalError, e:
if (remaining != 'infinite' and remaining == 0) or \
not is_db_connection_error(e.args[0]):
raise
return _ENGINE
def get_maker(engine, autocommit=True, expire_on_commit=False, autoflush=True):
"""Return a SQLAlchemy sessionmaker using the given engine."""
return sqlalchemy.orm.sessionmaker(bind=engine,
autocommit=autocommit,
autoflush=autoflush,
expire_on_commit=expire_on_commit)
def func():
# ugly hack sqlalchemy name conflict from impl_sqlalchemy
return sqlalchemy.func

View File

@ -18,22 +18,22 @@ import unittest
import mox
from moniker.openstack.common import cfg
from moniker.openstack.common.context import RequestContext, get_admin_context
from moniker.database import reinitialize as reinitialize_database
from moniker.storage import reset_data
from moniker.storage import sqla # Import for database_connection cfg def.
class TestCase(unittest.TestCase):
def setUp(self):
super(TestCase, self).setUp()
self.mox = mox.Mox()
self.config(database_driver='sqlalchemy',
database_connection='sqlite://',
self.config(database_connection='sqlite://',
rpc_backend='moniker.openstack.common.rpc.impl_fake',
notification_driver=[])
reinitialize_database()
def tearDown(self):
cfg.CONF.reset()
self.mox.UnsetStubs()
reset_data()
super(TestCase, self).tearDown()
def config(self, **kwargs):

View File

@ -15,22 +15,25 @@
# under the License.
import copy
from nose import SkipTest
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
from moniker.tests import TestCase
from moniker import database
from moniker import storage
from moniker import exceptions
LOG = logging.getLogger(__name__)
class DatabaseTestCase(TestCase):
class StorageTestCase(TestCase):
__test__ = False
def get_database_driver(self, *args, **kwargs):
return database.get_driver(*args, **kwargs)
def get_storage_driver(self, conf=cfg.CONF):
engine = storage.get_engine(conf)
connection = engine.get_connection(conf)
return connection
class DatabaseDriverTestCase(DatabaseTestCase):
class StorageDriverTestCase(StorageTestCase):
__test__ = False
server_fixtures = [{
@ -48,18 +51,18 @@ class DatabaseDriverTestCase(DatabaseTestCase):
}]
def setUp(self):
super(DatabaseDriverTestCase, self).setUp()
self.driver = self.get_database_driver()
super(StorageDriverTestCase, self).setUp()
self.storage_conn = self.get_storage_driver()
self.admin_context = self.get_admin_context()
def create_server_fixture(self, fixture=0, values={}):
_values = copy.copy(self.server_fixtures[fixture])
_values.update(values)
return self.driver.create_server(self.admin_context, _values)
return self.storage_conn.create_server(self.admin_context, _values)
def test_init(self):
self.get_database_driver()
self.get_storage_driver()
def test_create_server(self):
values = {
@ -68,7 +71,8 @@ class DatabaseDriverTestCase(DatabaseTestCase):
'ipv6': '2001:db8::1',
}
result = self.driver.create_server(self.admin_context, values=values)
result = self.storage_conn.create_server(
self.admin_context, values=values)
self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at'])
@ -89,8 +93,8 @@ class DatabaseDriverTestCase(DatabaseTestCase):
}]
for value in values:
result = self.driver.create_server(self.admin_context,
values=value)
result = self.storage_conn.create_server(
self.admin_context, values=value)
self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at'])
@ -128,13 +132,13 @@ class DatabaseDriverTestCase(DatabaseTestCase):
self.create_server_fixture(values=value)
def test_get_servers(self):
actual = self.driver.get_servers(self.admin_context)
actual = self.storage_conn.get_servers(self.admin_context)
self.assertEqual(actual, [])
# Create a single server
server_one = self.create_server_fixture()
actual = self.driver.get_servers(self.admin_context)
actual = self.storage_conn.get_servers(self.admin_context)
self.assertEqual(len(actual), 1)
self.assertEqual(str(actual[0]['name']), str(server_one['name']))
@ -144,13 +148,14 @@ class DatabaseDriverTestCase(DatabaseTestCase):
# Create a second server
self.create_server_fixture(fixture=1)
actual = self.driver.get_servers(self.admin_context)
actual = self.storage_conn.get_servers(self.admin_context)
self.assertEqual(len(actual), 2)
def test_get_server(self):
# Create a server
expected = self.create_server_fixture()
actual = self.driver.get_server(self.admin_context, expected['id'])
actual = self.storage_conn.get_server(
self.admin_context, expected['id'])
self.assertEqual(str(actual['name']), str(expected['name']))
self.assertEqual(str(actual['ipv4']), str(expected['ipv4']))
@ -159,15 +164,15 @@ class DatabaseDriverTestCase(DatabaseTestCase):
def test_get_server_missing(self):
with self.assertRaises(exceptions.ServerNotFound):
uuid = 'caf771fc-6b05-4891-bee1-c2a48621f57b'
self.driver.get_server(self.admin_context, uuid)
self.storage_conn.get_server(self.admin_context, uuid)
def test_update_server(self):
# Create a server
server = self.create_server_fixture()
values = self.server_fixtures[1]
updated = self.driver.update_server(self.admin_context, server['id'],
values)
updated = self.storage_conn.update_server(
self.admin_context, server['id'], values)
self.assertEqual(str(updated['name']), str(values['name']))
self.assertEqual(str(updated['ipv4']), str(values['ipv4']))
@ -181,25 +186,26 @@ class DatabaseDriverTestCase(DatabaseTestCase):
values = self.server_fixtures[0]
with self.assertRaises(exceptions.DuplicateServer):
self.driver.update_server(self.admin_context, server['id'], values)
self.storage_conn.update_server(
self.admin_context, server['id'], values)
def test_update_server_missing(self):
with self.assertRaises(exceptions.ServerNotFound):
uuid = 'caf771fc-6b05-4891-bee1-c2a48621f57b'
self.driver.update_server(self.admin_context, uuid, {})
self.storage_conn.update_server(self.admin_context, uuid, {})
def test_delete_server(self):
server = self.create_server_fixture(fixture=0)
self.driver.delete_server(self.admin_context, server['id'])
self.storage_conn.delete_server(self.admin_context, server['id'])
with self.assertRaises(exceptions.ServerNotFound):
self.driver.get_server(self.admin_context, server['id'])
self.storage_conn.get_server(self.admin_context, server['id'])
def test_delete_server_missing(self):
with self.assertRaises(exceptions.ServerNotFound):
uuid = 'caf771fc-6b05-4891-bee1-c2a48621f57b'
self.driver.delete_server(self.admin_context, uuid)
self.storage_conn.delete_server(self.admin_context, uuid)
def test_create_domain(self):
raise SkipTest()

View File

@ -14,19 +14,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from moniker.openstack.common import log as logging
from moniker.tests.database import DatabaseDriverTestCase
from moniker.tests.storage import StorageDriverTestCase
from moniker import exceptions
LOG = logging.getLogger(__name__)
class SqlalchemyTest(DatabaseDriverTestCase):
class SqlalchemyTest(StorageDriverTestCase):
__test__ = True
def setUp(self):
super(SqlalchemyTest, self).setUp()
self.config(database_driver='sqlalchemy')
# def create_server(self, **kwargs):
# context = kwargs.pop('context', self.get_admin_context())
# service = kwargs.pop('service', self.get_central_service())

View File

@ -15,6 +15,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from setuptools import setup, find_packages
import textwrap
from moniker.openstack.common import setup as common_setup
install_requires = common_setup.parse_requirements(['tools/pip-requires'])
@ -46,11 +48,14 @@ setup(
'bin/moniker-api',
'bin/moniker-central',
],
entry_points={
'moniker.cli': [
'database init = moniker.cli.database:InitCommand',
'database sync = moniker.cli.database:SyncCommand',
],
},
cmdclass=common_setup.get_cmdclass(),
entry_points=textwrap.dedent("""
[moniker.storage]
mysql = moniker.storage.impl_sqlalchemy:SQLAlchemyStorage
postgresql = moniker.storage.impl_sqlalchemy:SQLAlchemyStorage
sqlite = moniker.storage.impl_sqlalchemy:SQLAlchemyStorage
[moniker.cli]
database init = InitCommand
database sync = moniker.cli.database:SyncCommand
""")
)

View File

@ -6,6 +6,8 @@ ipaddr
PasteDeploy
sqlalchemy-migrate>=0.7.2
https://github.com/managedit/python-monikerclient/tarball/master#egg=monikerclient
stevedore
# From OpenStack Common
routes==1.12.3
iso8601>=0.1.4