Add datasource synchronizer

The datasource synchronizer reads from the configuration database on a
periodic basis and checks whether the active datasources match the
configuration in the database.  If the two do to match, the
synchronizer changes the active datasources to match the database.

By default, datasource synchronization is off.  To enable it, add a
line like this
   datasource_sync_period = 60
to the congress.conf configuration file.

This change includes a change to the datasource status API semantics.
The previous version of the status handler first checks that the
datasource exists in the database; if the datasource does not the
handler responds to the client with and error because the running
state in the server always matches the database content because there
can be just one Congress server modifying the database.  With the
basic high-availability solution, one congress server replica can
modify the database without a second replica knowing, so the running
state of the datasources can become out of sync with the database.

The new version of the datasource status API call returns the status
of the datasource if the datasource is running in the server, even if
the database does not contain the datasource.

This change also makes the list-datasources API call update the
running datasources to match the database after fetching the
datasource configuration from the database.

This also adds a tempest test to verify that datasource
synchronization works.

Implements blueprint: basic-high-availability
Change-Id: I732c965c616f0f11d4d30f7f90b0cce980f9342d
This commit is contained in:
Alexander Yip 2015-03-11 17:23:57 -07:00
parent e362c7dbe2
commit edfbbe4c21
15 changed files with 521 additions and 76 deletions

View File

@ -30,11 +30,12 @@ def d6service(name, keys, inbox, datapath, args):
class DatasourceModel(deepsix.deepSix):
"""Model for handling API requests about Datasources."""
def __init__(self, name, keys, inbox=None, dataPath=None,
policy_engine=None):
policy_engine=None, synchronizer=None):
super(DatasourceModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath)
self.engine = policy_engine
self.datasource_mgr = datasource_manager.DataSourceManager()
self.synchronizer = synchronizer
def get_items(self, params, context=None):
"""Get items in model.
@ -51,6 +52,13 @@ class DatasourceModel(deepsix.deepSix):
datasources = self.datasource_mgr.get_datasources(filter_secret=True)
results = [self.datasource_mgr.make_datasource_dict(datasource)
for datasource in datasources]
# Check that running datasources match the datasources in the
# database since this is going to tell the client about those
# datasources, and the running datasources should match the
# datasources we show the client.
if self.synchronizer:
self.synchronizer.synchronize()
return {"results": results}
def add_item(self, item, params, id_=None, context=None):

View File

@ -14,8 +14,9 @@
#
from congress.api import webservice
from congress.dse import d6cage
from congress.dse import deepsix
from congress.managers import datasource as datasource_manager
from congress.exception import NotFound
def d6service(name, keys, inbox, datapath, args):
@ -28,8 +29,7 @@ class StatusModel(deepsix.deepSix):
policy_engine=None):
super(StatusModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath)
self.engine = policy_engine
self.datasource_mgr = datasource_manager.DataSourceManager()
self.cage = d6cage.d6Cage()
def get_item(self, id_, params, context=None):
"""Retrieve item with id id_ from model.
@ -49,13 +49,11 @@ class StatusModel(deepsix.deepSix):
"The only element that currently has a status is datasource "
"but ds-id does not exist in context: " + str(context))
datasource = context.get('ds_id')
try:
datasource = self.datasource_mgr.get_datasource(
datasource)
except datasource_manager.DatasourceNotFound as e:
raise webservice.DataModelException(e.code, e.message,
http_status_code=e.code)
service = self.cage.getservice(id_=context['ds_id'],
type_='datasource_driver')
if service:
return service['object'].get_status()
service_obj = self.engine.d6cage.service_object(datasource['name'])
return service_obj.get_status()
raise webservice.DataModelException(NotFound.code,
'Could not find service %s' % id_,
http_status_code=NotFound.code)

View File

@ -54,6 +54,9 @@ core_opts = [
cfg.ListOpt('drivers',
default=[],
help=_("List of driver class paths to import.")),
cfg.IntOpt('datasource_sync_period', default=0,
help='The number of seconds to wait between synchronizing '
'datasource config from the database '),
]
# Register the configuration options

View File

