Launch Congress with distributed_architecture flag is true

Check codes for distributed_arch flag is written on global scope in
each file. The flag is loaded at python module loading time. The file
doesn't use a value specified in congress.conf since initialization
steps of config values are done after loading config file.

This patch enables Congress to use distributed_architecture flag
with True in the devstack tests and in local tests.

Partial-Bug: #1541019

Change-Id: Ia276e9606ddc45b5c94c8495bdbd34a58a9a8769
This commit is contained in:
Masahito Muroi 2016-02-16 15:47:09 +00:00
parent a72d4d03dd
commit 8adcb0a952
8 changed files with 111 additions and 61 deletions

View File

@ -24,7 +24,6 @@ from oslo_log import log as logging
from oslo_middleware import cors
from oslo_policy import opts as policy_opts
from congress.managers import datasource as datasource_mgr
from congress import version
LOG = logging.getLogger(__name__)
@ -47,8 +46,12 @@ core_opts = [
cfg.StrOpt('policy_path',
help="The path to the latest policy dump"),
cfg.StrOpt('datasource_file',
deprecated_for_removal=True,
help="The file containing datasource configuration"),
cfg.StrOpt('root_path',
deprecated_for_removal=True,
deprecated_reason='automatically calculated its path in '
'initializing steps.',
help="The absolute path to the congress repo"),
cfg.IntOpt('api_workers', default=1,
help='The number of worker processes to serve the congress '
@ -91,7 +94,6 @@ def init(args, **kwargs):
cfg.CONF(args=args, project='congress',
version='%%(prog)s %s' % version.version_info.release_string(),
**kwargs)
datasource_mgr.DataSourceManager.validate_configured_drivers()
def setup_logging():

View File

@ -29,9 +29,14 @@ import sys
import eventlet
import eventlet.wsgi
import greenlet
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service
from paste import deploy
from congress.dse2 import dse_node
from congress import exception
from congress.tests import helper
LOG = logging.getLogger(__name__)
@ -59,12 +64,17 @@ class EventletFilteringLogger(object):
self.logger.log(self.level, msg.rstrip())
class Server(service.ServiceBase):
"""Server class to manage multiple WSGI sockets and applications."""
class APIServer(service.ServiceBase):
"""Server class to Data Service Node with API services.
def __init__(self, application, host=None, port=None, threads=1000,
This server has All API services in itself.
"""
def __init__(self, app_conf, name, host=None, port=None, threads=1000,
keepalive=False, keepidle=None):
self.application = application
self.app_conf = app_conf
self.name = name
self.application = None
self.host = host or '0.0.0.0'
self.port = port or 0
self.pool = eventlet.GreenPool(threads)
@ -75,6 +85,13 @@ class Server(service.ServiceBase):
self.keepalive = keepalive
self.keepidle = keepidle
self.socket = None
self.node = None
if cfg.CONF.distributed_architecture:
messaging_config = helper.generate_messaging_config()
messaging_config.rpc_response_timeout = 10
self.node = dse_node.DseNode(messaging_config, self.name, [])
def start(self, key=None, backlog=128):
"""Run a WSGI server with the given application."""
@ -82,6 +99,14 @@ class Server(service.ServiceBase):
if self.socket is None:
self.listen(key=key, backlog=backlog)
try:
kwargs = {'global_conf': {'node_obj': [self.node]}}
self.application = deploy.loadapp('config:%s' % self.app_conf,
name='congress', **kwargs)
except Exception:
raise exception.CongressException(
'Failed to Start initializing %s server' % self.node.node_id)
self.greenthread = self.pool.spawn(self._run,
self.application,
self.socket)
@ -152,6 +177,8 @@ class Server(service.ServiceBase):
def stop(self):
self.kill()
if cfg.CONF.distributed_architecture:
self.node.stop()
def reset(self):
LOG.info("reset() not implemented yet")

View File

@ -195,6 +195,9 @@ class DseNode(object):
self._running = True
def stop(self):
if self._running is False:
return
LOG.info("Stopping DSE node '%s'" % self.node_id)
for s in self._services:
s.stop()

View File

@ -68,13 +68,13 @@ def create(rootdir, config_override=None):
cage = d6cage.d6Cage()
# read in datasource configurations
cage.config = config_override or {}
# path to congress source dir
src_path = os.path.join(rootdir, "congress")
datasource_mgr = datasource_manager.DataSourceManager()
datasource_mgr.validate_configured_drivers()
# add policy engine
engine_path = os.path.join(src_path, "policy_engines/agnostic.py")
@ -269,19 +269,12 @@ def create(rootdir, config_override=None):
return cage
def create2(config_override=None, node=None):
"""Get Congress up and running when src is installed in rootdir.
i.e. ROOTDIR=/path/to/congress/congress.
CONFIG_OVERRIDE is a dictionary of dictionaries with configuration
values that overrides those provided in CONFIG_FILE. The top-level
dictionary has keys for the CONFIG_FILE sections, and the second-level
dictionaries store values for that section.
def create2(node=None):
"""Get Congress up.
:param node is a DseNode
"""
LOG.debug("Starting Congress with config_override=%s",
config_override)
LOG.debug("Starting Congress")
# create message bus and attach services
if node:
@ -298,7 +291,6 @@ def create2(config_override=None, node=None):
services['datasources'] = create_datasources(
bus, services[ENGINE_SERVICE_NAME])
bus.config = config_override or {}
bus.register_service(services[ENGINE_SERVICE_NAME])
for ds in services['datasources']:
bus.register_service(ds)

View File

@ -26,9 +26,16 @@ eventlet.monkey_patch()
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service
from paste import deploy
from congress.common import config
# FIXME It has to initialize distributed_architecture flag basing on the
# config file before the python interpreter imports python file which has
# if-statement for deepsix. Since the default value of the flag is False
# in current implementation, so it will import dse.deepsix as deepsix
# even if you set it to True in congress.conf.
# After changing the default to True, remove following one line and unncoment
# "Initialize config here!!"
config.init(sys.argv[1:])
from congress.common import eventlet_server
LOG = logging.getLogger(__name__)
@ -51,10 +58,9 @@ class ServerWrapper(object):
launcher.launch_service(self.server)
def create_api_server(conf, name, host, port, workers):
app = deploy.loadapp('config:%s' % conf, name=name)
congress_api_server = eventlet_server.Server(
app, host=host, port=port,
def create_api_server(conf_path, name, host, port, workers):
congress_api_server = eventlet_server.APIServer(
conf_path, name, host=host, port=port,
keepalive=cfg.CONF.tcp_keepalive,
keepidle=cfg.CONF.tcp_keepidle)
@ -81,13 +87,7 @@ def serve(*servers):
LOG.info("Congress server stopped by interrupt.")
def main():
config.init(sys.argv[1:])
if not cfg.CONF.config_file:
sys.exit("ERROR: Unable to find configuration file via default "
"search paths ~/.congress/, ~/, /etc/congress/, /etc/) and "
"the '--config-file' option!")
config.setup_logging()
def launch_api_server():
LOG.info("Starting congress server on port %d", cfg.CONF.bind_port)
# API resource runtime encapsulation:
@ -97,10 +97,24 @@ def main():
config.set_config_defaults()
servers = []
servers.append(create_api_server(paste_config,
"congress",
cfg.CONF.dse.node_id,
cfg.CONF.bind_host,
cfg.CONF.bind_port,
cfg.CONF.api_workers))
return servers
def main():
# Initialize config here!! after completing to migrate the new architecture
# config.init(args)
if not cfg.CONF.config_file:
sys.exit("ERROR: Unable to find configuration file via default "
"search paths ~/.congress/, ~/, /etc/congress/, /etc/) and "
"the '--config-file' option!")
config.setup_logging()
servers = launch_api_server()
serve(*servers)

View File

@ -15,7 +15,6 @@ from __future__ import division
from __future__ import absolute_import
import functools
import os
import sys
from oslo_config import cfg
@ -24,6 +23,7 @@ from oslo_log import log as logging
from congress.api import application
from congress.api import router
from congress import harness
from congress import utils
LOG = logging.getLogger(__name__)
@ -43,22 +43,19 @@ def fail_gracefully(f):
@fail_gracefully
def congress_app_factory(global_conf, **local_conf):
root_path = cfg.CONF.root_path
if root_path is None:
root_path = os.path.dirname(__file__) # drop filename
root_path = os.path.dirname(root_path) # drop to congress src dir
data_path = cfg.CONF.datasource_file
if data_path is None:
data_path = os.path.join(root_path, 'etc', 'datasources.conf')
# After changing a distriubted architecture following logic will be
# replated with new API model creation method. If All API models can
# be generated without any argument, we don't need to make dict here
# and API process instantiate all API model in APIRouterV1().
if getattr(cfg.CONF, "distributed_architecture", False):
services = harness.create2(root_path, data_path)
# global_conf only accepts an iteratable value as a its dict value
dse_node = global_conf['node_obj'][0]
services = harness.create2(node=dse_node)
return application.ApiApplication(services['api_service'])
else:
if cfg.CONF.root_path:
root_path = cfg.CONF.root_path
else:
root_path = utils.get_root_path()
data_path = cfg.CONF.datasource_file
cage = harness.create(root_path, data_path)
api_process_dict = dict([[name, service_obj['object']]
for name, service_obj

View File

@ -26,54 +26,63 @@ import testtools
from congress.common import eventlet_server
class ServerTest(testtools.TestCase):
class APIServerTest(testtools.TestCase):
@mock.patch('paste.deploy.loadapp')
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_unset(self, mock_getaddrinfo, mock_listen):
def test_keepalive_unset(self, mock_getaddrinfo, mock_listen, mock_app):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock = mock.Mock()
mock_sock.setsockopt = mock.Mock()
mock_app.return_value = mock.MagicMock()
mock_listen.return_value = mock_sock
server = eventlet_server.Server(mock.MagicMock(),
host=cfg.CONF.bind_host,
port=cfg.CONF.bind_port)
server = eventlet_server.APIServer('/path/to/paste', 'api-server',
host=cfg.CONF.bind_host,
port=cfg.CONF.bind_port)
server.start()
self.assertTrue(mock_listen.called)
self.assertTrue(mock_app.called)
self.assertFalse(mock_sock.setsockopt.called)
@mock.patch('paste.deploy.loadapp')
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_set(self, mock_getaddrinfo, mock_listen):
def test_keepalive_set(self, mock_getaddrinfo, mock_listen, mock_app):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock = mock.Mock()
mock_sock.setsockopt = mock.Mock()
mock_app.return_value = mock.MagicMock()
mock_listen.return_value = mock_sock
server = eventlet_server.Server(mock.MagicMock(),
host=cfg.CONF.bind_host,
port=cfg.CONF.bind_port,
keepalive=True)
server = eventlet_server.APIServer('/path/to/paste', 'api-server',
host=cfg.CONF.bind_host,
port=cfg.CONF.bind_port,
keepalive=True)
server.start()
mock_sock.setsockopt.assert_called_once_with(socket.SOL_SOCKET,
socket.SO_KEEPALIVE,
1)
self.assertTrue(mock_listen.called)
self.assertTrue(mock_app.called)
@mock.patch('paste.deploy.loadapp')
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_and_keepidle_set(self, mock_getaddrinfo, mock_listen):
def test_keepalive_and_keepidle_set(self, mock_getaddrinfo, mock_listen,
mock_app):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock = mock.Mock()
mock_sock.setsockopt = mock.Mock()
mock_app.return_value = mock.MagicMock()
mock_listen.return_value = mock_sock
server = eventlet_server.Server(mock.MagicMock(),
host=cfg.CONF.bind_host,
port=cfg.CONF.bind_port,
keepalive=True,
keepidle=1)
server = eventlet_server.APIServer('/path/to/paste', 'api-server',
host=cfg.CONF.bind_host,
port=cfg.CONF.bind_port,
keepalive=True,
keepidle=1)
server.start()
# keepidle isn't available in the OS X version of eventlet
@ -88,3 +97,4 @@ class ServerTest(testtools.TestCase):
self.assertEqual(mock_sock.setsockopt.call_count, 1)
self.assertTrue(mock_listen.called)
self.assertTrue(mock_app.called)

View File

@ -22,6 +22,7 @@ from __future__ import absolute_import
import contextlib
import json
import os
import shutil
import tempfile
@ -83,6 +84,10 @@ def create_datasource_policy(bus, datasource, engine):
bus.rpc(engine, 'initialize_datasource', args)
def get_root_path():
return os.path.dirname(os.path.dirname(__file__))
class Location (object):
"""A location in the program source code."""