Merge "Push orchestrator config into the appliance"
This commit is contained in:
commit
652c7e03ef
|
@ -45,16 +45,17 @@ SERVICE_DHCP = 'dhcp'
|
|||
SERVICE_RA = 'ra'
|
||||
|
||||
|
||||
def build_config(client, router, management_port, interfaces):
|
||||
def build_config(worker_context, router, management_port, interfaces):
|
||||
provider_rules = load_provider_rules(cfg.CONF.provider_rules_path)
|
||||
|
||||
networks = generate_network_config(
|
||||
client,
|
||||
worker_context.neutron,
|
||||
router,
|
||||
management_port,
|
||||
interfaces
|
||||
)
|
||||
gateway = get_default_v4_gateway(client, router, networks)
|
||||
gateway = get_default_v4_gateway(
|
||||
worker_context.neutron, router, networks)
|
||||
|
||||
return {
|
||||
'asn': cfg.CONF.asn,
|
||||
|
@ -64,7 +65,8 @@ def build_config(client, router, management_port, interfaces):
|
|||
'labels': provider_rules.get('labels', {}),
|
||||
'floating_ips': generate_floating_config(router),
|
||||
'tenant_id': router.tenant_id,
|
||||
'hostname': 'ak-%s' % router.tenant_id
|
||||
'hostname': 'ak-%s' % router.tenant_id,
|
||||
'orchestrator': worker_context.config,
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -143,7 +143,7 @@ class Router(BaseDriver):
|
|||
"""
|
||||
self._ensure_cache(worker_context)
|
||||
return configuration.build_config(
|
||||
worker_context.neutron,
|
||||
worker_context,
|
||||
self._router,
|
||||
mgt_port,
|
||||
iface_map
|
||||
|
|
|
@ -402,6 +402,7 @@ class InstanceManager(object):
|
|||
mgt_port,
|
||||
iface_map
|
||||
)
|
||||
|
||||
self.log.debug('preparing to update config to %r', config)
|
||||
|
||||
for i in six.moves.range(attempts):
|
||||
|
|
|
@ -185,7 +185,8 @@ def main(argv=sys.argv[1:]):
|
|||
# run.
|
||||
worker_factory = functools.partial(
|
||||
worker.Worker,
|
||||
notifier=publisher
|
||||
notifier=publisher,
|
||||
management_address=mgt_ip_address,
|
||||
)
|
||||
|
||||
# Set up the scheduler that knows how to manage the routers and
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2012 New Dream Network, LLC (DreamHost)
|
||||
|
@ -62,15 +61,15 @@ from astara.common.i18n import _, _LE, _LI, _LW
|
|||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
RUG_META_PORT = 9697
|
||||
|
||||
|
||||
METADATA_OPTS = [
|
||||
cfg.StrOpt('nova_metadata_ip', default='127.0.0.1',
|
||||
help="IP address used by Nova metadata server."),
|
||||
cfg.IntOpt('nova_metadata_port',
|
||||
default=8775,
|
||||
help="TCP Port used by Nova metadata server."),
|
||||
cfg.IntOpt('astara_metadata_port',
|
||||
default=9697,
|
||||
help="TCP listening port used by Astara metadata proxy."),
|
||||
cfg.StrOpt('neutron_metadata_proxy_shared_secret',
|
||||
default='',
|
||||
help='Shared secret to sign instance-id request',
|
||||
|
@ -180,7 +179,7 @@ class MetadataProxy(object):
|
|||
"""
|
||||
self.pool = eventlet.GreenPool(1000)
|
||||
|
||||
def run(self, ip_address, port=RUG_META_PORT):
|
||||
def run(self, ip_address, port=cfg.CONF.astara_metadata_port):
|
||||
"""Run the MetadataProxy.
|
||||
|
||||
:param ip_address: the ip address to bind to for incoming requests
|
||||
|
|
|
@ -40,8 +40,17 @@ class TestAstaraClient(unittest.TestCase):
|
|||
'generate_floating_config': mock.DEFAULT,
|
||||
'get_default_v4_gateway': mock.DEFAULT,
|
||||
}
|
||||
fake_orchestrator = {
|
||||
'host': 'foohost',
|
||||
'adddress': '10.0.0.1',
|
||||
'metadata_port': 80,
|
||||
}
|
||||
|
||||
mock_client = mock.Mock()
|
||||
mock_context = mock.Mock(
|
||||
neutron=mock_client,
|
||||
config=fake_orchestrator,
|
||||
)
|
||||
ifaces = []
|
||||
provider_rules = {'labels': {'ext': ['192.168.1.1']}}
|
||||
network_config = [
|
||||
|
@ -72,7 +81,7 @@ class TestAstaraClient(unittest.TestCase):
|
|||
mocks['generate_floating_config'].return_value = 'floating_config'
|
||||
mocks['get_default_v4_gateway'].return_value = 'default_gw'
|
||||
|
||||
config = conf_mod.build_config(mock_client, fakes.fake_router,
|
||||
config = conf_mod.build_config(mock_context, fakes.fake_router,
|
||||
fakes.fake_mgt_port, ifaces)
|
||||
|
||||
expected = {
|
||||
|
@ -83,7 +92,12 @@ class TestAstaraClient(unittest.TestCase):
|
|||
'asn': 64512,
|
||||
'neighbor_asn': 64512,
|
||||
'tenant_id': 'tenant_id',
|
||||
'hostname': 'ak-tenant_id'
|
||||
'hostname': 'ak-tenant_id',
|
||||
'orchestrator': {
|
||||
'host': 'foohost',
|
||||
'adddress': '10.0.0.1',
|
||||
'metadata_port': 80,
|
||||
}
|
||||
}
|
||||
|
||||
self.assertEqual(config, expected)
|
||||
|
|
|
@ -105,7 +105,7 @@ class RouterDriverTest(base.RugTestBase):
|
|||
self.assertTrue(mock_ensure_cache.called)
|
||||
mock_build_config.return_value = 'fake_config'
|
||||
mock_build_config.assert_called_with(
|
||||
self.ctx.neutron, rtr._router, fake_mgt_port, fake_iface_map)
|
||||
self.ctx, rtr._router, fake_mgt_port, fake_iface_map)
|
||||
self.assertEqual(res, 'fake_config')
|
||||
|
||||
@mock.patch('astara.api.astara_client.update_config')
|
||||
|
|
|
@ -18,6 +18,8 @@ from astara.drivers import base
|
|||
from astara.api import neutron, nova
|
||||
from astara import worker
|
||||
|
||||
FAKE_MGT_ADDR = '10.10.1.13'
|
||||
|
||||
|
||||
def fake_loadbalancer():
|
||||
lb_dict = {
|
||||
|
@ -170,4 +172,4 @@ def fake_worker_context():
|
|||
nova, 'Nova', autospec=True).start()
|
||||
mock.patch.object(
|
||||
nova, 'Nova', return_value=fake_nova_obj).start()
|
||||
return worker.WorkerContext()
|
||||
return worker.WorkerContext(FAKE_MGT_ADDR)
|
||||
|
|
|
@ -69,7 +69,8 @@ class TestInstanceManager(base.RugTestBase):
|
|||
super(TestInstanceManager, self).setUp()
|
||||
self.conf = cfg.CONF
|
||||
self.fake_driver = fakes.fake_driver()
|
||||
self.ctx = mock.Mock()
|
||||
self.ctx = fakes.fake_worker_context()
|
||||
|
||||
self.neutron = self.ctx.neutron
|
||||
self.config(boot_timeout=30)
|
||||
self.config(astara_mgt_service_port=5000)
|
||||
|
@ -103,8 +104,6 @@ class TestInstanceManager(base.RugTestBase):
|
|||
|
||||
self.ctx.nova_client.get_instance_info.return_value = (
|
||||
self.INSTANCE_INFO)
|
||||
self.ctx.nova_client.get_instance_info_for_obj.return_value = (
|
||||
self.INSTANCE_INFO)
|
||||
self.ctx.neutron.get_ports_for_instance.return_value = (
|
||||
fake_mgt_port, [fake_int_port, fake_ext_port])
|
||||
|
||||
|
@ -145,8 +144,6 @@ class TestInstanceManager(base.RugTestBase):
|
|||
|
||||
def test_update_state_instance_no_ports_still_booting(self):
|
||||
self.update_state_p.stop()
|
||||
self.ctx.nova_client.get_instance_info_for_obj.return_value = \
|
||||
self.INSTANCE_INFO
|
||||
self.ctx.neutron.get_ports_for_instance.return_value = (None, [])
|
||||
|
||||
self.assertEqual(self.instance_mgr.update_state(self.ctx),
|
||||
|
@ -193,6 +190,7 @@ class TestInstanceManager(base.RugTestBase):
|
|||
state='up',
|
||||
)
|
||||
self.fake_driver.synchronize_state.reset_mock()
|
||||
self.fake_driver.build_config.return_value = {}
|
||||
|
||||
# Configure the router and make sure state is synchronized as ACTIVE
|
||||
with mock.patch.object(self.instance_mgr,
|
||||
|
@ -373,7 +371,6 @@ class TestInstanceManager(base.RugTestBase):
|
|||
rtr = mock.sentinel.router
|
||||
instance = mock.sentinel.instance
|
||||
self.ctx.neutron.get_router_detail.return_value = rtr
|
||||
self.ctx.nova_client.get_instance.return_value = instance
|
||||
self.ctx.nova_client.boot_instance.side_effect = RuntimeError
|
||||
rtr.id = 'ROUTER1'
|
||||
instance.id = 'INSTANCE1'
|
||||
|
@ -480,7 +477,6 @@ class TestInstanceManager(base.RugTestBase):
|
|||
time.side_effect = side_effects
|
||||
self.config(boot_timeout=30)
|
||||
self.instance_mgr.state = states.UP
|
||||
self.ctx.nova_client.get_router_instance_status.return_value = 'UP'
|
||||
self.instance_mgr.stop(self.ctx)
|
||||
self.assertEqual(self.instance_mgr.state, states.UP)
|
||||
self.ctx.nova_client.destroy_instance.assert_called_once_with(
|
||||
|
@ -517,7 +513,10 @@ class TestInstanceManager(base.RugTestBase):
|
|||
self.assertEqual(self.instance_mgr.state, states.DOWN)
|
||||
|
||||
def test_configure_success(self):
|
||||
self.fake_driver.build_config.return_value = 'fake_config'
|
||||
fake_config_dict = {'fake_config': 'foo'}
|
||||
self.fake_driver.build_config.return_value = dict(fake_config_dict)
|
||||
self.config(astara_metadata_port=4321)
|
||||
self.config(host='foobarhost')
|
||||
with mock.patch.object(self.instance_mgr,
|
||||
'_verify_interfaces') as verify:
|
||||
verify.return_value = True
|
||||
|
@ -531,9 +530,9 @@ class TestInstanceManager(base.RugTestBase):
|
|||
self.ctx,
|
||||
self.INSTANCE_INFO.management_port,
|
||||
{'ext-net': 'ge1', 'int-net': 'ge2', 'mgt-net': 'ge0'})
|
||||
self.fake_driver.update_config.assert_called_once_with(
|
||||
self.INSTANCE_INFO.management_address, 'fake_config',
|
||||
)
|
||||
|
||||
self.fake_driver.update_config.assert_called_with(
|
||||
self.INSTANCE_INFO.management_address, fake_config_dict)
|
||||
self.assertEqual(self.instance_mgr.state,
|
||||
states.CONFIGURED)
|
||||
|
||||
|
@ -552,9 +551,10 @@ class TestInstanceManager(base.RugTestBase):
|
|||
|
||||
@mock.patch('time.sleep')
|
||||
def test_configure_failure(self, sleep):
|
||||
fake_config_dict = {'fake_config': 'foo'}
|
||||
|
||||
self.fake_driver.update_config.side_effect = Exception
|
||||
self.fake_driver.build_config.return_value = 'fake_config'
|
||||
self.fake_driver.build_config.return_value = fake_config_dict
|
||||
|
||||
with mock.patch.object(self.instance_mgr,
|
||||
'_verify_interfaces') as verify:
|
||||
|
@ -567,7 +567,7 @@ class TestInstanceManager(base.RugTestBase):
|
|||
|
||||
expected_calls = [
|
||||
mock.call(self.INSTANCE_INFO.management_address,
|
||||
'fake_config')
|
||||
fake_config_dict)
|
||||
for i in range(0, 2)]
|
||||
self.fake_driver.update_config.assert_has_calls(expected_calls)
|
||||
self.assertEqual(self.instance_mgr.state, states.RESTART)
|
||||
|
|
|
@ -30,6 +30,7 @@ from astara import worker
|
|||
|
||||
from astara.common.hash_ring import DC_KEY
|
||||
|
||||
from astara.test.unit import fakes
|
||||
from astara.test.unit.db import base
|
||||
|
||||
|
||||
|
@ -59,7 +60,7 @@ class WorkerTestBase(base.DbTestCase):
|
|||
self.fake_neutron = mock.patch.object(
|
||||
neutron, 'Neutron', return_value=fake_neutron_obj).start()
|
||||
|
||||
self.w = worker.Worker(mock.Mock())
|
||||
self.w = worker.Worker(mock.Mock(), fakes.FAKE_MGT_ADDR)
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
self.target = self.tenant_id
|
||||
|
@ -355,12 +356,24 @@ class TestWorker(WorkerTestBase):
|
|||
# just ensure we dont raise
|
||||
self.w._release_resource_lock(fake_sm)
|
||||
|
||||
def test_worker_context_config(self):
|
||||
self.config(astara_metadata_port=1234)
|
||||
self.config(host='foohost')
|
||||
ctxt = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
|
||||
self.assertEqual(
|
||||
ctxt.config,
|
||||
{
|
||||
'host': 'foohost',
|
||||
'metadata_port': 1234,
|
||||
'address': fakes.FAKE_MGT_ADDR,
|
||||
})
|
||||
|
||||
|
||||
class TestResourceCache(WorkerTestBase):
|
||||
def setUp(self):
|
||||
super(TestResourceCache, self).setUp()
|
||||
self.resource_cache = worker.TenantResourceCache()
|
||||
self.worker_context = worker.WorkerContext()
|
||||
self.worker_context = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
|
||||
|
||||
def test_resource_cache_hit(self):
|
||||
self.resource_cache._tenant_resources = {
|
||||
|
@ -454,7 +467,8 @@ class TestCreatingResource(WorkerTestBase):
|
|||
def test_message_enqueued(self):
|
||||
self.w.handle_message(self.tenant_id, self.msg)
|
||||
trm = self.w.tenant_managers[self.tenant_id]
|
||||
sm = trm.get_state_machines(self.msg, worker.WorkerContext())[0]
|
||||
sm = trm.get_state_machines(self.msg, worker.WorkerContext(
|
||||
fakes.FAKE_MGT_ADDR))[0]
|
||||
self.assertEqual(len(sm._queue), 1)
|
||||
|
||||
|
||||
|
@ -518,7 +532,7 @@ class TestShutdown(WorkerTestBase):
|
|||
@mock.patch('kombu.Producer')
|
||||
def test_stop_threads_notifier(self, producer, exchange, broker):
|
||||
notifier = notifications.Publisher('topic')
|
||||
w = worker.Worker(notifier)
|
||||
w = worker.Worker(notifier, fakes.FAKE_MGT_ADDR)
|
||||
self.assertTrue(notifier)
|
||||
w._shutdown()
|
||||
self.assertFalse(w.notifier._t)
|
||||
|
@ -527,7 +541,7 @@ class TestShutdown(WorkerTestBase):
|
|||
class TestUpdateStateMachine(WorkerTestBase):
|
||||
def setUp(self):
|
||||
super(TestUpdateStateMachine, self).setUp()
|
||||
self.worker_context = worker.WorkerContext()
|
||||
self.worker_context = worker.WorkerContext(fakes.FAKE_MGT_ADDR)
|
||||
self.w._should_process_message = mock.MagicMock(return_value=self.msg)
|
||||
|
||||
def _test(self, fake_hash, negative=False):
|
||||
|
@ -666,7 +680,8 @@ class TestDebugRouters(WorkerTestBase):
|
|||
# Create the router manager and state machine so we can
|
||||
# replace the send_message() method with a mock.
|
||||
trm = self.w._get_trms(tenant_id)[0]
|
||||
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
|
||||
sm = trm.get_state_machines(msg, worker.WorkerContext(
|
||||
fakes.FAKE_MGT_ADDR))[0]
|
||||
with mock.patch.object(sm, 'send_message') as meth:
|
||||
# The router id is being ignored, so the send_message()
|
||||
# method shouldn't ever be invoked.
|
||||
|
@ -718,7 +733,8 @@ class TestDebugTenants(WorkerTestBase):
|
|||
# Create the router manager and state machine so we can
|
||||
# replace the send_message() method with a mock.
|
||||
trm = self.w._get_trms(tenant_id)[0]
|
||||
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
|
||||
sm = trm.get_state_machines(msg, worker.WorkerContext(
|
||||
fakes.FAKE_MGT_ADDR))[0]
|
||||
with mock.patch.object(sm, 'send_message') as meth:
|
||||
# The tenant id is being ignored, so the send_message()
|
||||
# method shouldn't ever be invoked.
|
||||
|
@ -772,7 +788,8 @@ class TestGlobalDebug(WorkerTestBase):
|
|||
# Create the router manager and state machine so we can
|
||||
# replace the send_message() method with a mock.
|
||||
trm = self.w._get_trms(tenant_id)[0]
|
||||
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
|
||||
sm = trm.get_state_machines(msg, worker.WorkerContext(
|
||||
fakes.FAKE_MGT_ADDR))[0]
|
||||
with mock.patch.object(sm, 'send_message') as meth:
|
||||
# The tenant id is being ignored, so the send_message()
|
||||
# method shouldn't ever be invoked.
|
||||
|
@ -795,7 +812,8 @@ class TestRebalance(WorkerTestBase):
|
|||
body={'key': 'value'},
|
||||
)
|
||||
trm = self.w._get_trms(tenant_id)[0]
|
||||
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
|
||||
sm = trm.get_state_machines(msg, worker.WorkerContext(
|
||||
fakes.FAKE_MGT_ADDR))[0]
|
||||
|
||||
self.w.hash_ring_mgr.rebalance(['foo'])
|
||||
self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo']))
|
||||
|
|
|
@ -126,9 +126,18 @@ class WorkerContext(object):
|
|||
"""Holds resources owned by the worker and used by the Automaton.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, management_address):
|
||||
self.neutron = neutron.Neutron(cfg.CONF)
|
||||
self.nova_client = nova.Nova(cfg.CONF)
|
||||
self.management_address = management_address
|
||||
|
||||
@property
|
||||
def config(self):
|
||||
return {
|
||||
'address': self.management_address,
|
||||
'metadata_port': cfg.CONF.astara_metadata_port,
|
||||
'host': cfg.CONF.host,
|
||||
}
|
||||
|
||||
|
||||
class Worker(object):
|
||||
|
@ -138,7 +147,7 @@ class Worker(object):
|
|||
track of a bunch of the state machines, so the callable is a
|
||||
method of an instance of this class instead of a simple function.
|
||||
"""
|
||||
def __init__(self, notifier):
|
||||
def __init__(self, notifier, management_address):
|
||||
self._ignore_directory = cfg.CONF.ignored_router_directory
|
||||
self._queue_warning_threshold = cfg.CONF.queue_warning_threshold
|
||||
self._reboot_error_threshold = cfg.CONF.reboot_error_threshold
|
||||
|
@ -147,11 +156,12 @@ class Worker(object):
|
|||
self.lock = threading.Lock()
|
||||
self._keep_going = True
|
||||
self.tenant_managers = {}
|
||||
self.management_address = management_address
|
||||
self.resource_cache = TenantResourceCache()
|
||||
|
||||
# This process-global context should not be used in the
|
||||
# threads, since the clients are not thread-safe.
|
||||
self._context = WorkerContext()
|
||||
self._context = WorkerContext(self.management_address)
|
||||
self.notifier = notifier
|
||||
# The notifier needs to be started here to ensure that it
|
||||
# happens inside the worker process and not the parent.
|
||||
|
@ -191,7 +201,7 @@ class Worker(object):
|
|||
# messages and talking to the tenant router manager because we
|
||||
# are in a different thread and the clients are not
|
||||
# thread-safe.
|
||||
context = WorkerContext()
|
||||
context = WorkerContext(self.management_address)
|
||||
while self._keep_going:
|
||||
try:
|
||||
# Try to get a state machine from the work queue. If
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
features:
|
||||
- >
|
||||
The orchestrator now pushes local orchestrator-specific configuration into the appliance, allowing
|
||||
services like the metadata proxy to be configured specifically for current cluster layout.
|
Loading…
Reference in New Issue