@ -107,6 +107,8 @@ class d6Cage(deepsix.deepSix):
self.services[self.name]['description'] = cageDesc
self.services[self.name]['inbox'] = self.inbox
self.services[self.name]['keys'] = self.keys
self.services[self.name]['type'] = None
self.services[self.name]['id'] = None
self.subscribe(
"local.d6cage",
@ -114,9 +116,6 @@ class d6Cage(deepsix.deepSix):
callback=self.updateRoutes,
interval=5)
# Set of service names that we deem special
self.system_service_names = set([self.name])
def __del__(self):
# This function gets called when the interpreter deletes the object
# by the automatic garbage cleanup
@ -203,6 +202,7 @@ class d6Cage(deepsix.deepSix):
self.loadModule(section, filename)
def deleteservice(self, name):
self.log_info("deleting service: %s", name)
eventlet.greenthread.kill(self.services[name]['object'])
self.greenThreads.remove(self.services[name]['object'])
self.table.remove(name, self.services[name]['inbox'])
@ -217,7 +217,9 @@ class d6Cage(deepsix.deepSix):
description="",
moduleName="",
args={},
module_driver=False):
module_driver=False,
type_=None,
id_=None):
self.log_info("creating service %s with module %s and args %s",
name, moduleName, args)
@ -235,7 +237,7 @@ class d6Cage(deepsix.deepSix):
if not module_driver and moduleName not in sys.modules:
raise DataServiceError(
"error loading service" + name +
"error loading service " + name +
": module " + moduleName + " does not exist")
if not module_driver and name in self.services:
@ -274,6 +276,8 @@ class d6Cage(deepsix.deepSix):
self.services[name]['args'] = args
self.services[name]['object'] = svcObject
self.services[name]['inbox'] = inbox
self.services[name]['type'] = type_
self.services[name]['id'] = id_
try:
self.table.add(name, inbox)
@ -292,6 +296,21 @@ class d6Cage(deepsix.deepSix):
raise DataServiceError(
"error starting service '%s': %s" % (name, errmsg))
def getservices(self):
return self.services
def getservice(self, id_=None, type_=None, name=None):
# Returns the first service that matches all non-None parameters.
for name_, service in self.services.items():
if (id_ and service.get('id', None) and id_ != service['id']):
continue
if type_ and type_ != service['type']:
continue
if name and name_ != name:
continue
return service
return None
def updateRoutes(self, msg):
keyData = self.getSubData(msg.correlationId, sender=msg.replyTo)
currentKeys = set(keyData.data)

View File

