Merge "Push orchestrator config into the appliance"

This commit is contained in:
Jenkins 2016-01-25 22:38:16 +00:00 committed by Gerrit Code Review
commit 652c7e03ef
12 changed files with 93 additions and 41 deletions

View File

@ -45,16 +45,17 @@ SERVICE_DHCP = 'dhcp'
SERVICE_RA = 'ra'
def build_config(client, router, management_port, interfaces):
def build_config(worker_context, router, management_port, interfaces):
provider_rules = load_provider_rules(cfg.CONF.provider_rules_path)
networks = generate_network_config(
client,
worker_context.neutron,
router,
management_port,
interfaces
)
gateway = get_default_v4_gateway(client, router, networks)
gateway = get_default_v4_gateway(
worker_context.neutron, router, networks)
return {
'asn': cfg.CONF.asn,
@ -64,7 +65,8 @@ def build_config(client, router, management_port, interfaces):
'labels': provider_rules.get('labels', {}),
'floating_ips': generate_floating_config(router),
'tenant_id': router.tenant_id,
'hostname': 'ak-%s' % router.tenant_id
'hostname': 'ak-%s' % router.tenant_id,
'orchestrator': worker_context.config,
}

View File

@ -143,7 +143,7 @@ class Router(BaseDriver):
"""
self._ensure_cache(worker_context)
return configuration.build_config(
worker_context.neutron,
worker_context,
self._router,
mgt_port,
iface_map

View File

@ -402,6 +402,7 @@ class InstanceManager(object):
mgt_port,
iface_map
)
self.log.debug('preparing to update config to %r', config)
for i in six.moves.range(attempts):

View File

@ -185,7 +185,8 @@ def main(argv=sys.argv[1:]):
# run.
worker_factory = functools.partial(
worker.Worker,
notifier=publisher
notifier=publisher,
management_address=mgt_ip_address,
)
# Set up the scheduler that knows how to manage the routers and

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
@ -62,15 +61,15 @@ from astara.common.i18n import _, _LE, _LI, _LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RUG_META_PORT = 9697
METADATA_OPTS = [
cfg.StrOpt('nova_metadata_ip', default='127.0.0.1',
help="IP address used by Nova metadata server."),
cfg.IntOpt('nova_metadata_port',
default=8775,
help="TCP Port used by Nova metadata server."),
cfg.IntOpt('astara_metadata_port',
default=9697,
help="TCP listening port used by Astara metadata proxy."),
cfg.StrOpt('neutron_metadata_proxy_shared_secret',
default='',
help='Shared secret to sign instance-id request',
@ -180,7 +179,7 @@ class MetadataProxy(object):
"""
self.pool = eventlet.GreenPool(1000)
def run(self, ip_address, port=RUG_META_PORT):
def run(self, ip_address, port=cfg.CONF.astara_metadata_port):
"""Run the MetadataProxy.
:param ip_address: the ip address to bind to for incoming requests

View File

@ -40,8 +40,17 @@ class TestAstaraClient(unittest.TestCase):
'generate_floating_config': mock.DEFAULT,
'get_default_v4_gateway': mock.DEFAULT,
}
fake_orchestrator = {
'host': 'foohost',
'adddress': '10.0.0.1',
'metadata_port': 80,
}
mock_client = mock.Mock()
mock_context = mock.Mock(
neutron=mock_client,
config=fake_orchestrator,
)
ifaces = []
provider_rules = {'labels': {'ext': ['192.168.1.1']}}
network_config = [
@ -72,7 +81,7 @@ class TestAstaraClient(unittest.TestCase):
mocks['generate_floating_config'].return_value = 'floating_config'
mocks['get_default_v4_gateway'].return_value = 'default_gw'
config = conf_mod.build_config(mock_client, fakes.fake_router,
config = conf_mod.build_config(mock_context, fakes.fake_router,
fakes.fake_mgt_port, ifaces)
expected = {
@ -83,7 +92,12 @@ class TestAstaraClient(unittest.TestCase):
'asn': 64512,
'neighbor_asn': 64512,
'tenant_id': 'tenant_id',
'hostname': 'ak-tenant_id'
'hostname': 'ak-tenant_id',
'orchestrator': {
'host': 'foohost',
'adddress': '10.0.0.1',
'metadata_port': 80,
}
}
self.assertEqual(config, expected)

View File

@ -105,7 +105,7 @@ class RouterDriverTest(base.RugTestBase):
self.assertTrue(mock_ensure_cache.called)
mock_build_config.return_value = 'fake_config'
mock_build_config.assert_called_with(
self.ctx.neutron, rtr._router, fake_mgt_port, fake_iface_map)
self.ctx, rtr._router, fake_mgt_port, fake_iface_map)
self.assertEqual(res, 'fake_config')
@mock.patch('astara.api.astara_client.update_config')

View File

@ -18,6 +18,8 @@ from astara.drivers import base
from astara.api import neutron, nova
from astara import worker
FAKE_MGT_ADDR = '10.10.1.13'
def fake_loadbalancer():
lb_dict = {
@ -170,4 +172,4 @@ def fake_worker_context():
nova, 'Nova', autospec=True).start()
mock.patch.object(
nova, 'Nova', return_value=fake_nova_obj).start()
return worker.WorkerContext()
return worker.WorkerContext(FAKE_MGT_ADDR)

View File

@ -69,7 +69,8 @@ class TestInstanceManager(base.RugTestBase):
super(TestInstanceManager, self).setUp()
self.conf = cfg.CONF
self.fake_driver = fakes.fake_driver()
self.ctx = mock.Mock()
self.ctx = fakes.fake_worker_context()
self.neutron = self.ctx.neutron
self.config(boot_timeout=30)
self.config(astara_mgt_service_port=5000)
@ -103,8 +104,6 @@ class TestInstanceManager(base.RugTestBase):
self.ctx.nova_client.get_instance_info.return_value = (
self.INSTANCE_INFO)
self.ctx.nova_client.get_instance_info_for_obj.return_value = (
self.INSTANCE_INFO)
self.ctx.neutron.get_ports_for_instance.return_value = (
fake_mgt_port, [fake_int_port, fake_ext_port])
@ -145,8 +144,6 @@ class TestInstanceManager(base.RugTestBase):
def test_update_state_instance_no_ports_still_booting(self):
self.update_state_p.stop()
self.ctx.nova_client.get_instance_info_for_obj.return_value = \
self.INSTANCE_INFO
self.ctx.neutron.get_ports_for_instance.return_value = (None, [])
self.assertEqual(self.instance_mgr.update_state(self.ctx),
@ -193,6 +190,7 @@ class TestInstanceManager(base.RugTestBase):
state='up',
)
self.fake_driver.synchronize_state.reset_mock()
self.fake_driver.build_config.return_value = {}
# Configure the router and make sure state is synchronized as ACTIVE
with mock.patch.object(self.instance_mgr,
@ -373,7 +371,6 @@ class TestInstanceManager(base.RugTestBase):
rtr = mock.sentinel.router
instance = mock.sentinel.instance
self.ctx.neutron.get_router_detail.return_value = rtr
self.ctx.nova_client.get_instance.return_value = instance
self.ctx.nova_client.boot_instance.side_effect = RuntimeError
rtr.id = 'ROUTER1'
instance.id = 'INSTANCE1'
@ -480,7 +477,6 @@ class TestInstanceManager(base.RugTestBase):
time.side_effect = side_effects
self.config(boot_timeout=30)
self.instance_mgr.state = states.UP
self.ctx.nova_client.get_router_instance_status.return_value = 'UP'
self.instance_mgr.stop(self.ctx)
self.assertEqual(self.instance_mgr.state, states.UP)
self.ctx.nova_client.destroy_instance.assert_called_once_with(
@ -517,7 +513,10 @@ class TestInstanceManager(base.RugTestBase):
self.assertEqual(self.instance_mgr.state, states.DOWN)
def test_configure_success(self):
self.fake_driver.build_config.return_value = 'fake_config'
fake_config_dict = {'fake_config': 'foo'}
self.fake_driver.build_config.return_value = dict(fake_config_dict)
self.config(astara_metadata_port=4321)
self.config(host='foobarhost')
with mock.patch.object(self.instance_mgr,
'_verify_interfaces') as verify:
verify.return_value = True
@ -531,9 +530,9 @@ class TestInstanceManager(base.RugTestBase):
self.ctx,
self.INSTANCE_INFO.management_port,
{'ext-net': 'ge1', 'int-net': 'ge2', 'mgt-net': 'ge0'})
self.fake_driver.update_config.assert_called_once_with(
self.INSTANCE_INFO.management_address, 'fake_config',
)
self.fake_driver.update_config.assert_called_with(
self.INSTANCE_INFO.management_address, fake_config_dict)
self.assertEqual(self.instance_mgr.state,
states.CONFIGURED)
@ -552,9 +551,10 @@ class TestInstanceManager(base.RugTestBase):
@mock.patch('time.sleep')
def test_configure_failure(self, sleep):
fake_config_dict = {'fake_config': 'foo'}
self.fake_driver.update_config.side_effect = Exception
self.fake_driver.build_config.return_value = 'fake_config'
self.fake_driver.build_config.return_value = fake_config_dict
with mock.patch.object(self.instance_mgr,
'_verify_interfaces') as verify:
@ -567,7 +567,7 @@ class TestInstanceManager(base.RugTestBase):
expected_calls = [
mock.call(self.INSTANCE_INFO.management_address,
'fake_config')
fake_config_dict)
for i in range(0, 2)]
self.fake_driver.update_config.assert_has_calls(expected_calls)
self.assertEqual(self.instance_mgr.state, states.RESTART)

View File

@ -30,6 +30,7 @@ from astara import worker
from astara.common.hash_ring import DC_KEY
from astara.test.unit import fakes
from astara.test.unit.db import base
@ -59,7 +60,7 @@ class WorkerTestBase(base.DbTestCase):
self.fake_neutron = mock.patch.object(
neutron, 'Neutron', return_value=fake_neutron_obj).start()
self.w = worker.Worker(mock.Mock())
self.w = worker.Worker(mock.Mock(), fakes.FAKE_MGT_ADDR)
self.addCleanup(mock.patch.stopall)
self.target = self.tenant_id
@ -355,12 +356,24 @@ class TestWorker(WorkerTestBase):
# just ensure we dont raise
self.w._release_resource_lock(fake_sm)
def test_worker_context_config(self):
self.config(astara_metadata_port=1234)
self.config(host='foohost')
ctxt = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
self.assertEqual(
ctxt.config,
{
'host': 'foohost',
'metadata_port': 1234,
'address': fakes.FAKE_MGT_ADDR,
})
class TestResourceCache(WorkerTestBase):
def setUp(self):
super(TestResourceCache, self).setUp()
self.resource_cache = worker.TenantResourceCache()
self.worker_context = worker.WorkerContext()
self.worker_context = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
def test_resource_cache_hit(self):
self.resource_cache._tenant_resources = {
@ -454,7 +467,8 @@ class TestCreatingResource(WorkerTestBase):
def test_message_enqueued(self):
self.w.handle_message(self.tenant_id, self.msg)
trm = self.w.tenant_managers[self.tenant_id]
sm = trm.get_state_machines(self.msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(self.msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
self.assertEqual(len(sm._queue), 1)
@ -518,7 +532,7 @@ class TestShutdown(WorkerTestBase):
@mock.patch('kombu.Producer')
def test_stop_threads_notifier(self, producer, exchange, broker):
notifier = notifications.Publisher('topic')
w = worker.Worker(notifier)
w = worker.Worker(notifier, fakes.FAKE_MGT_ADDR)
self.assertTrue(notifier)
w._shutdown()
self.assertFalse(w.notifier._t)
@ -527,7 +541,7 @@ class TestShutdown(WorkerTestBase):
class TestUpdateStateMachine(WorkerTestBase):
def setUp(self):
super(TestUpdateStateMachine, self).setUp()
self.worker_context = worker.WorkerContext()
self.worker_context = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
self.w._should_process_message = mock.MagicMock(return_value=self.msg)
def _test(self, fake_hash, negative=False):
@ -666,7 +680,8 @@ class TestDebugRouters(WorkerTestBase):
# Create the router manager and state machine so we can
# replace the send_message() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
with mock.patch.object(sm, 'send_message') as meth:
# The router id is being ignored, so the send_message()
# method shouldn't ever be invoked.
@ -718,7 +733,8 @@ class TestDebugTenants(WorkerTestBase):
# Create the router manager and state machine so we can
# replace the send_message() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
with mock.patch.object(sm, 'send_message') as meth:
# The tenant id is being ignored, so the send_message()
# method shouldn't ever be invoked.
@ -772,7 +788,8 @@ class TestGlobalDebug(WorkerTestBase):
# Create the router manager and state machine so we can
# replace the send_message() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
with mock.patch.object(sm, 'send_message') as meth:
# The tenant id is being ignored, so the send_message()
# method shouldn't ever be invoked.
@ -795,7 +812,8 @@ class TestRebalance(WorkerTestBase):
body={'key': 'value'},
)
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
self.w.hash_ring_mgr.rebalance(['foo'])
self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo']))

View File

@ -126,9 +126,18 @@ class WorkerContext(object):
"""Holds resources owned by the worker and used by the Automaton.
"""
def __init__(self):
def __init__(self, management_address):
self.neutron = neutron.Neutron(cfg.CONF)
self.nova_client = nova.Nova(cfg.CONF)
self.management_address = management_address
@property
def config(self):
return {
'address': self.management_address,
'metadata_port': cfg.CONF.astara_metadata_port,
'host': cfg.CONF.host,
}
class Worker(object):
@ -138,7 +147,7 @@ class Worker(object):
track of a bunch of the state machines, so the callable is a
method of an instance of this class instead of a simple function.
"""
def __init__(self, notifier):
def __init__(self, notifier, management_address):
self._ignore_directory = cfg.CONF.ignored_router_directory
self._queue_warning_threshold = cfg.CONF.queue_warning_threshold
self._reboot_error_threshold = cfg.CONF.reboot_error_threshold
@ -147,11 +156,12 @@ class Worker(object):
self.lock = threading.Lock()
self._keep_going = True
self.tenant_managers = {}
self.management_address = management_address
self.resource_cache = TenantResourceCache()
# This process-global context should not be used in the
# threads, since the clients are not thread-safe.
self._context = WorkerContext()
self._context = WorkerContext(self.management_address)
self.notifier = notifier
# The notifier needs to be started here to ensure that it
# happens inside the worker process and not the parent.
@ -191,7 +201,7 @@ class Worker(object):
# messages and talking to the tenant router manager because we
# are in a different thread and the clients are not
# thread-safe.
context = WorkerContext()
context = WorkerContext(self.management_address)
while self._keep_going:
try:
# Try to get a state machine from the work queue. If

View File

@ -0,0 +1,5 @@
---
features:
- >
The orchestrator now pushes local orchestrator-specific configuration into the appliance, allowing
services like the metadata proxy to be configured specifically for current cluster layout.