Push orchestrator config into the appliance

This pushes a few flags into the appliance that are specific to the
individual orchestrator instance managing that appliance. Initially, it
is used to tell the appliance where the metadata proxy is listening;
previously, this was hard-coded to a known address on the network. With
multiple orchestrators in a clustered environment, this allows each
orchestrator to run its own metadata proxy and have only the appliances
it manages query that proxy.

A follow-up patch will ensure this configuration stays up to date when
rebalances occur and orchestrators take over new appliances.
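
For illustration only, the orchestrator-specific section ends up in the
appliance config roughly as sketched below; the host name, address, and
port are made-up example values, not defaults:

    # Sketch of the per-orchestrator section that build_config() now adds
    # to the appliance config; the concrete values are illustrative only.
    orchestrator_section = {
        'host': 'orchestrator-1.example.com',  # cfg.CONF.host
        'address': '10.0.0.5',                 # worker's management address
        'metadata_port': 9697,                 # cfg.CONF.astara_metadata_port
    }

    appliance_config = {
        'tenant_id': 'some-tenant-id',
        'hostname': 'ak-some-tenant-id',
        'orchestrator': orchestrator_section,
    }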

Change-Id: Ib502507b29f17146da81f61f34957cd96a1548f4
Partial-bug: #1524068
Authored by Adam Gandelman, 2015-12-09 16:48:15 -08:00
parent 3219e39c4d
commit 568ea90f5a
12 changed files with 93 additions and 41 deletions

@ -45,16 +45,17 @@ SERVICE_DHCP = 'dhcp'
SERVICE_RA = 'ra'
def build_config(client, router, management_port, interfaces):
def build_config(worker_context, router, management_port, interfaces):
provider_rules = load_provider_rules(cfg.CONF.provider_rules_path)
networks = generate_network_config(
client,
worker_context.neutron,
router,
management_port,
interfaces
)
gateway = get_default_v4_gateway(client, router, networks)
gateway = get_default_v4_gateway(
worker_context.neutron, router, networks)
return {
'asn': cfg.CONF.asn,
@ -64,7 +65,8 @@ def build_config(client, router, management_port, interfaces):
'labels': provider_rules.get('labels', {}),
'floating_ips': generate_floating_config(router),
'tenant_id': router.tenant_id,
'hostname': 'ak-%s' % router.tenant_id
'hostname': 'ak-%s' % router.tenant_id,
'orchestrator': worker_context.config,
}

@ -143,7 +143,7 @@ class Router(BaseDriver):
"""
self._ensure_cache(worker_context)
return configuration.build_config(
worker_context.neutron,
worker_context,
self._router,
mgt_port,
iface_map

@ -402,6 +402,7 @@ class InstanceManager(object):
mgt_port,
iface_map
)
self.log.debug('preparing to update config to %r', config)
for i in six.moves.range(attempts):

@ -185,7 +185,8 @@ def main(argv=sys.argv[1:]):
# run.
worker_factory = functools.partial(
worker.Worker,
notifier=publisher
notifier=publisher,
management_address=mgt_ip_address,
)
# Set up the scheduler that knows how to manage the routers and

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
@ -62,15 +61,15 @@ from astara.common.i18n import _, _LE, _LI, _LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RUG_META_PORT = 9697
METADATA_OPTS = [
cfg.StrOpt('nova_metadata_ip', default='127.0.0.1',
help="IP address used by Nova metadata server."),
cfg.IntOpt('nova_metadata_port',
default=8775,
help="TCP Port used by Nova metadata server."),
cfg.IntOpt('astara_metadata_port',
default=9697,
help="TCP listening port used by Astara metadata proxy."),
cfg.StrOpt('neutron_metadata_proxy_shared_secret',
default='',
help='Shared secret to sign instance-id request',
@ -180,7 +179,7 @@ class MetadataProxy(object):
"""
self.pool = eventlet.GreenPool(1000)
def run(self, ip_address, port=RUG_META_PORT):
def run(self, ip_address, port=cfg.CONF.astara_metadata_port):
"""Run the MetadataProxy.
:param ip_address: the ip address to bind to for incoming requests

@ -40,8 +40,17 @@ class TestAstaraClient(unittest.TestCase):
'generate_floating_config': mock.DEFAULT,
'get_default_v4_gateway': mock.DEFAULT,
}
fake_orchestrator = {
'host': 'foohost',
'address': '10.0.0.1',
'metadata_port': 80,
}
mock_client = mock.Mock()
mock_context = mock.Mock(
neutron=mock_client,
config=fake_orchestrator,
)
ifaces = []
provider_rules = {'labels': {'ext': ['192.168.1.1']}}
network_config = [
@ -72,7 +81,7 @@ class TestAstaraClient(unittest.TestCase):
mocks['generate_floating_config'].return_value = 'floating_config'
mocks['get_default_v4_gateway'].return_value = 'default_gw'
config = conf_mod.build_config(mock_client, fakes.fake_router,
config = conf_mod.build_config(mock_context, fakes.fake_router,
fakes.fake_mgt_port, ifaces)
expected = {
@ -83,7 +92,12 @@ class TestAstaraClient(unittest.TestCase):
'asn': 64512,
'neighbor_asn': 64512,
'tenant_id': 'tenant_id',
'hostname': 'ak-tenant_id'
'hostname': 'ak-tenant_id',
'orchestrator': {
'host': 'foohost',
'address': '10.0.0.1',
'metadata_port': 80,
}
}
self.assertEqual(config, expected)

@ -105,7 +105,7 @@ class RouterDriverTest(base.RugTestBase):
self.assertTrue(mock_ensure_cache.called)
mock_build_config.return_value = 'fake_config'
mock_build_config.assert_called_with(
self.ctx.neutron, rtr._router, fake_mgt_port, fake_iface_map)
self.ctx, rtr._router, fake_mgt_port, fake_iface_map)
self.assertEqual(res, 'fake_config')
@mock.patch('astara.api.astara_client.update_config')

@ -18,6 +18,8 @@ from astara.drivers import base
from astara.api import neutron, nova
from astara import worker
FAKE_MGT_ADDR = '10.10.1.13'
def fake_loadbalancer():
lb_dict = {
@ -170,4 +172,4 @@ def fake_worker_context():
nova, 'Nova', autospec=True).start()
mock.patch.object(
nova, 'Nova', return_value=fake_nova_obj).start()
return worker.WorkerContext()
return worker.WorkerContext(FAKE_MGT_ADDR)

@ -69,7 +69,8 @@ class TestInstanceManager(base.RugTestBase):
super(TestInstanceManager, self).setUp()
self.conf = cfg.CONF
self.fake_driver = fakes.fake_driver()
self.ctx = mock.Mock()
self.ctx = fakes.fake_worker_context()
self.neutron = self.ctx.neutron
self.config(boot_timeout=30)
self.config(astara_mgt_service_port=5000)
@ -103,8 +104,6 @@ class TestInstanceManager(base.RugTestBase):
self.ctx.nova_client.get_instance_info.return_value = (
self.INSTANCE_INFO)
self.ctx.nova_client.get_instance_info_for_obj.return_value = (
self.INSTANCE_INFO)
self.ctx.neutron.get_ports_for_instance.return_value = (
fake_mgt_port, [fake_int_port, fake_ext_port])
@ -145,8 +144,6 @@ class TestInstanceManager(base.RugTestBase):
def test_update_state_instance_no_ports_still_booting(self):
self.update_state_p.stop()
self.ctx.nova_client.get_instance_info_for_obj.return_value = \
self.INSTANCE_INFO
self.ctx.neutron.get_ports_for_instance.return_value = (None, [])
self.assertEqual(self.instance_mgr.update_state(self.ctx),
@ -193,6 +190,7 @@ class TestInstanceManager(base.RugTestBase):
state='up',
)
self.fake_driver.synchronize_state.reset_mock()
self.fake_driver.build_config.return_value = {}
# Configure the router and make sure state is synchronized as ACTIVE
with mock.patch.object(self.instance_mgr,
@ -373,7 +371,6 @@ class TestInstanceManager(base.RugTestBase):
rtr = mock.sentinel.router
instance = mock.sentinel.instance
self.ctx.neutron.get_router_detail.return_value = rtr
self.ctx.nova_client.get_instance.return_value = instance
self.ctx.nova_client.boot_instance.side_effect = RuntimeError
rtr.id = 'ROUTER1'
instance.id = 'INSTANCE1'
@ -480,7 +477,6 @@ class TestInstanceManager(base.RugTestBase):
time.side_effect = side_effects
self.config(boot_timeout=30)
self.instance_mgr.state = states.UP
self.ctx.nova_client.get_router_instance_status.return_value = 'UP'
self.instance_mgr.stop(self.ctx)
self.assertEqual(self.instance_mgr.state, states.UP)
self.ctx.nova_client.destroy_instance.assert_called_once_with(
@ -517,7 +513,10 @@ class TestInstanceManager(base.RugTestBase):
self.assertEqual(self.instance_mgr.state, states.DOWN)
def test_configure_success(self):
self.fake_driver.build_config.return_value = 'fake_config'
fake_config_dict = {'fake_config': 'foo'}
self.fake_driver.build_config.return_value = dict(fake_config_dict)
self.config(astara_metadata_port=4321)
self.config(host='foobarhost')
with mock.patch.object(self.instance_mgr,
'_verify_interfaces') as verify:
verify.return_value = True
@ -531,9 +530,9 @@ class TestInstanceManager(base.RugTestBase):
self.ctx,
self.INSTANCE_INFO.management_port,
{'ext-net': 'ge1', 'int-net': 'ge2', 'mgt-net': 'ge0'})
self.fake_driver.update_config.assert_called_once_with(
self.INSTANCE_INFO.management_address, 'fake_config',
)
self.fake_driver.update_config.assert_called_with(
self.INSTANCE_INFO.management_address, fake_config_dict)
self.assertEqual(self.instance_mgr.state,
states.CONFIGURED)
@ -552,9 +551,10 @@ class TestInstanceManager(base.RugTestBase):
@mock.patch('time.sleep')
def test_configure_failure(self, sleep):
fake_config_dict = {'fake_config': 'foo'}
self.fake_driver.update_config.side_effect = Exception
self.fake_driver.build_config.return_value = 'fake_config'
self.fake_driver.build_config.return_value = fake_config_dict
with mock.patch.object(self.instance_mgr,
'_verify_interfaces') as verify:
@ -567,7 +567,7 @@ class TestInstanceManager(base.RugTestBase):
expected_calls = [
mock.call(self.INSTANCE_INFO.management_address,
'fake_config')
fake_config_dict)
for i in range(0, 2)]
self.fake_driver.update_config.assert_has_calls(expected_calls)
self.assertEqual(self.instance_mgr.state, states.RESTART)

@ -30,6 +30,7 @@ from astara import worker
from astara.common.hash_ring import DC_KEY
from astara.test.unit import fakes
from astara.test.unit.db import base
@ -59,7 +60,7 @@ class WorkerTestBase(base.DbTestCase):
self.fake_neutron = mock.patch.object(
neutron, 'Neutron', return_value=fake_neutron_obj).start()
self.w = worker.Worker(mock.Mock())
self.w = worker.Worker(mock.Mock(), fakes.FAKE_MGT_ADDR)
self.addCleanup(mock.patch.stopall)
self.target = self.tenant_id
@ -355,12 +356,24 @@ class TestWorker(WorkerTestBase):
# just ensure we dont raise
self.w._release_resource_lock(fake_sm)
def test_worker_context_config(self):
self.config(astara_metadata_port=1234)
self.config(host='foohost')
ctxt = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
self.assertEqual(
ctxt.config,
{
'host': 'foohost',
'metadata_port': 1234,
'address': fakes.FAKE_MGT_ADDR,
})
class TestResourceCache(WorkerTestBase):
def setUp(self):
super(TestResourceCache, self).setUp()
self.resource_cache = worker.TenantResourceCache()
self.worker_context = worker.WorkerContext()
self.worker_context = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
def test_resource_cache_hit(self):
self.resource_cache._tenant_resources = {
@ -431,7 +444,8 @@ class TestCreatingResource(WorkerTestBase):
def test_message_enqueued(self):
self.w.handle_message(self.tenant_id, self.msg)
trm = self.w.tenant_managers[self.tenant_id]
sm = trm.get_state_machines(self.msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(self.msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
self.assertEqual(len(sm._queue), 1)
@ -495,7 +509,7 @@ class TestShutdown(WorkerTestBase):
@mock.patch('kombu.Producer')
def test_stop_threads_notifier(self, producer, exchange, broker):
notifier = notifications.Publisher('topic')
w = worker.Worker(notifier)
w = worker.Worker(notifier, fakes.FAKE_MGT_ADDR)
self.assertTrue(notifier)
w._shutdown()
self.assertFalse(w.notifier._t)
@ -504,7 +518,7 @@ class TestShutdown(WorkerTestBase):
class TestUpdateStateMachine(WorkerTestBase):
def setUp(self):
super(TestUpdateStateMachine, self).setUp()
self.worker_context = worker.WorkerContext()
self.worker_context = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
self.w._should_process_message = mock.MagicMock(return_value=self.msg)
def _test(self, fake_hash, negative=False):
@ -643,7 +657,8 @@ class TestDebugRouters(WorkerTestBase):
# Create the router manager and state machine so we can
# replace the send_message() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
with mock.patch.object(sm, 'send_message') as meth:
# The router id is being ignored, so the send_message()
# method shouldn't ever be invoked.
@ -695,7 +710,8 @@ class TestDebugTenants(WorkerTestBase):
# Create the router manager and state machine so we can
# replace the send_message() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
with mock.patch.object(sm, 'send_message') as meth:
# The tenant id is being ignored, so the send_message()
# method shouldn't ever be invoked.
@ -749,7 +765,8 @@ class TestGlobalDebug(WorkerTestBase):
# Create the router manager and state machine so we can
# replace the send_message() method with a mock.
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
with mock.patch.object(sm, 'send_message') as meth:
# The tenant id is being ignored, so the send_message()
# method shouldn't ever be invoked.
@ -772,7 +789,8 @@ class TestRebalance(WorkerTestBase):
body={'key': 'value'},
)
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
self.w.hash_ring_mgr.rebalance(['foo'])
self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo']))

@ -116,9 +116,18 @@ class WorkerContext(object):
"""Holds resources owned by the worker and used by the Automaton.
"""
def __init__(self):
def __init__(self, management_address):
self.neutron = neutron.Neutron(cfg.CONF)
self.nova_client = nova.Nova(cfg.CONF)
self.management_address = management_address
@property
def config(self):
return {
'address': self.management_address,
'metadata_port': cfg.CONF.astara_metadata_port,
'host': cfg.CONF.host,
}
class Worker(object):
@ -128,7 +137,7 @@ class Worker(object):
track of a bunch of the state machines, so the callable is a
method of an instance of this class instead of a simple function.
"""
def __init__(self, notifier):
def __init__(self, notifier, management_address):
self._ignore_directory = cfg.CONF.ignored_router_directory
self._queue_warning_threshold = cfg.CONF.queue_warning_threshold
self._reboot_error_threshold = cfg.CONF.reboot_error_threshold
@ -137,11 +146,12 @@ class Worker(object):
self.lock = threading.Lock()
self._keep_going = True
self.tenant_managers = {}
self.management_address = management_address
self.resource_cache = TenantResourceCache()
# This process-global context should not be used in the
# threads, since the clients are not thread-safe.
self._context = WorkerContext()
self._context = WorkerContext(self.management_address)
self.notifier = notifier
# The notifier needs to be started here to ensure that it
# happens inside the worker process and not the parent.
@ -181,7 +191,7 @@ class Worker(object):
# messages and talking to the tenant router manager because we
# are in a different thread and the clients are not
# thread-safe.
context = WorkerContext()
context = WorkerContext(self.management_address)
while self._keep_going:
try:
# Try to get a state machine from the work queue. If

@ -0,0 +1,5 @@
---
features:
  - >
    The orchestrator now pushes local orchestrator-specific configuration into the appliance,
    allowing services like the metadata proxy to be configured for the current cluster layout.
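
As a rough sketch only: an appliance-side service could consume the pushed
section along these lines. The helper below is hypothetical; appliance-side
changes are not part of this commit.

    # Hypothetical appliance-side helper: derive the metadata proxy URL from
    # the orchestrator section pushed by the managing orchestrator, rather
    # than relying on a hard-coded address.
    def metadata_proxy_url(appliance_config):
        orch = appliance_config['orchestrator']
        return 'http://%s:%s' % (orch['address'], orch['metadata_port'])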