@ -26,6 +26,8 @@ from congress import exception
from congress.managers import datasource as datasource_manager
from congress.openstack.common import log as logging
from oslo.config import cfg
LOG = logging.getLogger(__name__)
@ -45,7 +47,6 @@ def create(rootdir, statedir, config_override=None):
# create message bus
cage = d6cage.d6Cage()
cage.system_service_names.add(cage.name)
# read in datasource configurations
@ -67,7 +68,6 @@ def create(rootdir, statedir, config_override=None):
if statedir is not None:
engine.load_dir(statedir)
engine.initialize_table_subscriptions()
cage.system_service_names.add(engine.name)
engine.debug_mode() # should take this out for production
# add policy api
@ -79,7 +79,6 @@ def create(rootdir, statedir, config_override=None):
moduleName="API-policy",
description="API-policy DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-policy')
# add rule api
api_path = os.path.join(src_path, "api/rule_model.py")
@ -90,7 +89,6 @@ def create(rootdir, statedir, config_override=None):
moduleName="API-rule",
description="API-rule DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-rule')
# add table api
api_path = os.path.join(src_path, "api/table_model.py")
@ -101,7 +99,6 @@ def create(rootdir, statedir, config_override=None):
moduleName="API-table",
description="API-table DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-table')
# add row api
api_path = os.path.join(src_path, "api/row_model.py")
@ -112,18 +109,6 @@ def create(rootdir, statedir, config_override=None):
moduleName="API-row",
description="API-row DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-row')
# add datasource api
api_path = os.path.join(src_path, "api/datasource_model.py")
LOG.info("main::start() api_path: %s", api_path)
cage.loadModule("API-datasource", api_path)
cage.createservice(
name="api-datasource",
moduleName="API-datasource",
description="API-datasource DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-datasource')
# add status api
api_path = os.path.join(src_path, "api/status_model.py")
@ -134,7 +119,6 @@ def create(rootdir, statedir, config_override=None):
moduleName="API-status",
description="API-status DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-status')
# add schema api
api_path = os.path.join(src_path, "api/schema_model.py")
@ -145,7 +129,6 @@ def create(rootdir, statedir, config_override=None):
moduleName="API-schema",
description="API-schema DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-schema')
# add datasource/config api
api_path = os.path.join(src_path, "api/datasource_config_model.py")
@ -156,7 +139,6 @@ def create(rootdir, statedir, config_override=None):
moduleName="API-config",
description="API-config DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-config')
# add path for system/datasource-drivers
api_path = os.path.join(src_path, "api/system/driver_model.py")
@ -167,7 +149,6 @@ def create(rootdir, statedir, config_override=None):
moduleName="API-system",
description="API-system DSE instance",
args={'policy_engine': engine})
cage.system_service_names.add('api-system')
# Load policies from database
for policy in db_policy_rules.get_policies():
@ -221,7 +202,9 @@ def create(rootdir, statedir, config_override=None):
cage.createservice(name=driver['name'],
moduleName=driver_info['module'],
args=driver['config'],
module_driver=True)
module_driver=True,
type_='datasource_driver',
id_=driver['id'])
except d6cage.DataServiceError:
# FIXME(arosen): If createservice raises congress-server
# dies here. So we catch this exception so the server does
@ -241,10 +224,33 @@ def create(rootdir, statedir, config_override=None):
parsed_rule,
{'policy_id': rule.policy_name})
# Start datasource synchronizer after explicitly starting the
# datasources, because the explicit call to create a datasource
# will crash if the synchronizer creates the datasource first.
synchronizer_path = os.path.join(src_path, "synchronizer.py")
LOG.info("main::start() synchronizer: %s", synchronizer_path)
cage.loadModule("Synchronizer", synchronizer_path)
cage.createservice(
name="synchronizer",
moduleName="Synchronizer",
description="DB synchronizer instance",
args={'poll_time': cfg.CONF.datasource_sync_period})
synchronizer = cage.service_object('synchronizer')
# add datasource api
api_path = os.path.join(src_path, "api/datasource_model.py")
LOG.info("main::start() api_path: %s", api_path)
cage.loadModule("API-datasource", api_path)
cage.createservice(
name="api-datasource",
moduleName="API-datasource",
description="API-datasource DSE instance",
args={'policy_engine': engine, 'synchronizer': synchronizer})
return cage
def load_data_service(service_name, config, cage, rootdir):
def load_data_service(service_name, config, cage, rootdir, id_):
"""Load service.
Load a service if not already loaded. Also loads its
@ -253,6 +259,7 @@ def load_data_service(service_name, config, cage, rootdir):
CONFIG: dictionary of configuration values
CAGE: instance to load service into
ROOTDIR: dir for start of module paths
ID: UUID of the service.
"""
config = copy.copy(config)
if service_name in cage.services:
@ -271,4 +278,4 @@ def load_data_service(service_name, config, cage, rootdir):
LOG.info("Trying to create service %s with module %s",
service_name, module_name)
cage.createservice(name=service_name, moduleName=module_name,
args=config)
args=config, type_='datasource_driver', id_=id_)

View File

