Pool Manager Service Changes

- Added the Pool Manager cache (including test cases)
- Added the pool_manager_status table (including test cases)
- Added the PoolManagerStatus object
- Added the PoolServer object
- Added the BackendOption object
- Added support for global and server specific backend configurations
- Added a backend driver for pool manager (BIND9)
- Stubbed out the pool manager service (including periodic_sync)

Change-Id: I3d94e5df663d2938b0e6017fe43c28c71f7a29a0
Partially-implements: blueprint server-pools-service
This commit is contained in:
rjrjr 2014-10-24 15:12:35 -07:00
parent d9a8ae7a39
commit bc17db433c
34 changed files with 1944 additions and 264 deletions

86
contrib/pmdomain.py Normal file
View File

@ -0,0 +1,86 @@
# Copyright (C) 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import getopt
import eventlet
from oslo.config import cfg
from designate import utils
from designate.pool_manager import rpcapi
from designate.context import DesignateContext
from designate.objects import Domain
from designate import rpc
# This is required to ensure ampq works without hanging.
eventlet.monkey_patch(os=False)
def main(argv):
# TODO(Ron): remove this application once unit testing is in place.
usage = 'pmdomain.py -c <domain-name> | -d <domain-name>'
domain_name = None
create = False
delete = False
try:
opts, args = getopt.getopt(
argv, "hc:d:", ["help", "create=", "delete="])
except getopt.GetoptError:
print('%s' % usage)
sys.exit(2)
for opt, arg in opts:
if opt in ("-h", "--help"):
print('%s' % usage)
sys.exit()
elif opt in ("-c", "--create"):
create = True
domain_name = arg
elif opt in ("-d", "--delete"):
delete = True
domain_name = arg
if (delete and create) or (not delete and not create):
print('%s' % usage)
sys.exit(2)
# Read the Designate configuration file.
utils.read_config('designate', [])
rpc.init(cfg.CONF)
context = DesignateContext.get_admin_context(
tenant=utils.generate_uuid(),
user=utils.generate_uuid())
pool_manager_api = rpcapi.PoolManagerAPI()
# For the BIND9 backend, all that is needed is a name.
values = {
'name': domain_name
}
domain = Domain(**values)
if create:
pool_manager_api.create_domain(context, domain)
if delete:
pool_manager_api.delete_domain(context, domain)
if __name__ == "__main__":
main(sys.argv[1:])

View File

