Introduces the ZoneManager to the Scheduler which polls the child zones and caches their availability and capabilities.

This commit is contained in:
Sandy Walsh 2011-03-10 14:49:59 +00:00 committed by Tarmac
commit b33b6d1a52
9 changed files with 441 additions and 20 deletions

View File

@ -77,8 +77,8 @@ class APIRouter(wsgi.Router):
server_members['pause'] = 'POST'
server_members['unpause'] = 'POST'
server_members["diagnostics"] = "GET"
server_members["actions"] = "GET"
server_members['diagnostics'] = 'GET'
server_members['actions'] = 'GET'
server_members['suspend'] = 'POST'
server_members['resume'] = 'POST'
server_members['rescue'] = 'POST'
@ -87,7 +87,7 @@ class APIRouter(wsgi.Router):
server_members['inject_network_info'] = 'POST'
mapper.resource("zone", "zones", controller=zones.Controller(),
collection={'detail': 'GET'})
collection={'detail': 'GET', 'info': 'GET'}),
mapper.resource("server", "servers", controller=servers.Controller(),
collection={'detail': 'GET'},

View File

@ -1,4 +1,4 @@
# Copyright 2010 OpenStack LLC.
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -18,6 +18,7 @@ import common
from nova import flags
from nova import wsgi
from nova import db
from nova.scheduler import api
FLAGS = flags.FLAGS
@ -32,6 +33,10 @@ def _filter_keys(item, keys):
return dict((k, v) for k, v in item.iteritems() if k in keys)
def _exclude_keys(item, keys):
return dict((k, v) for k, v in item.iteritems() if k not in keys)
def _scrub_zone(zone):
return _filter_keys(zone, ('id', 'api_url'))
@ -41,19 +46,30 @@ class Controller(wsgi.Controller):
_serialization_metadata = {
'application/xml': {
"attributes": {
"zone": ["id", "api_url"]}}}
"zone": ["id", "api_url", "name", "capabilities"]}}}
def index(self, req):
"""Return all zones in brief"""
items = db.zone_get_all(req.environ['nova.context'])
# Ask the ZoneManager in the Scheduler for most recent data,
# or fall-back to the database ...
items = api.API().get_zone_list(req.environ['nova.context'])
if not items:
items = db.zone_get_all(req.environ['nova.context'])
items = common.limited(items, req)
items = [_scrub_zone(item) for item in items]
items = [_exclude_keys(item, ['username', 'password'])
for item in items]
return dict(zones=items)
def detail(self, req):
"""Return all zones in detail"""
return self.index(req)
def info(self, req):
"""Return name and capabilities for this zone."""
return dict(zone=dict(name=FLAGS.zone_name,
capabilities=FLAGS.zone_capabilities))
def show(self, req, id):
"""Return data about the given zone id"""
zone_id = int(id)

View File

@ -356,3 +356,7 @@ DEFINE_string('host', socket.gethostname(),
DEFINE_string('node_availability_zone', 'nova',
'availability zone of this node')
DEFINE_string('zone_name', 'nova', 'name of this zone')
DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
'Key/Value tags which represent capabilities of this zone')

49
nova/scheduler/api.py Normal file
View File