@ -24,47 +24,61 @@ from congress.db import api as db
from congress.db import datasources as datasources_db
from congress.dse import d6cage
from congress import exception
from congress.openstack.common import log as logging
from congress.openstack.common import uuidutils
LOG = logging.getLogger(__name__)
class DataSourceManager(object):
loaded_drivers = {}
@classmethod
def add_datasource(cls, item, deleted=False):
def add_datasource(cls, item, deleted=False, update_db=True):
req = cls.make_datasource_dict(item)
driver_info = cls.validate_create_datasource(req)
# If update_db is True, new_id will get a new value from the db.
new_id = req['id']
driver_info = cls.get_driver_info(item['driver'])
session = db.get_session()
try:
with session.begin(subtransactions=True):
datasource = datasources_db.add_datasource(
id_=req['id'],
name=req['name'],
driver=req['driver'],
config=req['config'],
description=req['description'],
enabled=req['enabled'],
session=session)
datasource = cls.make_datasource_dict(datasource)
LOG.debug("adding datasource %s", req['name'])
if update_db:
LOG.debug("updating db")
datasource = datasources_db.add_datasource(
id_=req['id'],
name=req['name'],
driver=req['driver'],
config=req['config'],
description=req['description'],
enabled=req['enabled'],
session=session)
new_id = datasource['id']
cage = d6cage.d6Cage()
engine = cage.service_object('engine')
try:
engine.create_policy(datasource['name'])
LOG.debug("creating policy %s", req['name'])
engine.create_policy(req['name'])
except KeyError:
# FIXME(arosen): we need a better exception then
# key error being raised here
raise DatasourceNameInUse(name=req['name'])
cage.createservice(name=datasource['name'],
cage.createservice(name=req['name'],
moduleName=driver_info['module'],
args=datasource['config'],
module_driver=True)
args=item['config'],
module_driver=True,
type_='datasource_driver',
id_=new_id)
service = cage.service_object(req['name'])
engine.set_schema(req['name'], service.get_schema())
except db_exc.DBDuplicateEntry:
raise DatasourceNameInUse(name=req['name'])
return cls.make_datasource_dict(datasource)
new_item = dict(item)
new_item['id'] = new_id
return cls.make_datasource_dict(new_item)
@classmethod
def validate_configured_drivers(cls):
@ -106,7 +120,12 @@ class DataSourceManager(object):
@classmethod
def get_datasources(cls, filter_secret=False):
"""Return the created datasources."""
"""Return the created datasources.
This returns what datasources the database contains, not the
datasources that this server instance is running.
"""
results = []
for datasouce_driver in datasources_db.get_datasources():
result = cls.make_datasource_dict(datasouce_driver)
@ -152,7 +171,7 @@ class DataSourceManager(object):
return obj.get_schema()
@classmethod
def delete_datasource(cls, datasource_id):
def delete_datasource(cls, datasource_id, update_db=True):
datasource = cls.get_datasource(datasource_id)
session = db.get_session()
with session.begin(subtransactions=True):
@ -165,10 +184,11 @@ class DataSourceManager(object):
raise e
except KeyError:
raise DatasourceNotFound(id=datasource_id)
result = datasources_db.delete_datasource(
datasource_id, session)
if not result:
raise DatasourceNotFound(id=datasource_id)
if update_db:
result = datasources_db.delete_datasource(
datasource_id, session)
if not result:
raise DatasourceNotFound(id=datasource_id)
cage.deleteservice(datasource['name'])
@classmethod

View File

@ -88,7 +88,7 @@ def main():
"search paths ~/.congress/, ~/, /etc/congress/, /etc/) and "
"the '--config-file' option!")
config.setup_logging()
LOG.info("Starting congress server")
LOG.info("Starting congress server on port %d", cfg.CONF.bind_port)
# API resource runtime encapsulation:
# event loop -> wsgi server -> webapp -> resource manager

111
congress/synchronizer.py Normal file
View File

@ -0,0 +1,111 @@
# Copyright (c) 2015 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# The Synchronizer class performs periodic polling of the database to
# keep datasources in the Congress server in sync with the
# configuration in the database. This is important because there may
# be more than one replica of the Congress server, each of which is
# able to modify the datasource configuration in the database.
import datetime
from congress.dse import d6cage
from congress.dse import deepsix
from congress.managers import datasource as datasource_manager
from congress.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def d6service(name, keys, inbox, datapath, args):
return Synchronizer(name, keys, inbox, datapath, args)
class Synchronizer(deepsix.deepSix):
def __init__(self, name, keys, inbox, datapath, args):
super(Synchronizer, self).__init__(name, keys, inbox, datapath)
LOG.debug("init")
if 'poll_time' in args:
self.poll_time = int(args['poll_time'])
else:
self.poll_time = 0
self.last_poll_time = None
self.last_update = None
self.datasource_mgr = datasource_manager.DataSourceManager()
def set_poll_time(self, time):
self.poll_time = time
def d6run(self):
if self.poll_time:
if self.last_poll_time is None:
self.synchronize()
self.last_poll_time = datetime.datetime.now()
else:
now = datetime.datetime.now()
diff = now - self.last_poll_time
seconds = diff.seconds + diff.days * 24 * 3600
if seconds > self.poll_time:
self.synchronize()
self.last_poll_time = datetime.datetime.now()
def synchronize(self):
LOG.debug("Synchronizing running datasources")
cage = d6cage.d6Cage()
datasources = self.datasource_mgr.get_datasources(filter_secret=False)
# Look for datasources in the db, but not in the cage.
for configured_ds in datasources:
active_ds = cage.service_object(configured_ds['name'])
if active_ds is not None:
active_config = cage.getservice(name=configured_ds['name'])
if not self._config_eq(configured_ds, active_config):
LOG.debug('configured and active disagree: (%s) %s %s',
str(active_ds), str(configured_ds),
str(active_config))
LOG.info('Reloading datasource: %s', str(configured_ds))
self.datasource_mgr.delete_datasource(configured_ds['id'],
update_db=False)
self.datasource_mgr.add_datasource(
configured_ds,
update_db=False)
else:
LOG.info('Configured datasource is not active, adding: %s',
str(configured_ds))
self.datasource_mgr.add_datasource(configured_ds,
update_db=False)
# Look for datasources in the cage, but not in the db. This
# need not compare the configuration, because the above
# comparison would have already checked the configuration.
configured_dicts = dict((ds['name'], ds) for ds in datasources)
LOG.debug("configured dicts: %s", str(configured_dicts))
LOG.debug("active services: %s", str(cage.getservices()))
for name, service in cage.getservices().items():
LOG.debug('active datasource: %s', service['name'])
if (service['type'] == 'datasource_driver' and
not configured_dicts.get(service['name'], None)):
LOG.info('Active datasource is not configured, removing: %s',
service['name'])
cage.deleteservice(service['name'])
engine = cage.service_object('engine')
engine.delete_policy(service['name'])
def _config_eq(self, db_config, active_config):
return (db_config['name'] == active_config['name'] and
db_config['config'] == active_config['args'])