@ -39,6 +39,8 @@ cfg.CONF.register_opts([
cfg.StrOpt('central-topic', default='central', help='Central Topic'),
cfg.StrOpt('agent-topic', default='agent', help='Agent Topic'),
cfg.StrOpt('mdns-topic', default='mdns', help='mDNS Topic'),
cfg.StrOpt('pool-manager-topic', default='pool_manager',
help='Pool Manager Topic'),
# Default TTL
cfg.IntOpt('default-ttl', default=3600),

View File

@ -15,6 +15,7 @@
# under the License.
from designate.openstack.common import log as logging
from designate.backend.base import Backend
from designate.backend.base import PoolBackend
LOG = logging.getLogger(__name__)
@ -25,3 +26,18 @@ def get_backend(backend_driver, central_service):
cls = Backend.get_driver(backend_driver)
return cls(central_service=central_service)
def get_server_object(backend_driver, server_id):
LOG.debug("Loading pool backend driver: %s" % backend_driver)
cls = PoolBackend.get_driver(backend_driver)
return cls.get_server_object(backend_driver, server_id)
def get_pool_backend(backend_driver, backend_options):
LOG.debug("Loading pool backend driver: %s" % backend_driver)
cls = PoolBackend.get_driver(backend_driver)
return cls(backend_options)

View File

@ -15,11 +15,15 @@
# under the License.
import abc
from oslo.config import cfg
import designate.pool_manager.backend_section_name as backend_section_name
from designate.openstack.common import log as logging
from designate.i18n import _LW
from designate import exceptions
from designate.context import DesignateContext
from designate.plugin import DriverPlugin
from designate import objects
LOG = logging.getLogger(__name__)
@ -155,3 +159,170 @@ class Backend(DriverPlugin):
return {
'status': None
}
class PoolBackend(Backend):
def __init__(self, backend_options):
super(PoolBackend, self).__init__(None)
self.backend_options = backend_options
@classmethod
def _create_server_object(cls, backend, server_id, backend_options,
server_section_name):
"""
Create the server object.
"""
server_values = {
'id': server_id,
'host': cfg.CONF[server_section_name].host,
'port': cfg.CONF[server_section_name].port,
'backend': backend,
'backend_options': backend_options,
'tsig_key': cfg.CONF[server_section_name].tsig_key
}
return objects.PoolServer(**server_values)
@classmethod
def _create_backend_option_objects(cls, global_section_name,
server_section_name):
"""
Create the backend_option object list.
"""
backend_options = []
for key in cfg.CONF[global_section_name].keys():
backend_option = cls._create_backend_option_object(
key, global_section_name, server_section_name)
backend_options.append(backend_option)
return backend_options
@classmethod
def _create_backend_option_object(cls, key, global_section_name,
server_section_name):
"""
Create the backend_option object. If a server specific backend option
value exists, use it. Otherwise use the global backend option value.
"""
value = cfg.CONF[server_section_name].get(key)
if value is None:
value = cfg.CONF[global_section_name].get(key)
backend_option_values = {
'key': key,
'value': value
}
return objects.BackendOption(**backend_option_values)
@classmethod
def _register_opts(cls, backend, server_id):
"""
Register the global and server specific backend options.
"""
global_section_name = backend_section_name \
.generate_global_section_name(backend)
server_section_name = backend_section_name \
.generate_server_section_name(backend, server_id)
# Register the global backend options.
global_opts = cls.get_cfg_opts()
cfg.CONF.register_group(cfg.OptGroup(name=global_section_name))
cfg.CONF.register_opts(global_opts, group=global_section_name)
# Register the server specific backend options.
server_opts = global_opts
server_opts.append(cfg.StrOpt('host', help='Server Host'))
server_opts.append(cfg.IntOpt('port', default=53, help='Server Port'))
server_opts.append(cfg.StrOpt('tsig-key', help='Server TSIG Key'))
cfg.CONF.register_group(cfg.OptGroup(name=server_section_name))
cfg.CONF.register_opts(server_opts, group=server_section_name)
# Ensure the server specific backend options do not have a default
# value. This is necessary so the default value does not override
# a global backend option value set in the configuration file.
for key in cfg.CONF[global_section_name].keys():
cfg.CONF.set_default(key, None, group=server_section_name)
return global_section_name, server_section_name
@abc.abstractmethod
def get_cfg_opts(self):
"""
Get the configuration options.
"""
@classmethod
def get_server_object(cls, backend, server_id):
"""
Get the server object from the backend driver for the server_id.
"""
global_section_name, server_section_name = cls._register_opts(
backend, server_id)
backend_options = cls._create_backend_option_objects(
global_section_name, server_section_name)
return cls._create_server_object(
backend, server_id, backend_options, server_section_name)
def get_backend_option(self, key):
"""
Get the backend option value using the backend option key.
"""
for backend_option in self.backend_options:
if backend_option['key'] == key:
return backend_option['value']
def create_tsigkey(self, context, tsigkey):
pass
def update_tsigkey(self, context, tsigkey):
pass
def delete_tsigkey(self, context, tsigkey):
pass
def create_server(self, context, server):
pass
def update_server(self, context, server):
pass
def delete_server(self, context, server):
pass
@abc.abstractmethod
def create_domain(self, context, domain):
"""Create a DNS domain"""
def update_domain(self, context, domain):
pass
@abc.abstractmethod
def delete_domain(self, context, domain):
"""Delete a DNS domain"""
def create_recordset(self, context, domain, recordset):
pass
def update_recordset(self, context, domain, recordset):
pass
def delete_recordset(self, context, domain, recordset):
pass
def create_record(self, context, domain, recordset, record):
pass
def update_record(self, context, domain, recordset, record):
pass
def delete_record(self, context, domain, recordset, record):
pass
def sync_domain(self, context, domain, records):
pass
def sync_record(self, context, domain, record):
pass
def ping(self, context):
pass

View File

@ -0,0 +1,80 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebay.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import utils
from designate.backend import base
LOG = logging.getLogger(__name__)
cfg_opts = [
cfg.ListOpt('masters', help="Master servers from which to transfer from"),
cfg.StrOpt('rndc-host', default='127.0.0.1', help='RNDC Host'),
cfg.IntOpt('rndc-port', default=953, help='RNDC Port'),
cfg.StrOpt('rndc-config-file', default=None, help='RNDC Config File'),
cfg.StrOpt('rndc-key-file', default=None, help='RNDC Key File'),
]
class Bind9PoolBackend(base.PoolBackend):
__plugin_name__ = 'bind9_pool'
@classmethod
def get_cfg_opts(cls):
return cfg_opts
def create_domain(self, context, domain):
LOG.debug('Create Domain')
masters = '; '.join(self.get_backend_option('masters'))
rndc_op = [
'addzone',
'%s { type slave; masters { %s;}; file "%s.slave"; };' %
(domain['name'], masters, domain['name']),
]
self._execute_rndc(rndc_op)
def delete_domain(self, context, domain):
LOG.debug('Delete Domain')
rndc_op = [
'delzone',
'%s' % domain['name'],
]
self._execute_rndc(rndc_op)
def _rndc_base(self):
rndc_call = [
'rndc',
'-s', self.get_backend_option('rndc_host'),
'-p', str(self.get_backend_option('rndc_port')),
]
if self.get_backend_option('rndc_config_file'):
rndc_call.extend(
['-c', self.get_backend_option('rndc_config_file')])
if self.get_backend_option('rndc_key_file'):
rndc_call.extend(
['-k', self.get_backend_option('rndc_key_file')])
return rndc_call
def _execute_rndc(self, rndc_op):
rndc_call = self._rndc_base()
rndc_call.extend(rndc_op)
LOG.debug('Executing RNDC call: %s' % " ".join(rndc_call))
utils.execute(*rndc_call)

View File

@ -0,0 +1,38 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo.config import cfg
from designate import service
from designate import utils
from designate.openstack.common import log as logging
from designate.pool_manager import service as pool_manager_service
CONF = cfg.CONF
CONF.import_opt('workers', 'designate.pool_manager',
group='service:pool_manager')
def main():
utils.read_config('designate', sys.argv)
logging.setup('designate')
server = pool_manager_service.Service.create(
binary='designate-pool-manager')
service.serve(server, workers=CONF['service:pool_manager'].workers)
service.wait()

View File

@ -208,6 +208,10 @@ class DuplicateBlacklist(Duplicate):
error_type = 'duplicate_blacklist'
class DuplicatePoolManagerStatus(Duplicate):
error_type = 'duplication_pool_manager_status'
class MethodNotAllowed(Base):
expected = True
error_code = 405
@ -256,6 +260,10 @@ class ReportNotFound(NotFound):
error_type = 'report_not_found'
class PoolManagerStatusNotFound(NotFound):
error_type = 'pool_manager_status_not_found'
class LastServerDeleteNotAllowed(BadRequest):
error_type = 'last_server_delete_not_allowed'

View File

@ -0,0 +1,59 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from migrate.versioning import api as versioning_api
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.manage import base
from designate.sqlalchemy import utils
LOG = logging.getLogger(__name__)
REPOSITORY = os.path.abspath(os.path.join(os.path.dirname(__file__), '..',
'pool_manager',
'cache', 'impl_sqlalchemy',
'migrate_repo'))
cfg.CONF.import_opt('connection',
'designate.pool_manager.cache.impl_sqlalchemy',
group='pool_manager_cache:sqlalchemy')
CONF = cfg.CONF
def get_manager():
return utils.get_migration_manager(
REPOSITORY, CONF['pool_manager_cache:sqlalchemy'].connection)
class DatabaseCommands(base.Commands):
def version(self):
current = get_manager().version()
latest = versioning_api.version(repository=REPOSITORY).value
print("Current: %s Latest: %s" % (current, latest))
def sync(self):
get_manager().upgrade(None)
@base.args('revision', nargs='?')
def upgrade(self, revision):
get_manager().upgrade(revision)
@base.args('revision', nargs='?')
def downgrade(self, revision):
get_manager().downgrade(revision)

View File

@ -17,8 +17,11 @@
from designate.objects.base import DesignateObject # noqa
from designate.objects.base import DictObjectMixin # noqa
from designate.objects.base import ListObjectMixin # noqa
from designate.objects.backend_option import BackendOption, BackendOptionList # noqa
from designate.objects.blacklist import Blacklist, BlacklistList # noqa
from designate.objects.domain import Domain, DomainList # noqa
from designate.objects.pool_manager_status import PoolManagerStatus, PoolManagerStatusList # noqa
from designate.objects.pool_server import PoolServer, PoolServerList # noqa
from designate.objects.quota import Quota, QuotaList # noqa
from designate.objects.rrdata_a import RRData_A # noqa
from designate.objects.rrdata_aaaa import RRData_AAAA # noqa

View File

@ -0,0 +1,27 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
class BackendOption(base.DictObjectMixin, base.DesignateObject):
FIELDS = {
'key': {},
'value': {}
}
class BackendOptionList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = BackendOption

View File

@ -0,0 +1,31 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
class PoolManagerStatus(base.DictObjectMixin, base.PersistentObjectMixin,
base.DesignateObject):
FIELDS = {
'server_id': {},
'domain_id': {},
'status': {},
'serial_number': {},
'action': {}
}
class PoolManagerStatusList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = PoolManagerStatus

View File

@ -0,0 +1,32 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
# TODO(Ron): replace the Server object with this object.
class PoolServer(base.DictObjectMixin, base.DesignateObject):
FIELDS = {
'id': {},
'host': {},
'port': {},
'backend': {},
'backend_options': {},
'tsig_key': {}
}
class PoolServerList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = PoolServer

View File

@ -0,0 +1,46 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
cfg.CONF.register_group(cfg.OptGroup(
name='service:pool_manager', title="Configuration for Pool Manager Service"
))
OPTS = [
cfg.IntOpt('workers', default=None,
help='Number of Pool Manager worker processes to spawn'),
cfg.StrOpt('pool-name', default='default',
help='The name of the pool managed by this instance of the '
'Pool Manager'),
cfg.IntOpt('threshold-percentage', default=100,
help='The percentage of servers requiring a successful update '
'for a domain change to be considered active'),
cfg.IntOpt('poll-timeout', default=30,
help='The time to wait for a NOTIFY response from a name '
'server'),
cfg.IntOpt('poll-retry-interval', default=2,
help='The time between retrying to send a NOTIFY request and '
'waiting for a NOTIFY response'),
cfg.IntOpt('poll-max-retries', default=3,
help='The maximum number of times minidns will retry sending '
'a NOTIFY request and wait for a NOTIFY response'),
cfg.IntOpt('periodic-sync-interval', default=120,
help='The time between synchronizing the servers with Storage'),
cfg.StrOpt('cache-driver', default='sqlalchemy',
help='The cache driver to use'),
]
cfg.CONF.register_opts(OPTS, group='service:pool_manager')

View File

@ -0,0 +1,136 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from oslo.config import iniparser
from designate import utils
GLOBAL_SECTION_NAME_LABEL = '*'
SECTION_NAME_PREFIX = 'backend'
SECTION_NAME_SEPARATOR = ':'
UUID_PATTERN = '-'.join([
'[0-9A-Fa-f]{8}',
'[0-9A-Fa-f]{4}',
'[0-9A-Fa-f]{4}',
'[0-9A-Fa-f]{4}',
'[0-9A-Fa-f]{12}'
])
SECTION_PATTERN = SECTION_NAME_SEPARATOR.join([
'^%s' % SECTION_NAME_PREFIX,
'(.*)',
'(%s)' % UUID_PATTERN
])
SECTION_LABELS = [
'backend',
'server_id'
]
class SectionNameParser(iniparser.BaseParser):
"""
Used to retrieve the configuration file section names and parse the names.
"""
def __init__(self, pattern, labels):
super(SectionNameParser, self).__init__()
self.regex = re.compile(pattern)
self.labels = labels
self.sections = []
def assignment(self, key, value):
pass
def new_section(self, section):
match = self.regex.match(section)
if match:
value = {
'name': section
}
index = 1
for label in self.labels:
value[label] = match.group(index)
index += 1
self.sections.append(value)
def parse(self, filename):
with open(filename) as f:
return super(SectionNameParser, self).parse(f)
@classmethod
def find_sections(cls, filename, pattern, labels):
parser = cls(pattern, labels)
parser.parse(filename)
return parser.sections
def find_server_sections():
"""
Find the server specific backend section names.
A server specific backend section name is:
[backend:<backend_driver>:<server_id>]
"""
config_files = utils.find_config('designate.conf')
all_sections = []
for filename in config_files:
sections = SectionNameParser.find_sections(
filename, SECTION_PATTERN, SECTION_LABELS)
all_sections.extend(sections)
return all_sections
def _generate_section_name(backend_driver, label):
"""
Generate the section name.
A section name is:
[backend:<backend_driver>:<label>]
"""
return SECTION_NAME_SEPARATOR.join([
SECTION_NAME_PREFIX,
backend_driver,
label
])
def generate_global_section_name(backend_driver):
"""
Generate the global backend section name.
A global backend section name is:
[backend:<backend_driver>:*]
"""
return _generate_section_name(backend_driver, GLOBAL_SECTION_NAME_LABEL)
def generate_server_section_name(backend_driver, server_id):
"""
Generate the server specific backend section name.
A server specific backend section name is:
[backend:<backend_driver>:<server_id>]
"""
return _generate_section_name(backend_driver, server_id)

View File

@ -0,0 +1,26 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.pool_manager.cache.base import PoolManagerCache
LOG = logging.getLogger(__name__)
def get_pool_manager_cache(cache_driver):
"""Return the engine class from the provided engine name"""
cls = PoolManagerCache.get_driver(cache_driver)
return cls()

89
designate/pool_manager/cache/base.py vendored Normal file
View File

@ -0,0 +1,89 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from designate.plugin import DriverPlugin
@six.add_metaclass(abc.ABCMeta)
class PoolManagerCache(DriverPlugin):
"""Base class for cache plugins"""
__plugin_ns__ = 'designate.pool_manager.cache'
__plugin_type__ = 'pool_manager_cache'
@abc.abstractmethod
def create_pool_manager_status(self, context, pool_manager_status):
"""
Create a pool manager status.
:param context: Security context information
:param pool_manager_status: Pool manager status object to create
"""
@abc.abstractmethod
def get_pool_manager_status(self, context, pool_manager_status_id):
"""
Get a pool manager status by ID.
:param context: Security context information
:param pool_manager_status_id: Pool manager status ID to get
"""
@abc.abstractmethod
def find_pool_manager_statuses(self, context, criterion=None, marker=None,
limit=None, sort_key=None, sort_dir=None):
"""
Find pool manager statuses.
:param context: Security context information
:param criterion: Criteria to filter by
:param marker: Resource ID from which after the requested page will
start after
:param limit: Integer limit of objects of the page size after the
marker
:param sort_key: Key from which to sort after
:param sort_dir: Direction to sort after using sort_key
"""
@abc.abstractmethod
def find_pool_manager_status(self, context, criterion):
"""
Find a single pool manager status.
:param context: Security context information
:param criterion: Criteria to filter by
"""
@abc.abstractmethod
def update_pool_manager_status(self, context, pool_manager_status):
"""
Update a pool manager status
:param context: Security context information
:param pool_manager_status: Pool manager status object to update
"""
@abc.abstractmethod
def delete_pool_manager_status(self, context, pool_manager_status_id):
"""
Delete a pool manager status
:param context: Security context information
:param pool_manager_status_id: Pool manager status ID to delete
"""

View File

@ -0,0 +1,88 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo.db import options
from designate.openstack.common import log as logging
from designate import exceptions
from designate import objects
from designate.pool_manager.cache import base as cache_base
from designate.sqlalchemy import base as sqlalchemy_base
from designate.pool_manager.cache.impl_sqlalchemy import tables
LOG = logging.getLogger(__name__)
cfg.CONF.register_group(cfg.OptGroup(
name='pool_manager_cache:sqlalchemy',
title="Configuration for SQLAlchemy Pool Manager Cache"
))
cfg.CONF.register_opts(options.database_opts,
group='pool_manager_cache:sqlalchemy')
class SQLAlchemyPoolManagerCache(sqlalchemy_base.SQLAlchemy,
cache_base.PoolManagerCache):
"""SQLAlchemy connection"""
__plugin_name__ = 'sqlalchemy'
def __init__(self):
super(SQLAlchemyPoolManagerCache, self).__init__()
def get_name(self):
return self.name
def _find_pool_manager_statuses(self, context, criterion, one=False,
marker=None, limit=None, sort_key=None,
sort_dir=None):
return self._find(
context, tables.pool_manager_statuses, objects.PoolManagerStatus,
objects.PoolManagerStatusList,
exceptions.PoolManagerStatusNotFound, criterion, one, marker,
limit, sort_key, sort_dir)
def create_pool_manager_status(self, context, pool_manager_status):
return self._create(
tables.pool_manager_statuses, pool_manager_status,
exceptions.DuplicatePoolManagerStatus)
def get_pool_manager_status(self, context, pool_manager_status_id):
return self._find_pool_manager_statuses(
context, {'id': pool_manager_status_id}, one=True)
def find_pool_manager_statuses(self, context, criterion=None, marker=None,
limit=None, sort_key=None, sort_dir=None):
return self._find_pool_manager_statuses(
context, criterion, marker=marker, limit=limit, sort_key=sort_key,
sort_dir=sort_dir)
def find_pool_manager_status(self, context, criterion):
return self._find_pool_manager_statuses(context, criterion, one=True)
def update_pool_manager_status(self, context, pool_manager_status):
return self._update(
context, tables.pool_manager_statuses, pool_manager_status,
exceptions.DuplicatePoolManagerStatus,
exceptions.PoolManagerStatusNotFound)
def delete_pool_manager_status(self, context, pool_manager_status_id):
pool_manager_status = self._find_pool_manager_statuses(
context, {'id': pool_manager_status_id}, one=True)
return self._delete(
context, tables.pool_manager_statuses, pool_manager_status,
exceptions.PoolManagerStatusNotFound)

View File

@ -0,0 +1,4 @@
This is a database migration repository for the project Designate.
More information at
http://code.google.com/p/sqlalchemy-migrate/

View File

@ -0,0 +1,24 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Author: Patrick Galbraith <patg@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate.versioning.shell import main
if __name__ == '__main__':
main(debug='False')

View File

@ -0,0 +1,25 @@
[db_settings]
# Used to identify which repository this database is versioned under.
# You can use the name of your project.
repository_id=DesignatePoolManager
# The name of the database table used to track the schema version.
# This name shouldn't already be used by your project.
# If this is changed once a database is under version control, you'll need to
# change the table name in each database too.
version_table=migrate_version
# When committing a change script, Migrate will attempt to generate the
# sql for all supported databases; normally, if one of them fails - probably
# because you don't have that database installed - it is ignored and the
# commit continues, perhaps ending successfully.
# Databases in this list MUST compile successfully during a commit, or the
# entire commit will fail. List the databases your application will actually
# be using to ensure your updates to that database work properly.
# This must be a list; example: ['postgres','sqlite']
required_dbs=[]
# When creating new change scripts, Migrate will stamp the new script with
# a version number. By default this is latest_version + 1. You can set this
# to 'true' to tell Migrate to use the UTC timestamp instead.
use_timestamp_numbering=False

View File

@ -0,0 +1,76 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Integer, DateTime, Enum, UniqueConstraint
from sqlalchemy.schema import Table, Column, MetaData
from designate.sqlalchemy.types import UUID
UPDATE_STATUSES = ['SUCCESS', 'ERROR']
UPDATE_ACTIONS = ['CREATE', 'DELETE', 'UPDATE']
meta = MetaData()
pool_manager_statuses = Table(
'pool_manager_statuses', meta,
Column('id', UUID(), primary_key=True),
Column('version', Integer(), nullable=False),
Column('created_at', DateTime()),
Column('updated_at', DateTime()),
Column('server_id', UUID(), nullable=False),
Column('domain_id', UUID(), nullable=False),
Column('action', Enum(name='update_actions', *UPDATE_ACTIONS),
nullable=False),
Column('status', Enum(name='update_statuses', *UPDATE_STATUSES),
nullable=True),
Column('serial_number', Integer, nullable=False),
UniqueConstraint('server_id', 'domain_id', 'action',
name='unique_pool_manager_status'),
mysql_engine='InnoDB',
mysql_charset='utf8')
def upgrade(migrate_engine):
meta.bind = migrate_engine
with migrate_engine.begin() as conn:
if migrate_engine.name == "mysql":
conn.execute("SET foreign_key_checks = 0;")
elif migrate_engine.name == "postgresql":
conn.execute("SET CONSTRAINTS ALL DEFERRED;")
pool_manager_statuses.create(conn)
if migrate_engine.name == "mysql":
conn.execute("SET foreign_key_checks = 1;")
def downgrade(migrate_engine):
meta.bind = migrate_engine
with migrate_engine.begin() as conn:
if migrate_engine.name == "mysql":
conn.execute("SET foreign_key_checks = 0;")
elif migrate_engine.name == "postgresql":
conn.execute("SET CONSTRAINTS ALL DEFERRED;")
pool_manager_statuses.drop()
if migrate_engine.name == "mysql":
conn.execute("SET foreign_key_checks = 1;")

View File

@ -0,0 +1,50 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import (Table, MetaData, Column, Integer, DateTime, Enum,
UniqueConstraint, ForeignKeyConstraint)
from oslo.utils import timeutils
from designate import utils
from designate.sqlalchemy.types import UUID
UPDATE_STATUSES = ['SUCCESS', 'ERROR']
UPDATE_ACTIONS = ['CREATE', 'DELETE', 'UPDATE']
metadata = MetaData()
pool_manager_statuses = Table(
'pool_manager_statuses', metadata,
Column('id', UUID, default=utils.generate_uuid, primary_key=True),
Column('version', Integer(), default=1, nullable=False),
Column('created_at', DateTime, default=lambda: timeutils.utcnow()),
Column('updated_at', DateTime, onupdate=lambda: timeutils.utcnow()),
Column('server_id', UUID, nullable=False),
Column('domain_id', UUID, nullable=False),
Column('action', Enum(name='update_actions', *UPDATE_ACTIONS),
nullable=False),
Column('status', Enum(name='update_statuses', *UPDATE_STATUSES),
nullable=True),
Column('serial_number', Integer, nullable=False),
UniqueConstraint('server_id', 'domain_id', 'action',
name='unique_pool_manager_status'),
ForeignKeyConstraint(['domain_id'], ['domains.id']),
mysql_engine='InnoDB',
mysql_charset='utf8',
)

View File

@ -0,0 +1,64 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo import messaging
from designate.openstack.common import log as logging
from designate.i18n import _LI
from designate import rpc
LOG = logging.getLogger(__name__)
class PoolManagerAPI(object):
"""
Client side of the Pool Manager RPC API.
API version history:
API version history:
1.0 - Initial version
"""
RPC_API_VERSION = '1.0'
def __init__(self, topic=None):
topic = topic if topic else cfg.CONF.pool_manager_topic
target = messaging.Target(topic=topic, version=self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap='1.0')
def create_domain(self, context, domain):
LOG.info(_LI("create_domain: Calling pool manager's create_domain."))
return self.client.cast(
context, 'create_domain', domain=domain)
def delete_domain(self, context, domain):
LOG.info(_LI("delete_domain: Calling pool manager's delete_domain."))
return self.client.cast(
context, 'delete_domain', domain=domain)
def update_domain(self, context, domain):
LOG.info(_LI("update_domain: Calling pool manager's update_domain."))
return self.client.cast(
context, 'update_domain', domain=domain)
def update_status(self, context, domain, server, status, serial_number):
LOG.info(_LI("update_status: Calling pool manager's update_status."))
return self.client.cast(
context, 'update_status', domain=domain, server=server,
status=status, serial_number=serial_number)

View File

@ -0,0 +1,152 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import designate.pool_manager.backend_section_name as backend_section_name
from oslo.config import cfg
from oslo import messaging
from designate import backend
from designate import service
from designate import storage
from designate.pool_manager import cache
from designate.openstack.common import log as logging
from designate.openstack.common import processutils
from designate.openstack.common import threadgroup
LOG = logging.getLogger(__name__)
cfg.CONF.import_opt('storage_driver', 'designate.central', 'service:central')
class Service(service.RPCService):
"""
Service side of the Pool Manager RPC API.
API version history:
1.0 - Initial version
"""
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, *args, **kwargs):
super(Service, self).__init__(*args, **kwargs)
# Get a storage connection.
storage_driver = cfg.CONF['service:central'].storage_driver
self.storage = storage.get_storage(storage_driver)
# Get a pool manager cache connection.
cache_driver = cfg.CONF['service:pool_manager'].cache_driver
self.cache = cache.get_pool_manager_cache(cache_driver)
self.servers = []
self.server_backend_maps = []
sections = backend_section_name.find_server_sections()
for section in sections:
backend_driver = section['backend']
server_id = section['server_id']
server = backend.get_server_object(backend_driver, server_id)
backend_instance = backend.get_pool_backend(
backend_driver, server['backend_options'])
server_backend_map = {
'server': server,
'backend_instance': backend_instance
}
self.servers.append(server)
self.server_backend_maps.append(server_backend_map)
self.thread_group = threadgroup.ThreadGroup()
def start(self):
for server_backend_map in self.server_backend_maps:
backend_instance = server_backend_map['backend_instance']
backend_instance.start()
self.thread_group.add_timer(
cfg.CONF['service:pool_manager'].periodic_sync_interval,
self.periodic_sync)
super(Service, self).start()
def stop(self):
super(Service, self).stop()
self.thread_group.stop(True)
for server_backend_map in self.server_backend_maps:
backend_instance = server_backend_map['backend_instance']
backend_instance.stop()
def create_domain(self, context, domain):
"""
:param context: Security context information.
:param domain: The designate domain object.
:return: None
"""
LOG.debug("Calling create_domain.")
for server_backend_map in self.server_backend_maps:
backend_instance = server_backend_map['backend_instance']
try:
backend_instance.create_domain(context, domain)
LOG.debug("success")
except processutils.ProcessExecutionError:
LOG.debug("failure")
def delete_domain(self, context, domain):
"""
:param context: Security context information.
:param domain: The designate domain object.
:return: None
"""
LOG.debug("Calling delete_domain.")
for server_backend_map in self.server_backend_maps:
backend_instance = server_backend_map['backend_instance']
try:
backend_instance.delete_domain(context, domain)
LOG.debug("success")
except processutils.ProcessExecutionError:
LOG.debug("failure")
def update_domain(self, context, domain):
"""
:param context: Security context information.
:param domain: The designate domain object.
:return: None
"""
LOG.debug("Calling update_domain.")
def update_status(self, context, domain, server, status, serial_number):
"""
:param context: Security context information.
:param domain: The designate domain object.
:param server: The name server for which this serial number is
applicable.
:param status: The status, 'SUCCESS' or 'ERROR'.
:param serial_number: The serial number received from the name server
for the domain.
:return: None
"""
LOG.debug("Calling update_status.")
def periodic_sync(self):
"""
:return: None
"""
LOG.debug("Calling periodic_sync.")

View File

@ -0,0 +1,292 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import threading
import six
from oslo.db.sqlalchemy import utils as oslo_utils
from oslo.db import exception as oslo_db_exception
from oslo.utils import timeutils
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy import select
from designate.openstack.common import log as logging
from designate import exceptions
from designate.sqlalchemy import session
from designate.sqlalchemy import utils
LOG = logging.getLogger(__name__)
def _set_object_from_model(obj, model, **extra):
"""Update a DesignateObject with the values from a SQLA Model"""
for fieldname in obj.FIELDS.keys():
if hasattr(model, fieldname):
if fieldname in extra.keys():
obj[fieldname] = extra[fieldname]
else:
obj[fieldname] = getattr(model, fieldname)
obj.obj_reset_changes()
return obj
def _set_listobject_from_models(obj, models, map_=None):
for model in models:
extra = {}
if map_ is not None:
extra = map_(model)
obj.objects.append(
_set_object_from_model(obj.LIST_ITEM_TYPE(), model, **extra))
obj.obj_reset_changes()
return obj
@six.add_metaclass(abc.ABCMeta)
class SQLAlchemy(object):
def __init__(self):
super(SQLAlchemy, self).__init__()
self.engine = session.get_engine(self.get_name())
self.local_store = threading.local()
@abc.abstractmethod
def get_name(self):
"""Get the name."""
@property
def session(self):
# NOTE: This uses a thread local store, allowing each greenthread to
# have it's own session stored correctly. Without this, each
# greenthread may end up using a single global session, which
# leads to bad things happening.
if not hasattr(self.local_store, 'session'):
self.local_store.session = session.get_session(self.get_name())
return self.local_store.session
def begin(self):
self.session.begin(subtransactions=True)
def commit(self):
self.session.commit()
def rollback(self):
self.session.rollback()
def _apply_criterion(self, table, query, criterion):
if criterion is not None:
for name, value in criterion.items():
column = getattr(table.c, name)
# Wildcard value: '*'
if isinstance(value, basestring) and '*' in value:
queryval = value.replace('*', '%')
query = query.where(column.like(queryval))
elif isinstance(value, basestring) and value.startswith('!'):
queryval = value[1:]
query = query.where(column != queryval)
else:
query = query.where(column == value)
return query
def _apply_tenant_criteria(self, context, table, query):
if hasattr(table.c, 'tenant_id'):
if context.all_tenants:
LOG.debug('Including all tenants items in query results')
else:
query = query.where(table.c.tenant_id == context.tenant)
return query
def _apply_deleted_criteria(self, context, table, query):
if hasattr(table.c, 'deleted'):
if context.show_deleted:
LOG.debug('Including deleted items in query results')
else:
query = query.where(table.c.deleted == "0")
return query
def _apply_version_increment(self, context, table, query):
"""
Apply Version Incrementing SQL fragment a Query
This should be called on all UPDATE queries, as it will ensure the
version column is correctly incremented.
"""
if hasattr(table.c, 'version'):
# NOTE(kiall): This will translate into a true SQL increment.
query = query.values({'version': table.c.version + 1})
return query
def _create(self, table, obj, exc_dup, skip_values=None):
values = obj.obj_get_changes()
if skip_values is not None:
for skip_value in skip_values:
values.pop(skip_value, None)
query = table.insert()
try:
resultproxy = self.session.execute(query, [dict(values)])
except oslo_db_exception.DBDuplicateEntry:
raise exc_dup()
# Refetch the row, for generated columns etc
query = select([table]).where(
table.c.id == resultproxy.inserted_primary_key[0])
resultproxy = self.session.execute(query)
return _set_object_from_model(obj, resultproxy.fetchone())
def _find(self, context, table, cls, list_cls, exc_notfound, criterion,
one=False, marker=None, limit=None, sort_key=None,
sort_dir=None, query=None):
sort_key = sort_key or 'created_at'
sort_dir = sort_dir or 'asc'
# Build the query
if query is None:
query = select([table])
query = self._apply_criterion(table, query, criterion)
query = self._apply_tenant_criteria(context, table, query)
query = self._apply_deleted_criteria(context, table, query)
# Execute the Query
if one:
# NOTE(kiall): If we expect one value, and two rows match, we raise
# a NotFound. Limiting to 2 allows us to determine
# when we need to raise, while selecting the minimal
# number of rows.
resultproxy = self.session.execute(query.limit(2))
results = resultproxy.fetchall()
if len(results) != 1:
raise exc_notfound()
else:
return _set_object_from_model(cls(), results[0])
else:
if marker is not None:
# If marker is not none and basestring we query it.
# Otherwise, return all matching records
marker_query = select([table]).where(table.c.id == marker)
try:
marker_resultproxy = self.session.execute(marker_query)
marker = marker_resultproxy.fetchone()
if marker is None:
raise exceptions.MarkerNotFound(
'Marker %s could not be found' % marker)
except oslo_db_exception.DBError as e:
# Malformed UUIDs return StatementError wrapped in a
# DBError
if isinstance(e.inner_exception,
sqlalchemy_exc.StatementError):
raise exceptions.InvalidMarker()
else:
raise
try:
query = utils.paginate_query(
query, table, limit,
[sort_key, 'id', 'created_at'], marker=marker,
sort_dir=sort_dir)
resultproxy = self.session.execute(query)
results = resultproxy.fetchall()
return _set_listobject_from_models(list_cls(), results)
except oslo_utils.InvalidSortKey as sort_key_error:
raise exceptions.InvalidSortKey(sort_key_error.message)
# Any ValueErrors are propagated back to the user as is.
# Limits, sort_dir and sort_key are checked at the API layer.
# If however central or storage is called directly, invalid values
# show up as ValueError
except ValueError as value_error:
raise exceptions.ValueError(value_error.message)
def _update(self, context, table, obj, exc_dup, exc_notfound,
skip_values=None):
values = obj.obj_get_changes()
if skip_values is not None:
for skip_value in skip_values:
values.pop(skip_value, None)
query = table.update()\
.where(table.c.id == obj.id)\
.values(**values)
query = self._apply_tenant_criteria(context, table, query)
query = self._apply_deleted_criteria(context, table, query)
query = self._apply_version_increment(context, table, query)
try:
resultproxy = self.session.execute(query)
except oslo_db_exception.DBDuplicateEntry:
raise exc_dup()
if resultproxy.rowcount != 1:
raise exc_notfound()
# Refetch the row, for generated columns etc
query = select([table]).where(table.c.id == obj.id)
resultproxy = self.session.execute(query)
return _set_object_from_model(obj, resultproxy.fetchone())
def _delete(self, context, table, obj, exc_notfound):
if hasattr(table.c, 'deleted'):
# Perform a Soft Delete
# TODO(kiall): If the object has any changed fields, they will be
# persisted here when we don't want that.
obj.deleted = obj.id.replace('-', '')
obj.deleted_at = timeutils.utcnow()
# NOTE(kiall): It should be impossible for a duplicate exception to
# be raised in this call, therefore, it is OK to pass
# in "None" as the exc_dup param.
return self._update(context, table, obj, None, exc_notfound)
# Delete the quota.
query = table.delete().where(table.c.id == obj.id)
query = self._apply_tenant_criteria(context, table, query)
query = self._apply_deleted_criteria(context, table, query)
resultproxy = self.session.execute(query)
if resultproxy.rowcount != 1:
raise exc_notfound()
# Refetch the row, for generated columns etc
query = select([table]).where(table.c.id == obj.id)
resultproxy = self.session.execute(query)
return _set_object_from_model(obj, resultproxy.fetchone())

View File

@ -181,12 +181,6 @@ class Storage(DriverPlugin):
:param context: RPC Context.
:param criterion: Criteria to filter by.
:param marker: Resource ID from which after the requested page will
start after
:param limit: Integer limit of objects of the page size after the
marker
:param sort_key: Key from which to sort after.
:param sort_dir: Direction to sort after using sort_key.
"""
@abc.abstractmethod
@ -380,7 +374,6 @@ class Storage(DriverPlugin):
Find RecordSets.
:param context: RPC Context.
:param domain_id: Domain ID where the recordsets reside.
:param criterion: Criteria to filter by.
:param marker: Resource ID from which after the requested page will
start after

View File

@ -14,23 +14,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
import threading
import hashlib
from oslo.config import cfg
from oslo.db.sqlalchemy import utils as oslo_utils
from oslo.db import options
from oslo.db import exception as oslo_db_exception
from oslo.utils import timeutils
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy import select, distinct, func
from designate.openstack.common import log as logging
from designate import exceptions
from designate import objects
from designate.sqlalchemy import session
from designate.sqlalchemy import utils
from designate.storage import base
from designate.sqlalchemy import base as sqlalchemy_base
from designate.storage import base as storage_base
from designate.storage.impl_sqlalchemy import tables
@ -43,261 +37,15 @@ cfg.CONF.register_group(cfg.OptGroup(
cfg.CONF.register_opts(options.database_opts, group='storage:sqlalchemy')
def _set_object_from_model(obj, model, **extra):
"""Update a DesignateObject with the values from a SQLA Model"""
for fieldname in obj.FIELDS.keys():
if hasattr(model, fieldname):
if fieldname in extra.keys():
obj[fieldname] = extra[fieldname]
else:
obj[fieldname] = getattr(model, fieldname)
obj.obj_reset_changes()
return obj
def _set_listobject_from_models(obj, models, map_=None):
for model in models:
extra = {}
if map_ is not None:
extra = map_(model)
obj.objects.append(
_set_object_from_model(obj.LIST_ITEM_TYPE(), model, **extra))
obj.obj_reset_changes()
return obj
class SQLAlchemyStorage(base.Storage):
class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
"""SQLAlchemy connection"""
__plugin_name__ = 'sqlalchemy'
def __init__(self):
super(SQLAlchemyStorage, self).__init__()
self.engine = session.get_engine(self.name)
self.local_store = threading.local()
@property
def session(self):
# NOTE: This uses a thread local store, allowing each greenthread to
# have it's own session stored correctly. Without this, each
# greenthread may end up using a single global session, which
# leads to bad things happening.
if not hasattr(self.local_store, 'session'):
self.local_store.session = session.get_session(self.name)
return self.local_store.session
def begin(self):
self.session.begin(subtransactions=True)
def commit(self):
self.session.commit()
def rollback(self):
self.session.rollback()
def _apply_criterion(self, table, query, criterion):
if criterion is not None:
for name, value in criterion.items():
column = getattr(table.c, name)
# Wildcard value: '*'
if isinstance(value, basestring) and '*' in value:
queryval = value.replace('*', '%')
query = query.where(column.like(queryval))
elif isinstance(value, basestring) and value.startswith('!'):
queryval = value[1:]
query = query.where(column != queryval)
else:
query = query.where(column == value)
return query
def _apply_tenant_criteria(self, context, table, query):
if hasattr(table.c, 'tenant_id'):
if context.all_tenants:
LOG.debug('Including all tenants items in query results')
else:
query = query.where(table.c.tenant_id == context.tenant)
return query
def _apply_deleted_criteria(self, context, table, query):
if hasattr(table.c, 'deleted'):
if context.show_deleted:
LOG.debug('Including deleted items in query results')
else:
query = query.where(table.c.deleted == "0")
return query
def _apply_version_increment(self, context, table, query):
"""
Apply Version Incrementing SQL fragment a Query
This should be called on all UPDATE queries, as it will ensure the
version column is correctly incremented.
"""
if hasattr(table.c, 'version'):
# NOTE(kiall): This will translate into a true SQL increment.
query = query.values({'version': table.c.version + 1})
return query
def _create(self, table, obj, exc_dup, skip_values=None):
values = obj.obj_get_changes()
if skip_values is not None:
for skip_value in skip_values:
values.pop(skip_value, None)
query = table.insert()
try:
resultproxy = self.session.execute(query, [dict(values)])
except oslo_db_exception.DBDuplicateEntry:
raise exc_dup()
# Refetch the row, for generated columns etc
query = select([table]).where(
table.c.id == resultproxy.inserted_primary_key[0])
resultproxy = self.session.execute(query)
return _set_object_from_model(obj, resultproxy.fetchone())
def _find(self, context, table, cls, list_cls, exc_notfound, criterion,
one=False, marker=None, limit=None, sort_key=None,
sort_dir=None, query=None):
sort_key = sort_key or 'created_at'
sort_dir = sort_dir or 'asc'
# Build the query
if query is None:
query = select([table])
query = self._apply_criterion(table, query, criterion)
query = self._apply_tenant_criteria(context, table, query)
query = self._apply_deleted_criteria(context, table, query)
# Execute the Query
if one:
# NOTE(kiall): If we expect one value, and two rows match, we raise
# a NotFound. Limiting to 2 allows us to determine
# when we need to raise, while selecting the minimal
# number of rows.
resultproxy = self.session.execute(query.limit(2))
results = resultproxy.fetchall()
if len(results) != 1:
raise exc_notfound()
else:
return _set_object_from_model(cls(), results[0])
else:
if marker is not None:
# If marker is not none and basestring we query it.
# Otherwise, return all matching records
marker_query = select([table]).where(table.c.id == marker)
try:
marker_resultproxy = self.session.execute(marker_query)
marker = marker_resultproxy.fetchone()
if marker is None:
raise exceptions.MarkerNotFound(
'Marker %s could not be found' % marker)
except oslo_db_exception.DBError as e:
# Malformed UUIDs return StatementError wrapped in a
# DBError
if isinstance(e.inner_exception,
sqlalchemy_exc.StatementError):
raise exceptions.InvalidMarker()
else:
raise
try:
query = utils.paginate_query(
query, table, limit,
[sort_key, 'id', 'created_at'], marker=marker,
sort_dir=sort_dir)
resultproxy = self.session.execute(query)
results = resultproxy.fetchall()
return _set_listobject_from_models(list_cls(), results)
except oslo_utils.InvalidSortKey as sort_key_error:
raise exceptions.InvalidSortKey(sort_key_error.message)
# Any ValueErrors are propagated back to the user as is.
# Limits, sort_dir and sort_key are checked at the API layer.
# If however central or storage is called directly, invalid values
# show up as ValueError
except ValueError as value_error:
raise exceptions.ValueError(value_error.message)
def _update(self, context, table, obj, exc_dup, exc_notfound,
skip_values=None):
values = obj.obj_get_changes()
if skip_values is not None:
for skip_value in skip_values:
values.pop(skip_value, None)
query = table.update()\
.where(table.c.id == obj.id)\
.values(**values)
query = self._apply_tenant_criteria(context, table, query)
query = self._apply_deleted_criteria(context, table, query)
query = self._apply_version_increment(context, table, query)
try:
resultproxy = self.session.execute(query)
except oslo_db_exception.DBDuplicateEntry:
raise exc_dup()
if resultproxy.rowcount != 1:
raise exc_notfound()
# Refetch the row, for generated columns etc
query = select([table]).where(table.c.id == obj.id)
resultproxy = self.session.execute(query)
return _set_object_from_model(obj, resultproxy.fetchone())
def _delete(self, context, table, obj, exc_notfound):
if hasattr(table.c, 'deleted'):
# Perform a Soft Delete
# TODO(kiall): If the object has any changed fields, they will be
# persisted here when we don't want that.
obj.deleted = obj.id.replace('-', '')
obj.deleted_at = timeutils.utcnow()
# NOTE(kiall): It should be impossible for a duplicate exception to
# be raised in this call, therefore, it is OK to pass
# in "None" as the exc_dup param.
return self._update(context, table, obj, None, exc_notfound)
# Delete the quota.
query = table.delete().where(table.c.id == obj.id)
query = self._apply_tenant_criteria(context, table, query)
query = self._apply_deleted_criteria(context, table, query)
resultproxy = self.session.execute(query)
if resultproxy.rowcount != 1:
raise exc_notfound()
# Refetch the row, for generated columns etc
query = select([table]).where(table.c.id == obj.id)
resultproxy = self.session.execute(query)
return _set_object_from_model(obj, resultproxy.fetchone())
def get_name(self):
return self.name
# CRUD for our resources (quota, server, tsigkey, tenant, domain & record)
# R - get_*, find_*s

View File

@ -51,6 +51,11 @@ cfg.CONF.import_opt('auth_strategy', 'designate.api',
group='service:api')
cfg.CONF.import_opt('connection', 'designate.storage.impl_sqlalchemy',
group='storage:sqlalchemy')
cfg.CONF.import_opt('cache_driver', 'designate.pool_manager',
group='service:pool_manager')
cfg.CONF.import_opt('connection',
'designate.pool_manager.cache.impl_sqlalchemy',
group='pool_manager_cache:sqlalchemy')
class NotifierFixture(fixtures.Fixture):
@ -244,6 +249,18 @@ class TestCase(base.BaseTestCase):
'pattern': 'blacklisted.org.'
}]
pool_manager_status_fixtures = [{
'server_id': '1d7a26e6-e604-4aa0-bbc5-d01081bf1f45',
'status': 'SUCCESS',
'serial_number': 1,
'action': 'CREATE',
}, {
'server_id': '1d7a26e6-e604-4aa0-bbc5-d01081bf1f45',
'status': 'ERROR',
'serial_number': 2,
'action': 'DELETE'
}]
def setUp(self):
super(TestCase, self).setUp()
@ -288,6 +305,8 @@ class TestCase(base.BaseTestCase):
group='storage:sqlalchemy'
)
self._setup_pool_manager_cache()
self.config(network_api='fake')
self.config(
managed_resource_tenant_id='managing_tenant',
@ -303,6 +322,25 @@ class TestCase(base.BaseTestCase):
self.admin_context = self.get_admin_context()
def _setup_pool_manager_cache(self):
self.config(
cache_driver='sqlalchemy',
group='service:pool_manager')
repository = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..',
'pool_manager',
'cache',
'impl_sqlalchemy',
'migrate_repo'))
db_fixture = self.useFixture(
DatabaseFixture.get_fixture(repository))
self.config(
connection=db_fixture.url,
connection_debug=50,
group='pool_manager_cache:sqlalchemy')
# Config Methods
def config(self, **kwargs):
group = kwargs.pop('group', None)
@ -429,6 +467,13 @@ class TestCase(base.BaseTestCase):
_values.update(values)
return _values
def get_pool_manager_status_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_manager_status_fixtures[fixture])
_values.update(values)
return _values
def create_server(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)

View File

@ -0,0 +1,238 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from designate import exceptions
from designate import objects
from designate.pool_manager.cache.base import PoolManagerCache
class PoolManagerCacheTestCase(object):
def create_pool_manager_status(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = kwargs.pop('values', {})
values = self.get_pool_manager_status_fixture(
fixture=fixture, values=values)
return self.cache.create_pool_manager_status(
context, objects.PoolManagerStatus(**values))
# Interface Tests
def test_interface(self):
self._ensure_interface(PoolManagerCache, self.cache.__class__)
# Pool manager status tests
def test_create_pool_manager_status(self):
domain = self.create_domain()
values = {
'server_id': '896aa661-198c-4379-bccd-5d8de7007030',
'domain_id': domain['id'],
'status': 'SUCCESS',
'serial_number': 1,
'action': 'CREATE'
}
expected = objects.PoolManagerStatus(**values)
actual = self.cache.create_pool_manager_status(
self.admin_context, expected)
self.assertIsNotNone(actual['id'])
self.assertIsNotNone(actual['created_at'])
self.assertEqual(expected['server_id'], actual['server_id'])
self.assertEqual(expected['domain_id'], actual['domain_id'])
self.assertEqual(expected['status'], actual['status'])
self.assertEqual(expected['serial_number'], actual['serial_number'])
self.assertEqual(expected['action'], actual['action'])
def test_create_pool_manager_status_duplicate(self):
domain = self.create_domain()
self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
with testtools.ExpectedException(
exceptions.DuplicatePoolManagerStatus):
self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
def test_find_pool_manager_statuses(self):
# Verify that there are no pool manager statuses created
actual = self.cache.find_pool_manager_statuses(self.admin_context)
self.assertEqual(0, len(actual))
# Create a Pool manager status
domain = self.create_domain()
expected = self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
actual = self.cache.find_pool_manager_statuses(self.admin_context)
self.assertEqual(1, len(actual))
self.assertEqual(expected['server_id'], actual[0]['server_id'])
self.assertEqual(expected['domain_id'], actual[0]['domain_id'])
self.assertEqual(expected['status'], actual[0]['status'])
self.assertEqual(expected['serial_number'], actual[0]['serial_number'])
self.assertEqual(expected['action'], actual[0]['action'])
def test_find_pool_manager_statuses_with_criterion(self):
# Create two pool manager statuses
domain = self.create_domain()
expected_one = self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
expected_two = self.create_pool_manager_status(
fixture=1, values={'domain_id': domain['id']})
# Verify pool_manager_status_one
criterion = dict(action=expected_one['action'])
actuals = self.cache.find_pool_manager_statuses(
self.admin_context, criterion)
self.assertEqual(len(actuals), 1)
self.assertEqual(expected_one['server_id'], actuals[0]['server_id'])
self.assertEqual(expected_one['domain_id'], actuals[0]['domain_id'])
self.assertEqual(expected_one['status'], actuals[0]['status'])
self.assertEqual(
expected_one['serial_number'], actuals[0]['serial_number'])
self.assertEqual(expected_one['action'], actuals[0]['action'])
# Verify pool_manager_status_two
criterion = dict(action=expected_two['action'])
actuals = self.cache.find_pool_manager_statuses(
self.admin_context, criterion)
self.assertEqual(len(actuals), 1)
self.assertEqual(expected_two['server_id'], actuals[0]['server_id'])
self.assertEqual(expected_two['domain_id'], actuals[0]['domain_id'])
self.assertEqual(expected_two['status'], actuals[0]['status'])
self.assertEqual(
expected_two['serial_number'], actuals[0]['serial_number'])
self.assertEqual(expected_two['action'], actuals[0]['action'])
def test_get_pool_manager_status(self):
domain = self.create_domain()
expected = self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
actual = self.cache.get_pool_manager_status(
self.admin_context, expected['id'])
self.assertEqual(expected['server_id'], actual['server_id'])
self.assertEqual(expected['domain_id'], actual['domain_id'])
self.assertEqual(expected['status'], actual['status'])
self.assertEqual(expected['serial_number'], actual['serial_number'])
self.assertEqual(expected['action'], actual['action'])
def test_get_pool_manager_status_missing(self):
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
uuid = '2c102ffd-7146-4b4e-ad62-b530ee0873fb'
self.cache.get_pool_manager_status(self.admin_context, uuid)
def test_find_pool_manager_status_criterion(self):
# Create two pool manager statuses
domain = self.create_domain()
expected_one = self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
expected_two = self.create_pool_manager_status(
fixture=1, values={'domain_id': domain['id']})
# Verify pool_manager_status_one
criterion = dict(action=expected_one['action'])
actual = self.cache.find_pool_manager_status(
self.admin_context, criterion)
self.assertEqual(expected_one['server_id'], actual['server_id'])
self.assertEqual(expected_one['domain_id'], actual['domain_id'])
self.assertEqual(expected_one['status'], actual['status'])
self.assertEqual(
expected_one['serial_number'], actual['serial_number'])
self.assertEqual(expected_one['action'], actual['action'])
# Verify pool_manager_status_two
criterion = dict(action=expected_two['action'])
actual = self.cache.find_pool_manager_status(
self.admin_context, criterion)
self.assertEqual(expected_two['server_id'], actual['server_id'])
self.assertEqual(expected_two['domain_id'], actual['domain_id'])
self.assertEqual(expected_two['status'], actual['status'])
self.assertEqual(
expected_two['serial_number'], actual['serial_number'])
self.assertEqual(expected_two['action'], actual['action'])
def test_find_pool_manager_status_criterion_missing(self):
criterion = dict(action='CREATE')
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
self.cache.find_pool_manager_status(
self.admin_context, criterion)
def test_update_pool_manager_status(self):
domain = self.create_domain()
expected = self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
# Update the blacklist
expected.action = 'UPDATE'
actual = self.cache.update_pool_manager_status(
self.admin_context, expected)
# Verify the new values
self.assertEqual(expected.action, actual['action'])
def test_update_pool_manager_status_duplicate(self):
# Create two pool manager statuses
domain = self.create_domain()
pool_manager_status_one = self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
pool_manager_status_two = self.create_pool_manager_status(
fixture=1, values={'domain_id': domain['id']})
# Update the second one to be a duplicate of the first
pool_manager_status_two.status = pool_manager_status_one.status
pool_manager_status_two.serial_number = \
pool_manager_status_one.serial_number
pool_manager_status_two.action = pool_manager_status_one.action
with testtools.ExpectedException(
exceptions.DuplicatePoolManagerStatus):
self.cache.update_pool_manager_status(
self.admin_context, pool_manager_status_two)
def test_update_pool_manager_status_missing(self):
pool_manager_status = objects.PoolManagerStatus(
id='e8cee063-3a26-42d6-b181-bdbdc2c99d08')
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
self.cache.update_pool_manager_status(
self.admin_context, pool_manager_status)
def test_delete_pool_manager_status(self):
domain = self.create_domain()
pool_manager_status = self.create_pool_manager_status(
fixture=0, values={'domain_id': domain['id']})
self.cache.delete_pool_manager_status(
self.admin_context, pool_manager_status['id'])
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
self.cache.get_pool_manager_status(
self.admin_context, pool_manager_status['id'])
def test_delete_pool_manager_status_missing(self):
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
uuid = '97f57960-f41b-4e93-8e22-8fd6c7e2c183'
self.cache.delete_pool_manager_status(self.admin_context, uuid)

View File

@ -0,0 +1,25 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.pool_manager import cache
from designate.tests import TestCase
from designate.tests.test_pool_manager.cache import PoolManagerCacheTestCase
class SqlalchemyPoolManagerCacheTest(PoolManagerCacheTestCase, TestCase):
def setUp(self):
super(SqlalchemyPoolManagerCacheTest, self).setUp()
self.cache = cache.get_pool_manager_cache('sqlalchemy')

View File

@ -36,6 +36,7 @@ console_scripts =
designate-central = designate.cmd.central:main
designate-manage = designate.cmd.manage:main
designate-mdns = designate.cmd.mdns:main
designate-pool-manager = designate.cmd.pool_manager:main
designate-sink = designate.cmd.sink:main
designate.api.v1 =
@ -59,12 +60,16 @@ designate.api.v2.extensions =
designate.storage =
sqlalchemy = designate.storage.impl_sqlalchemy:SQLAlchemyStorage
designate.pool_manager.cache =
sqlalchemy = designate.pool_manager.cache.impl_sqlalchemy:SQLAlchemyPoolManagerCache
designate.notification.handler =
nova_fixed = designate.notification_handler.nova:NovaFixedHandler
neutron_floatingip = designate.notification_handler.neutron:NeutronFloatingHandler
designate.backend =
bind9 = designate.backend.impl_bind9:Bind9Backend
bind9_pool = designate.backend.impl_bind9_pool:Bind9PoolBackend
powerdns = designate.backend.impl_powerdns:PowerDNSBackend
powerdns_mdns = designate.backend.impl_powerdns_mdns:PowerDNSMDNSBackend
rpc = designate.backend.impl_rpc:RPCBackend
@ -84,6 +89,7 @@ designate.quota =
designate.manage =
database = designate.manage.database:DatabaseCommands
pool-manager-cache = designate.manage.pool_manager_cache:DatabaseCommands
powerdns = designate.manage.powerdns:DatabaseCommands
powerdns-mdns = designate.manage.powerdns_mdns:DatabaseCommands
tlds = designate.manage.tlds:TLDCommands