@ -0,0 +1,49 @@
# Copyright (c) 2011 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to schedulers.
"""
from nova import flags
from nova import log as logging
from nova import rpc
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.scheduler.api')
class API(object):
"""API for interacting with the scheduler."""
def _call_scheduler(self, method, context, params=None):
"""Generic handler for RPC calls to the scheduler.
:param params: Optional dictionary of arguments to be passed to the
scheduler worker
:retval: Result returned by scheduler worker
"""
if not params:
params = {}
queue = FLAGS.scheduler_topic
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
def get_zone_list(self, context):
items = self._call_scheduler('get_zone_list', context)
for item in items:
item['api_url'] = item['api_url'].replace('\\/', '/')
return items

View File

@ -29,6 +29,7 @@ from nova import log as logging
from nova import manager
from nova import rpc
from nova import utils
from nova.scheduler import zone_manager
LOG = logging.getLogger('nova.scheduler.manager')
FLAGS = flags.FLAGS
@ -43,12 +44,21 @@ class SchedulerManager(manager.Manager):
if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver
self.driver = utils.import_object(scheduler_driver)
self.zone_manager = zone_manager.ZoneManager()
super(SchedulerManager, self).__init__(*args, **kwargs)
def __getattr__(self, key):
"""Converts all method calls to use the schedule method"""
return functools.partial(self._schedule, key)
def periodic_tasks(self, context=None):
"""Poll child zones periodically to get status."""
self.zone_manager.ping(context)
def get_zone_list(self, context=None):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
def _schedule(self, method, context, topic, *args, **kwargs):
"""Tries to call schedule_* method on the driver to retrieve host.

View File