View File

@ -242,8 +242,8 @@ def retry_check_db_equal(policy, query, correct, target=None):
else:
actual = policy.select(query, target=target)
if not db_equal(actual, correct, output_diff=False):
raise Exception("Query {} does not produce {}".format(
str(query), str(correct)))
raise Exception("Query {} produces {}, should produce {}".format(
str(query), str(actual), str(correct)))
@retrying.retry(stop_max_attempt_number=1000, wait_fixed=100)

View File

@ -70,9 +70,9 @@ class TestSetPolicy(base.TestCase):
'module': "congress/tests/fake_datasource.py"}}
harness.load_data_service("vmplace", config['vmplace'],
self.cage, helper.root_path())
self.cage, helper.root_path(), 1)
harness.load_data_service("fake", config['fake'],
self.cage, helper.root_path())
self.cage, helper.root_path(), 2)
self.vmplace = self.cage.service_object('vmplace')
self.vmplace.debug_mode()

View File

@ -74,6 +74,11 @@ class TestCongress(base.SqlTestCase):
cage = harness.create(helper.root_path(), helper.state_path(),
config_override)
# Disable synchronizer because the this test creates
# datasources without also inserting them into the database.
# The synchronizer would delete these datasources.
cage.service_object('synchronizer').set_poll_time(0)
engine = cage.service_object('engine')
api = {'policy': cage.service_object('api-policy'),
@ -85,9 +90,9 @@ class TestCongress(base.SqlTestCase):
'schema': cage.service_object('api-schema')}
config = {'username': 'demo',
'auth_url': 'http://127.0.0.1:5000/v2.0',
'tenant_name': 'demo',
'password': 'password',
'auth_url': 'http://127.0.0.1:5000/v2.0',
'tenant_name': 'demo',
'password': 'password',
'module': 'datasources/neutron_driver.py',
'poll_time': 0}
@ -98,18 +103,18 @@ class TestCongress(base.SqlTestCase):
engine.create_policy('nova')
harness.load_data_service(
'neutron', config, cage,
os.path.join(helper.root_path(), "congress"))
os.path.join(helper.root_path(), "congress"), 1)
service = cage.service_object('neutron')
engine.set_schema('neutron', service.get_schema())
harness.load_data_service(
'neutron2', config, cage,
os.path.join(helper.root_path(), "congress"))
os.path.join(helper.root_path(), "congress"), 2)
engine.set_schema('neutron2', service.get_schema())
config['module'] = 'datasources/nova_driver.py'
harness.load_data_service(
'nova', config, cage,
os.path.join(helper.root_path(), "congress"))
os.path.join(helper.root_path(), "congress"), 3)
engine.set_schema('nova', service.get_schema())
cage.service_object('neutron').neutron = neutron_mock

View File

