# Copyright 2015 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import os
import socket
import subprocess
import tempfile

from oslo_log import log as logging
import six
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import manager as tempestmanager
from urllib3 import exceptions as urllib3_exceptions

from congress_tempest_tests.services.policy import policy_client
from congress_tempest_tests.tests.scenario import helper
from congress_tempest_tests.tests.scenario import manager_congress

CONF = config.CONF
LOG = logging.getLogger(__name__)


class TestHA(manager_congress.ScenarioPolicyBase):
    """Scenario test for datasource sync between a primary and a replica.

    The test starts a second set of congress services (API, policy engine
    and datasources) as local subprocesses listening on
    ``CONF.congressha.replica_port`` and checks that the 'fake' datasource
    and its policy appear on, and disappear from, both servers.
    """

    def setUp(self):
        super(TestHA, self).setUp()
        self.keypairs = {}
        self.servers = []
        # Maps replica port -> (dict of service name -> Popen, conf path).
        # After stop_replica() the dict of processes is replaced by None.
        self.replicas = {}
        self.services_client = self.os_admin.identity_services_v3_client
        self.endpoints_client = self.os_admin.endpoints_v3_client
        self.client = self.os_admin.congress_client

    def _prepare_replica(self, port_num):
        """Create the identity service and public endpoint for the replica.

        The ids of the created service and endpoint are stored on the
        instance so that _cleanup_replica() can delete them.
        """
        replica_url = "http://127.0.0.1:%d" % port_num
        resp = self.services_client.create_service(
            name='congressha',
            type=CONF.congressha.replica_type,
            description='policy ha service')
        self.replica_service_id = resp['service']['id']
        resp = self.endpoints_client.create_endpoint(
            service_id=self.replica_service_id,
            region=CONF.identity.region,
            interface='public',
            url=replica_url)
        self.replica_endpoint_id = resp['endpoint']['id']

    def _cleanup_replica(self):
        """Delete the endpoint and service created by _prepare_replica()."""
        self.endpoints_client.delete_endpoint(self.replica_endpoint_id)
        self.services_client.delete_service(self.replica_service_id)

    @staticmethod
    def _render_replica_conf(conf, port_num):
        """Return the replica configuration derived from the primary's.

        :param conf: text of the primary congress configuration file
        :param port_num: port the replica API should bind to
        :returns: configuration text with ``bind_port`` added to
                  ``[DEFAULT]``, the datasource sync period lowered from 30
                  to 5, the first ``signing_dir`` occurrence commented out
                  and a ``[dse]`` section with a replica bus id appended
        """
        bind_line = 'bind_port = %d\n' % port_num
        start = conf.find('[DEFAULT]')
        if start == -1:
            # No [DEFAULT] header at all: add one rather than splicing the
            # option into an arbitrary offset of the file.
            conf = '[DEFAULT]\n' + bind_line + conf
        else:
            eol = conf.find('\n', start)
            if eol == -1:
                # Header is the last line and has no trailing newline.
                conf += '\n'
                eol = len(conf) - 1
            index = eol + 1
            conf = conf[:index] + bind_line + conf[index:]

        # set datasource sync period interval to 5
        conf = conf.replace('datasource_sync_period = 30',
                            'datasource_sync_period = 5')

        # Comment out signing_dir only when present; find() returns -1
        # otherwise and slicing with -1 would corrupt the last character.
        sindex = conf.find('signing_dir')
        if sindex != -1:
            conf = conf[:sindex] + '#' + conf[sindex:]

        return conf + '\n[dse]\nbus_id = replica-node\n'

    def start_replica(self, port_num):
        """Start the API, policy-engine and datasources replica services.

        :param port_num: port the replica API binds to
        :raises AssertionError: if a replica is already registered for
                                ``port_num``
        """
        # Check before creating anything so a duplicate request cannot leak
        # processes that are never recorded in self.replicas.
        if port_num in self.replicas:
            raise AssertionError(
                'replica already registered for port %d' % port_num)

        with open('/etc/congress/congress.conf') as template:
            conf = self._render_replica_conf(template.read(), port_num)
        LOG.debug("Configuration file for replica: %s\n", conf)

        self._prepare_replica(port_num)

        # delete=False: the file must outlive this handle because the
        # spawned services read it; stop_replica() removes it.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.conf',
                                         prefix='congress%d-' % port_num,
                                         dir='/tmp', delete=False) as f:
            f.write(conf)
        conf_file = f.name

        # start all services on replica node
        api = self.start_service('api', conf_file)
        pe = self.start_service('policy-engine', conf_file)
        data = self.start_service('datasources', conf_file)

        LOG.debug("successfully started replica services\n")
        self.replicas[port_num] = ({'API': api, 'PE': pe, 'DS': data},
                                   conf_file)

    def start_service(self, name, conf_file):
        """Spawn ``bin/congress-server --<name>`` and return the Popen.

        The process runs with helper.root_path() as working directory and
        its stderr is merged into the stdout pipe.
        """
        service = '--' + name
        node = name + '-replica-node'
        args = ['bin/congress-server', service,
                '--node-id', node,
                '--config-file', conf_file]

        return subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                cwd=helper.root_path())

    def stop_replica(self, port_num):
        """Kill the replica services and remove everything they used.

        Removes the temporary configuration file and the identity service
        and endpoint created for the replica.
        """
        procs, conf_file = self.replicas[port_num]
        # Using proc.terminate() will block at proc.wait(), no idea why yet
        # kill all processes
        for p in procs.values():
            # The process may already have exited or been reaped (for
            # example by _dump_replica_logs); only signal live ones.
            if p.poll() is None:
                p.kill()
            p.wait()

        os.unlink(conf_file)
        self.replicas[port_num] = (None, conf_file)
        self._cleanup_replica()

    def _dump_replica_logs(self):
        """Log the captured output of every replica service process.

        Live processes are killed first: communicate() only returns once
        the process has exited, so calling it on a running server would
        block indefinitely.
        """
        for port_num, (procs, _conf_file) in self.replicas.items():
            if procs is None:
                continue
            for service_key, proc in procs.items():
                if proc.poll() is None:
                    proc.kill()
                # stderr is merged into stdout, so the second item is None.
                output, _ = proc.communicate()
                LOG.debug("Replica port %s service %s logs: %s",
                          port_num, service_key,
                          six.text_type(output, 'utf-8', 'replace'))

    def _wait_for(self, predicate, duration, failure_msg):
        """Poll ``predicate`` every second until it is true.

        :raises tempest.lib.exceptions.TimeoutException: with
            ``failure_msg`` if ``predicate`` is still false after
            ``duration`` seconds
        """
        if not test_utils.call_until_true(func=predicate,
                                          duration=duration, sleep_for=1):
            raise exceptions.TimeoutException(failure_msg)

    def create_client(self, client_type):
        """Return a PolicyClient for ``client_type`` using admin creds."""
        creds = credentials.get_configured_admin_credentials('identity_admin')
        auth_prov = tempestmanager.get_auth_provider(creds)
        return policy_client.PolicyClient(
            auth_prov, client_type,
            CONF.identity.region)

    def _check_replica_server_status(self, client):
        """Return True if ``client`` can list policies, False if not yet.

        Unauthorized responses and connection-level errors are treated as
        "not ready"; any other exception propagates to the caller.
        """
        try:
            LOG.debug("Check replica server status")
            client.list_policy()
        except exceptions.Unauthorized:
            LOG.debug("connection refused")
            return False
        except (socket.error, urllib3_exceptions.MaxRetryError):
            LOG.debug("Replica server not ready")
            return False
        LOG.debug("replica server ready")
        return True

    def find_fake(self, client):
        """Return the id of the datasource named 'fake', or None."""
        datasources = client.list_datasources()
        for r in datasources['results']:
            if r['name'] == 'fake':
                LOG.debug('existing fake driver: %s', str(r['id']))
                return r['id']
        return None

    def _check_resource_exists(self, client, resource):
        """Return True if ``client`` reports a status for 'fake'.

        :param client: policy client of the server to query
        :param resource: 'datasource' to check the datasource status; any
                         other value checks the policy status
        """
        try:
            # Query the server that was passed in, not always the primary;
            # otherwise the replica checks never exercise the replica.
            if resource == 'datasource':
                LOG.debug("Check datasource exists")
                body = client.list_datasource_status('fake')
            else:
                LOG.debug("Check policy exists")
                body = client.list_policy_status('fake')
        except exceptions.NotFound:
            LOG.debug("resource 'fake' not found")
            return False
        LOG.debug("resource status: %s", body)
        return True

    def _check_resource_missing(self, client, resource):
        """Return the negation of _check_resource_exists()."""
        return not self._check_resource_exists(client, resource)

    def create_fake(self, client):
        """Create the 'fake' datasource unless it exists; return its id."""
        fake_id = self.find_fake(client)
        if fake_id:
            return fake_id

        item = {'id': None,
                'name': 'fake',
                'driver': 'fake_datasource',
                'config': {"username": "fakeu",
                           "tenant_name": "faket",
                           "password": "fakep",
                           "auth_url": "http://127.0.0.1:5000/v2"},
                'description': 'bar',
                'enabled': True}
        ret = client.create_datasource(item)
        LOG.debug('created fake driver: %s', str(ret['id']))
        return ret['id']

    @decorators.attr(type='smoke')
    def test_datasource_db_sync_add_remove(self):
        """Verify a replica follows datasource changes in the database.

        A replica must add the datasource when it appears in the database
        and drop it again once it has been deleted through the primary.
        """
        replica_started = False
        try:
            # Check fake if exists. else create
            fake_id = self.create_fake(self.client)

            # Start replica. Record it immediately so that the finally
            # clause tears it down even if it never becomes ready.
            self.start_replica(CONF.congressha.replica_port)
            replica_started = True

            replica_client = self.create_client(CONF.congressha.replica_type)

            # Check replica server status
            if not test_utils.call_until_true(
                    func=lambda: self._check_replica_server_status(
                        replica_client),
                    duration=60, sleep_for=1):
                self._dump_replica_logs()
                raise exceptions.TimeoutException("Replica Server not ready")

            # Replica server is up.
            # primary server might sync later than replica server due to
            # diff in datasource sync interval(P-30, replica-5). So checking
            # replica first

            # Verify that replica server synced fake dataservice and policy
            self._wait_for(
                lambda: self._check_resource_exists(
                    replica_client, 'datasource'),
                60,
                "replica doesn't have fake dataservice, data sync failed")
            self._wait_for(
                lambda: self._check_resource_exists(
                    replica_client, 'policy'),
                60,
                "replica doesn't have fake policy, policy sync failed")

            # Verify that primary server synced fake dataservice and policy
            self._wait_for(
                lambda: self._check_resource_exists(
                    self.client, 'datasource'),
                90,
                "primary doesn't have fake dataservice, data sync failed")
            self._wait_for(
                lambda: self._check_resource_exists(
                    self.client, 'policy'),
                90,
                "primary doesn't have fake policy, policy sync failed")

            # Remove fake from primary server instance.
            LOG.debug("removing fake datasource %s", str(fake_id))
            self.client.delete_datasource(fake_id)

            # Verify that replica server has no fake datasource and fake
            # policy
            self._wait_for(
                lambda: self._check_resource_missing(
                    replica_client, 'datasource'),
                60,
                "replica still has fake dataservice, sync failed")
            self._wait_for(
                lambda: self._check_resource_missing(
                    replica_client, 'policy'),
                60,
                "replica still fake policy, policy synchronizer failed")

            LOG.debug("removed fake datasource from replica instance")

            # Verify that primary server has no fake datasource and fake
            # policy
            self._wait_for(
                lambda: self._check_resource_missing(
                    self.client, 'datasource'),
                90,
                "primary still has fake dataservice, sync failed")
            self._wait_for(
                lambda: self._check_resource_missing(
                    self.client, 'policy'),
                90,
                "primary still fake policy, policy synchronizer failed")

            LOG.debug("removed fake datasource from primary instance")

        finally:
            if replica_started:
                self.stop_replica(CONF.congressha.replica_port)