@ -0,0 +1,143 @@
# Copyright (c) 2011 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
ZoneManager oversees all communications with child Zones.
"""
import novaclient
import thread
import traceback
from datetime import datetime
from eventlet import greenpool
from nova import db
from nova import flags
from nova import log as logging
FLAGS = flags.FLAGS
flags.DEFINE_integer('zone_db_check_interval', 60,
'Seconds between getting fresh zone info from db.')
flags.DEFINE_integer('zone_failures_to_offline', 3,
'Number of consecutive errors before marking zone offline')
class ZoneState(object):
"""Holds the state of all connected child zones."""
def __init__(self):
self.is_active = True
self.name = None
self.capabilities = None
self.attempt = 0
self.last_seen = datetime.min
self.last_exception = None
self.last_exception_time = None
def update_credentials(self, zone):
"""Update zone credentials from db"""
self.zone_id = zone.id
self.api_url = zone.api_url
self.username = zone.username
self.password = zone.password
def update_metadata(self, zone_metadata):
"""Update zone metadata after successful communications with
child zone."""
self.last_seen = datetime.now()
self.attempt = 0
self.name = zone_metadata["name"]
self.capabilities = zone_metadata["capabilities"]
self.is_active = True
def to_dict(self):
return dict(name=self.name, capabilities=self.capabilities,
is_active=self.is_active, api_url=self.api_url,
id=self.zone_id)
def log_error(self, exception):
"""Something went wrong. Check to see if zone should be
marked as offline."""
self.last_exception = exception
self.last_exception_time = datetime.now()
api_url = self.api_url
logging.warning(_("'%(exception)s' error talking to "
"zone %(api_url)s") % locals())
max_errors = FLAGS.zone_failures_to_offline
self.attempt += 1
if self.attempt >= max_errors:
self.is_active = False
logging.error(_("No answer from zone %(api_url)s "
"after %(max_errors)d "
"attempts. Marking inactive.") % locals())
def _call_novaclient(zone):
"""Call novaclient. Broken out for testing purposes."""
client = novaclient.OpenStack(zone.username, zone.password, zone.api_url)
return client.zones.info()._info
def _poll_zone(zone):
"""Eventlet worker to poll a zone."""
logging.debug(_("Polling zone: %s") % zone.api_url)
try:
zone.update_metadata(_call_novaclient(zone))
except Exception, e:
zone.log_error(traceback.format_exc())
class ZoneManager(object):
"""Keeps the zone states updated."""
def __init__(self):
self.last_zone_db_check = datetime.min
self.zone_states = {}
self.green_pool = greenpool.GreenPool()
def get_zone_list(self):
"""Return the list of zones we know about."""
return [zone.to_dict() for zone in self.zone_states.values()]
def _refresh_from_db(self, context):
"""Make our zone state map match the db."""
# Add/update existing zones ...
zones = db.zone_get_all(context)
existing = self.zone_states.keys()
db_keys = []
for zone in zones:
db_keys.append(zone.id)
if zone.id not in existing:
self.zone_states[zone.id] = ZoneState()
self.zone_states[zone.id].update_credentials(zone)
# Cleanup zones removed from db ...
keys = self.zone_states.keys() # since we're deleting
for zone_id in keys:
if zone_id not in db_keys:
del self.zone_states[zone_id]
def _poll_zones(self, context):
"""Try to connect to each child zone and get update."""
self.green_pool.imap(_poll_zone, self.zone_states.values())
def ping(self, context=None):
"""Ping should be called periodically to update zone status."""
diff = datetime.now() - self.last_zone_db_check
if diff.seconds >= FLAGS.zone_db_check_interval:
logging.debug(_("Updating zone cache from db."))
self.last_zone_db_check = datetime.now()
self._refresh_from_db(context)
self._poll_zones(context)

View File

@ -1,4 +1,4 @@
# Copyright 2010 OpenStack LLC.
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -24,6 +24,7 @@ from nova import flags
from nova import test
from nova.api.openstack import zones
from nova.tests.api.openstack import fakes
from nova.scheduler import api
FLAGS = flags.FLAGS
@ -31,7 +32,7 @@ FLAGS.verbose = True
def zone_get(context, zone_id):
return dict(id=1, api_url='http://foo.com', username='bob',
return dict(id=1, api_url='http://example.com', username='bob',
password='xxx')
@ -42,7 +43,7 @@ def zone_create(context, values):
def zone_update(context, zone_id, values):
zone = dict(id=zone_id, api_url='http://foo.com', username='bob',
zone = dict(id=zone_id, api_url='http://example.com', username='bob',
password='xxx')
zone.update(values)
return zone
@ -52,12 +53,26 @@ def zone_delete(context, zone_id):
pass
def zone_get_all(context):
def zone_get_all_scheduler(*args):
return [
dict(id=1, api_url='http://foo.com', username='bob',
dict(id=1, api_url='http://example.com', username='bob',
password='xxx'),
dict(id=2, api_url='http://blah.com', username='alice',
password='qwerty')]
dict(id=2, api_url='http://example.org', username='alice',
password='qwerty')
]
def zone_get_all_scheduler_empty(*args):
return []
def zone_get_all_db(context):
return [
dict(id=1, api_url='http://example.com', username='bob',
password='xxx'),
dict(id=2, api_url='http://example.org', username='alice',
password='qwerty')
]
class ZonesTest(test.TestCase):
@ -74,7 +89,6 @@ class ZonesTest(test.TestCase):
FLAGS.allow_admin_api = True
self.stubs.Set(nova.db, 'zone_get', zone_get)
self.stubs.Set(nova.db, 'zone_get_all', zone_get_all)
self.stubs.Set(nova.db, 'zone_update', zone_update)
self.stubs.Set(nova.db, 'zone_create', zone_create)
self.stubs.Set(nova.db, 'zone_delete', zone_delete)
@ -84,7 +98,19 @@ class ZonesTest(test.TestCase):
FLAGS.allow_admin_api = self.allow_admin
super(ZonesTest, self).tearDown()
def test_get_zone_list(self):
def test_get_zone_list_scheduler(self):
self.stubs.Set(api.API, '_call_scheduler', zone_get_all_scheduler)
req = webob.Request.blank('/v1.0/zones')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(len(res_dict['zones']), 2)
def test_get_zone_list_db(self):
self.stubs.Set(api.API, '_call_scheduler',
zone_get_all_scheduler_empty)
self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
req = webob.Request.blank('/v1.0/zones')
req.headers["Content-Type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
@ -101,7 +127,7 @@ class ZonesTest(test.TestCase):
self.assertEqual(res.status_int, 200)
res_dict = json.loads(res.body)
self.assertEqual(res_dict['zone']['id'], 1)
self.assertEqual(res_dict['zone']['api_url'], 'http://foo.com')
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('password' in res_dict['zone'])
def test_zone_delete(self):
@ -112,7 +138,7 @@ class ZonesTest(test.TestCase):
self.assertEqual(res.status_int, 200)
def test_zone_create(self):
body = dict(zone=dict(api_url='http://blah.zoo', username='fred',
body = dict(zone=dict(api_url='http://example.com', username='fred',
password='fubar'))
req = webob.Request.blank('/v1.0/zones')
req.headers["Content-Type"] = "application/json"
@ -124,7 +150,7 @@ class ZonesTest(test.TestCase):
self.assertEqual(res.status_int, 200)
res_dict = json.loads(res.body)
self.assertEqual(res_dict['zone']['id'], 1)
self.assertEqual(res_dict['zone']['api_url'], 'http://blah.zoo')
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])
def test_zone_update(self):
@ -139,5 +165,5 @@ class ZonesTest(test.TestCase):
self.assertEqual(res.status_int, 200)
res_dict = json.loads(res.body)
self.assertEqual(res_dict['zone']['id'], 1)
self.assertEqual(res_dict['zone']['api_url'], 'http://foo.com')
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])

172
nova/tests/test_zones.py Normal file
View File

@ -0,0 +1,172 @@
# Copyright 2010 United States Government as represented by the
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For ZoneManager
"""
import datetime
import mox
import novaclient
from nova import context
from nova import db
from nova import flags
from nova import service
from nova import test
from nova import rpc
from nova import utils
from nova.auth import manager as auth_manager
from nova.scheduler import zone_manager
FLAGS = flags.FLAGS
class FakeZone:
"""Represents a fake zone from the db"""
def __init__(self, *args, **kwargs):
for k, v in kwargs.iteritems():
setattr(self, k, v)
def exploding_novaclient(zone):
"""Used when we want to simulate a novaclient call failing."""
raise Exception("kaboom")
class ZoneManagerTestCase(test.TestCase):
"""Test case for zone manager"""
def test_ping(self):
zm = zone_manager.ZoneManager()
self.mox.StubOutWithMock(zm, '_refresh_from_db')
self.mox.StubOutWithMock(zm, '_poll_zones')
zm._refresh_from_db(mox.IgnoreArg())
zm._poll_zones(mox.IgnoreArg())
self.mox.ReplayAll()
zm.ping(None)
self.mox.VerifyAll()
def test_refresh_from_db_new(self):
zm = zone_manager.ZoneManager()
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=1, api_url='http://foo.com', username='user1',
password='pass1'),
])
self.assertEquals(len(zm.zone_states), 0)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user1')
def test_refresh_from_db_replace_existing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1'))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=1, api_url='http://foo.com', username='user2',
password='pass2'),
])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user2')
def test_refresh_from_db_missing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1'))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 0)
def test_refresh_from_db_add_and_delete(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1'))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=2, api_url='http://foo.com', username='user2',
password='pass2'),
])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[2].username, 'user2')
def test_poll_zone(self):
self.mox.StubOutWithMock(zone_manager, '_call_novaclient')
zone_manager._call_novaclient(mox.IgnoreArg()).AndReturn(
dict(name='zohan', capabilities='hairdresser'))
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=2,
api_url='http://foo.com', username='user2',
password='pass2'))
zone_state.attempt = 1
self.mox.ReplayAll()
zone_manager._poll_zone(zone_state)
self.mox.VerifyAll()
self.assertEquals(zone_state.attempt, 0)
self.assertEquals(zone_state.name, 'zohan')
def test_poll_zone_fails(self):
self.stubs.Set(zone_manager, "_call_novaclient", exploding_novaclient)
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=2,
api_url='http://foo.com', username='user2',
password='pass2'))
zone_state.attempt = FLAGS.zone_failures_to_offline - 1
self.mox.ReplayAll()
zone_manager._poll_zone(zone_state)
self.mox.VerifyAll()
self.assertEquals(zone_state.attempt, 3)
self.assertFalse(zone_state.is_active)
self.assertEquals(zone_state.name, None)

View File

@ -10,6 +10,7 @@ boto==1.9b
carrot==0.10.5
eventlet==0.9.12
lockfile==0.8
python-novaclient==2.3
python-daemon==1.5.5
python-gflags==1.3
redis==2.0.0