@ -125,7 +125,7 @@ function configure_congress {
CONGRESS_DRIVERS+="congress.datasources.cloudfoundryv2_driver.CloudFoundryV2Driver,"
CONGRESS_DRIVERS+="congress.datasources.murano_driver.MuranoDriver,"
CONGRESS_DRIVERS+="congress.datasources.ironic_driver.IronicDriver"
CONGRESS_DRIVERS+="congress.tests.fake_datasource.FakeDataSource"
iniset $CONGRESS_CONF DEFAULT drivers $CONGRESS_DRIVERS

View File

@ -0,0 +1,271 @@
# Copyright 2015 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from congressclient.v1 import client as congress_client
import keystoneclient
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
from tempest.scenario import manager_congress
from tempest import test
import os
import subprocess
import tempfile
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestHA(manager_congress.ScenarioPolicyBase):
def setUp(self):
super(TestHA, self).setUp()
self.keypairs = {}
self.servers = []
self.replicas = {}
def start_replica(self, port_num):
f = tempfile.NamedTemporaryFile(mode='w', suffix='.conf',
prefix='congress%d-' % port_num,
dir='/tmp', delete=False)
conf_file = f.name
template = open('/etc/congress/congress.conf')
conf = template.read()
conf = conf.replace('# bind_port = 1789',
'bind_port = %d\n' % port_num)
conf = conf.replace('# datasource_sync_period = 60',
'datasource_sync_period = 5')
f.write(conf)
f.close()
args = ['/usr/bin/python',
'bin/congress-server',
'--config-file',
conf_file]
out = tempfile.NamedTemporaryFile(mode='w', suffix='.out',
prefix='congress%d-' % port_num,
dir='/tmp', delete=False)
err = tempfile.NamedTemporaryFile(mode='w', suffix='.err',
prefix='congress%d-' % port_num,
dir='/tmp', delete=False)
p = subprocess.Popen(args, stdout=out, stderr=err,
cwd='/opt/stack/congress')
assert port_num not in self.replicas
self.replicas[port_num] = (p, conf_file)
def stop_replica(self, port_num):
proc, conf_file = self.replicas[port_num]
proc.terminate()
proc.wait()
os.unlink(conf_file)
self.replicas[port_num] = (None, conf_file)
def create_client(self, port_num):
creds = self.admin_credentials()
auth = keystoneclient.auth.identity.v2.Password(
auth_url=CONF.identity.uri,
username=creds.username,
password=creds.password,
tenant_name=creds.tenant_name)
session = keystoneclient.session.Session(auth=auth)
return congress_client.Client(
session=session,
auth=None,
endpoint_override='http://127.0.0.1:%d' % port_num,
region_name=CONF.identity.region)
def datasource_exists(self, client, datasource_id):
try:
LOG.debug("datasource_exists begin")
body = client.list_datasource_status(datasource_id)
LOG.debug("list_datasource_status: %s", str(body))
except Exception as e:
if hasattr(e, 'http_status') and e.http_status == 404:
LOG.debug("not found")
return False
elif isinstance(e, keystoneclient.exceptions.ConnectionRefused):
LOG.debug("connection refused")
return False
raise
return True
def datasource_missing(self, client, datasource_id):
try:
LOG.debug("datasource_missing begin")
body = client.list_datasource_status(datasource_id)
LOG.debug("list_datasource_status: %s", str(body))
except Exception as e:
if hasattr(e, 'http_status') and e.http_status == 404:
LOG.debug("not found")
return True
elif isinstance(e, keystoneclient.exceptions.ConnectionRefused):
LOG.debug("connection refused")
return False
raise
return False
def find_fake(self, client):
datasources = client.list_datasources()
for r in datasources['results']:
if r['name'] == 'fake':
LOG.debug('existing fake driver: %s', str(r['id']))
return r['id']
return None
def create_fake(self, client):
# Create fake datasource if it does not exist. Returns the
# fake datasource id.
fake_id = self.find_fake(client)
if fake_id:
return fake_id
item = {'id': None,
'name': 'fake',
'driver': 'fake_datasource',
'config': '{}',
'description': 'bar',
'enabled': True}
ret = client.create_datasource(item)
LOG.debug('created fake driver: %s', str(ret['id']))
return ret['id']
@test.attr(type='smoke')
@test.services('compute',)
def test_datasource_db_sync_add(self):
# Verify that a replica adds a datasource when a datasource
# appears in the database.
CLIENT2_PORT = 4001
client1 = self.admin_manager.congress_client
# delete fake if it exists.
old_fake_id = self.find_fake(client1)
if old_fake_id:
client1.delete_datasource(old_fake_id)
# Verify that primary server has no fake datasource
if not test.call_until_true(
func=lambda: self.datasource_missing(client1, old_fake_id),
duration=60, sleep_for=1):
raise exceptions.TimeoutException(
"primary should not have fake, but does")
need_to_delete_fake = False
try:
# Create a new fake datasource
fake_id = self.create_fake(client1)
need_to_delete_fake = True
# Verify that primary server has fake datasource
if not test.call_until_true(
func=lambda: self.datasource_exists(client1, fake_id),
duration=60, sleep_for=1):
raise exceptions.TimeoutException(
"primary should have fake, but does not")
# start replica
self.start_replica(CLIENT2_PORT)
# Create session for second server.
client2 = self.create_client(CLIENT2_PORT)
# Verify that second server has fake datasource
if not test.call_until_true(
func=lambda: self.datasource_exists(client2, fake_id),
duration=60, sleep_for=1):
raise exceptions.TimeoutException(
"replica should have fake, but does not")
# Remove fake from primary server instance.
LOG.debug("removing fake datasource %s", str(fake_id))
client1.delete_datasource(fake_id)
need_to_delete_fake = False
# Confirm that fake is gone from primary server instance.
if not test.call_until_true(
func=lambda: self.datasource_missing(client1, fake_id),
duration=60, sleep_for=1):
self.stop_replica(CLIENT2_PORT)
raise exceptions.TimeoutException(
"primary instance still has fake")
LOG.debug("removed fake datasource from primary instance")
# Confirm that second service instance removes fake.
if not test.call_until_true(
func=lambda: self.datasource_missing(client2, fake_id),
duration=60, sleep_for=1):
raise exceptions.TimeoutException(
"replica should remove fake, but still has it")
finally:
self.stop_replica(CLIENT2_PORT)
if need_to_delete_fake:
self.admin_manager.congress_client.delete_datasource(fake_id)
@test.attr(type='smoke')
@test.services('compute',)
def test_datasource_db_sync_remove(self):
# Verify that a replica removes a datasource when a datasource
# disappears from the database.
CLIENT2_PORT = 4001
client1 = self.admin_manager.congress_client
fake_id = self.create_fake(client1)
need_to_delete_fake = True
try:
self.start_replica(CLIENT2_PORT)
# Verify that primary server has fake datasource
if not test.call_until_true(
func=lambda: self.datasource_exists(client1, fake_id),
duration=60, sleep_for=1):
raise exceptions.TimeoutException(
"primary should have fake, but does not")
# Create session for second server.
client2 = self.create_client(CLIENT2_PORT)
# Verify that second server has fake datasource
if not test.call_until_true(
func=lambda: self.datasource_exists(client2, fake_id),
duration=60, sleep_for=1):
raise exceptions.TimeoutException(
"replica should have fake, but does not")
# Remove fake from primary server instance.
LOG.debug("removing fake datasource %s", str(fake_id))
client1.delete_datasource(fake_id)
need_to_delete_fake = False
# Confirm that fake is gone from primary server instance.
if not test.call_until_true(
func=lambda: self.datasource_missing(client1, fake_id),
duration=60, sleep_for=1):
self.stop_replica(CLIENT2_PORT)
raise exceptions.TimeoutException(
"primary instance still has fake")
LOG.debug("removed fake datasource from primary instance")
# Confirm that second service instance removes fake.
if not test.call_until_true(
func=lambda: self.datasource_missing(client2, fake_id),
duration=60, sleep_for=1):
raise exceptions.TimeoutException(
"replica should remove fake, but still has it")
finally:
self.stop_replica(CLIENT2_PORT)
if need_to_delete_fake:
self.admin_manager.congress_client.delete_datasource(fake_id)

View File

@ -44,6 +44,9 @@
# For example: congress.datasources.neutronv2_driver.NeutronV2Driver, etc
# datasource_drivers = []
# Seconds between polling database to sync datasources.
# datasource_sync_period = 60
[keystone_authtoken]
auth_host = 127.0.0.1
auth_port = 35357