congress-tempest-plugin/tests/scenario/congress_ha/test_ha.py

298 lines
12 KiB
Python

# Copyright 2015 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import socket
import subprocess
import tempfile
from oslo_log import log as logging
import six
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import manager as tempestmanager
from urllib3 import exceptions as urllib3_exceptions
from congress_tempest_tests.services.policy import policy_client
from congress_tempest_tests.tests.scenario import helper
from congress_tempest_tests.tests.scenario import manager_congress
# Shared tempest configuration object; this module reads the ``congressha``
# and ``identity`` option groups from it.
CONF = config.CONF
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
class TestHA(manager_congress.ScenarioPolicyBase):
    """Scenario test for datasource/policy sync between two congress nodes.

    The test launches a second set of congress services (API, policy engine
    and datasources) from ``bin/congress-server`` on 127.0.0.1, registers it
    in the identity catalog, and verifies that a datasource named 'fake'
    that is created or deleted through the primary server shows up or
    disappears on both the replica and the primary.
    """

    def setUp(self):
        """Initialise bookkeeping attributes and the admin clients."""
        super(TestHA, self).setUp()
        self.keypairs = {}
        self.servers = []
        # port -> (procs, conf_file). ``procs`` maps 'API'/'PE'/'DS' to the
        # Popen object of that service; it is replaced by None once the
        # replica has been stopped.
        self.replicas = {}
        self.services_client = self.os_admin.identity_services_v3_client
        self.endpoints_client = self.os_admin.endpoints_v3_client
        self.client = self.os_admin.congress_client

    def _prepare_replica(self, port_num):
        """Register the replica in the identity service catalog.

        Creates a 'congressha' service of type CONF.congressha.replica_type
        and a public endpoint pointing at http://127.0.0.1:<port_num>. The
        created ids are stored on ``self`` for ``_cleanup_replica``.
        """
        replica_url = "http://127.0.0.1:%d" % port_num
        resp = self.services_client.create_service(
            name='congressha',
            type=CONF.congressha.replica_type,
            description='policy ha service')
        self.replica_service_id = resp['service']['id']
        resp = self.endpoints_client.create_endpoint(
            service_id=self.replica_service_id,
            region=CONF.identity.region,
            interface='public',
            url=replica_url)
        self.replica_endpoint_id = resp['endpoint']['id']

    def _cleanup_replica(self):
        """Delete the endpoint and service made by ``_prepare_replica``."""
        self.endpoints_client.delete_endpoint(self.replica_endpoint_id)
        self.services_client.delete_service(self.replica_service_id)

    @staticmethod
    def _render_replica_conf(template_text, port_num):
        """Return the replica configuration derived from *template_text*.

        The result differs from the template in four ways: ``bind_port`` is
        set to *port_num* in ``[DEFAULT]``, a ``datasource_sync_period`` of
        30 is lowered to 5, the first ``signing_dir`` occurrence is
        commented out, and a ``[dse]`` section with its own ``bus_id`` is
        appended.
        """
        header = '[DEFAULT]'
        bind_line = 'bind_port = %d\n' % port_num
        header_pos = template_text.find(header)
        if header_pos < 0:
            # No [DEFAULT] section at all: add one rather than splicing the
            # option into an arbitrary offset of the file.
            conf = header + '\n' + bind_line + template_text
        else:
            line_end = template_text.find('\n', header_pos)
            if line_end < 0:
                # Header is the last line and has no trailing newline.
                conf = template_text + '\n' + bind_line
            else:
                conf = (template_text[:line_end + 1] +
                        bind_line +
                        template_text[line_end + 1:])
        # set datasource sync period interval to 5
        conf = conf.replace('datasource_sync_period = 30',
                            'datasource_sync_period = 5')
        sindex = conf.find('signing_dir')
        if sindex >= 0:
            # Only comment the option out when it is actually present;
            # find() returns -1 otherwise and slicing on that would corrupt
            # the end of the file.
            conf = conf[:sindex] + '#' + conf[sindex:]
        return conf + '\n[dse]\nbus_id = replica-node\n'

    def start_replica(self, port_num):
        """Start the API, policy-engine and datasources replica services.

        A configuration file derived from /etc/congress/congress.conf is
        written to /tmp, the replica is registered in the identity catalog
        and the three services are launched. The replica is recorded in
        ``self.replicas[port_num]`` before the first process is started so
        that ``stop_replica`` can clean up even after a partial start.

        :raises AssertionError: if *port_num* is already in self.replicas.
        """
        # Checked up front (and not with a bare ``assert``) so that nothing
        # has been created yet when the call is rejected.
        if port_num in self.replicas:
            raise AssertionError(
                "replica already registered for port %d" % port_num)

        with open('/etc/congress/congress.conf') as template:
            conf = self._render_replica_conf(template.read(), port_num)
        LOG.debug("Configuration file for replica: %s\n", conf)

        # delete=False: the file must outlive this handle because the
        # replica processes read it; stop_replica() unlinks it.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.conf',
                                         prefix='congress%d-' % port_num,
                                         dir='/tmp', delete=False) as f:
            f.write(conf)
            conf_file = f.name

        try:
            self._prepare_replica(port_num)
        except Exception:
            # Nothing has been registered for later cleanup yet.
            os.unlink(conf_file)
            raise

        procs = {}
        self.replicas[port_num] = (procs, conf_file)
        # start all services on replica node
        procs['API'] = self.start_service('api', conf_file)
        procs['PE'] = self.start_service('policy-engine', conf_file)
        procs['DS'] = self.start_service('datasources', conf_file)
        LOG.debug("successfully started replica services\n")

    def start_service(self, name, conf_file):
        """Launch one congress-server service and return its Popen object.

        :param name: service selector, passed as ``--<name>``.
        :param conf_file: path given to ``--config-file``.
        """
        service = '--' + name
        node = name + '-replica-node'
        args = ['bin/congress-server', service,
                '--node-id', node, '--config-file', conf_file]
        # NOTE: stdout/stderr go to a pipe that is only read by
        # _log_replica_output(); a very chatty service could fill it.
        return subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                cwd=helper.root_path())

    def stop_replica(self, port_num):
        """Kill the replica's processes and undo ``start_replica``.

        Removes the temporary configuration file and the identity catalog
        entries. Calling it again for an already stopped replica is a no-op.
        """
        procs, conf_file = self.replicas[port_num]
        if procs is None:
            # Already stopped.
            return
        # Using proc.terminate() will block at proc.wait(), no idea why yet
        # kill all processes
        for p in procs.values():
            # A service may already have exited (or been reaped by
            # communicate()); signalling it again can fail on Python 2.
            if p.poll() is None:
                p.kill()
            p.wait()
            if p.stdout is not None:
                p.stdout.close()
        os.unlink(conf_file)
        self.replicas[port_num] = (None, conf_file)
        self._cleanup_replica()

    def _log_replica_output(self):
        """Log the captured output of every replica process at debug level.

        Processes that are still running are killed first, because
        communicate() only returns once the process has exited.
        """
        for port_num, (procs, _conf_file) in self.replicas.items():
            if procs is None:
                continue
            for service_key, proc in procs.items():
                if proc.poll() is None:
                    proc.kill()
                output, _error = proc.communicate()
                LOG.debug("Replica port %s service %s logs: %s",
                          port_num,
                          service_key,
                          six.StringIO(output.decode()).getvalue())

    def create_client(self, client_type):
        """Return a PolicyClient for the catalog service *client_type*."""
        creds = credentials.get_configured_admin_credentials('identity_admin')
        auth_prov = tempestmanager.get_auth_provider(creds)
        return policy_client.PolicyClient(
            auth_prov, client_type,
            CONF.identity.region)

    def _check_replica_server_status(self, client):
        """Return True once *client* can list policies.

        Unauthorized, socket errors and urllib3 MaxRetryError are treated
        as "not ready yet" and yield False; any other exception propagates.
        """
        try:
            LOG.debug("Check replica server status")
            client.list_policy()
            LOG.debug("replica server ready")
            return True
        except exceptions.Unauthorized:
            LOG.debug("connection refused")
            return False
        except (socket.error, urllib3_exceptions.MaxRetryError):
            LOG.debug("Replica server not ready")
            return False

    def find_fake(self, client):
        """Return the id of the datasource named 'fake', or None."""
        datasources = client.list_datasources()
        for r in datasources['results']:
            if r['name'] == 'fake':
                LOG.debug('existing fake driver: %s', str(r['id']))
                return r['id']
        return None

    def _check_resource_exists(self, client, resource):
        """Return True if *client*'s server reports a status for 'fake'.

        :param client: policy client of the server to query.
        :param resource: 'datasource' to query the datasource status; any
            other value queries the policy status.
        :returns: False when the server answers NotFound, True otherwise.
        """
        try:
            # Query the server the caller asked about. Previously this
            # always used self.client, so replica checks silently polled
            # the primary server.
            if resource == 'datasource':
                LOG.debug("Check datasource exists")
                body = client.list_datasource_status('fake')
            else:
                LOG.debug("Check policy exists")
                body = client.list_policy_status('fake')
            LOG.debug("resource status: %s", str(body))
        except exceptions.NotFound:
            LOG.debug("resource 'fake' not found")
            return False
        return True

    def _check_resource_missing(self, client, resource):
        """Return the negation of ``_check_resource_exists``."""
        return not self._check_resource_exists(client, resource)

    def create_fake(self, client):
        """Create the 'fake' datasource unless it already exists.

        :returns: the id of the existing or newly created datasource.
        """
        fake_id = self.find_fake(client)
        if fake_id:
            return fake_id
        item = {'id': None,
                'name': 'fake',
                'driver': 'fake_datasource',
                'config': {"username": "fakeu",
                           "tenant_name": "faket",
                           "password": "fakep",
                           "auth_url": "http://127.0.0.1:5000/v2"},
                'description': 'bar',
                'enabled': True}
        ret = client.create_datasource(item)
        LOG.debug('created fake driver: %s', str(ret['id']))
        return ret['id']

    def _wait_until(self, predicate, duration, failure_msg):
        """Poll *predicate* once per second for up to *duration* seconds.

        :raises tempest.lib.exceptions.TimeoutException: carrying
            *failure_msg* if *predicate* never returned True.
        """
        if not test_utils.call_until_true(
                func=predicate, duration=duration, sleep_for=1):
            raise exceptions.TimeoutException(failure_msg)

    @decorators.attr(type='smoke')
    def test_datasource_db_sync_add_remove(self):
        """Verify that both servers follow datasource changes in the DB.

        A replica must add the 'fake' datasource (and its policy) once it
        appears in the database and drop it again once it is deleted
        through the primary server.
        """
        replica_port = CONF.congressha.replica_port
        try:
            # Check fake if exists. else create
            fake_id = self.create_fake(self.client)
            # Start replica
            self.start_replica(replica_port)
            replica_client = self.create_client(CONF.congressha.replica_type)
            # Check replica server status
            if not test_utils.call_until_true(
                    func=lambda: self._check_replica_server_status(
                        replica_client),
                    duration=60, sleep_for=1):
                self._log_replica_output()
                raise exceptions.TimeoutException("Replica Server not ready")
            # Replica server is up

            # primary server might sync later than replica server due to
            # diff in datasource sync interval(P-30, replica-5). So checking
            # replica first
            # Verify that replica server synced fake dataservice and policy
            self._wait_until(
                lambda: self._check_resource_exists(
                    replica_client, 'datasource'),
                60,
                "replica doesn't have fake dataservice, data sync failed")
            self._wait_until(
                lambda: self._check_resource_exists(
                    replica_client, 'policy'),
                60,
                "replica doesn't have fake policy, policy sync failed")
            # Verify that primary server synced fake dataservice and policy
            self._wait_until(
                lambda: self._check_resource_exists(
                    self.client, 'datasource'),
                90,
                "primary doesn't have fake dataservice, data sync failed")
            self._wait_until(
                lambda: self._check_resource_exists(
                    self.client, 'policy'),
                90,
                "primary doesn't have fake policy, policy sync failed")

            # Remove fake from primary server instance.
            LOG.debug("removing fake datasource %s", str(fake_id))
            self.client.delete_datasource(fake_id)

            # Verify that replica server has no fake datasource and fake
            # policy
            self._wait_until(
                lambda: self._check_resource_missing(
                    replica_client, 'datasource'),
                60,
                "replica still has fake dataservice, sync failed")
            self._wait_until(
                lambda: self._check_resource_missing(
                    replica_client, 'policy'),
                60,
                "replica still fake policy, policy synchronizer failed")
            LOG.debug("removed fake datasource from replica instance")

            # Verify that primary server has no fake datasource and fake
            # policy
            self._wait_until(
                lambda: self._check_resource_missing(
                    self.client, 'datasource'),
                90,
                "primary still has fake dataservice, sync failed")
            self._wait_until(
                lambda: self._check_resource_missing(
                    self.client, 'policy'),
                90,
                "primary still fake policy, policy synchronizer failed")
            LOG.debug("removed fake datasource from primary instance")
        finally:
            # Tear the replica down whenever it was started, including the
            # case where it never became ready, so that no processes,
            # temp files or catalog entries are left behind.
            procs = self.replicas.get(replica_port, (None, None))[0]
            if procs is not None:
                self.stop_replica(replica_port)