Use shade for all OpenStack interactions

We wrote shade as an extraction of the logic we had in nodepool, and
have since expanded it to support more clouds. It's time to start
using it in nodepool, since that will allow us to add more clouds
and also to handle a wider variety of them.

Making a patch series was too tricky because of the way fakes and
threading work, so everything is included in this single change.

Depends-On: I557694b3931d81a3524c781ab5dabfb5995557f5
Change-Id: I423716d619aafb2eca5c1748bc65b38603a97b6a
Co-Authored-By: James E. Blair <jeblair@linux.vnet.ibm.com>
Co-Authored-By: David Shrewsbury <shrewsbury.dave@gmail.com>
Co-Authored-By: Yolanda Robla <yolanda.robla-mota@hpe.com>
This commit is contained in:
Monty Taylor 2015-09-22 21:37:54 -05:00 committed by Yolanda Robla
parent afdd58c10a
commit e1f4a12949
11 changed files with 310 additions and 626 deletions

View File

@ -97,7 +97,7 @@ function nodepool_write_config {
keys=simple
[loggers]
keys=root,nodepool
keys=root,nodepool,shade
[handlers]
keys=console
@ -112,6 +112,12 @@ handlers=console
qualname=nodepool
propagate=0
[logger_shade]
level=DEBUG
handlers=console
qualname=shade
propagate=0
[handler_console]
level=DEBUG
class=StreamHandler

View File

@ -347,7 +347,7 @@ class NodePoolBuilder(object):
image_file = image_files[0]
filename = image_file.to_path(self._config.imagesdir,
with_extension=False)
with_extension=True)
dummy_image = type('obj', (object,),
{'name': image_name})
@ -364,13 +364,11 @@ class NodePoolBuilder(object):
"Could not find matching provider image for %s", image_name
)
image_meta = provider_image.meta
external_id = manager.uploadImage(ext_image_name, filename,
image_file.extension, 'bare',
image_meta)
self.log.info("Saving image id: %s with external id %s" %
(image_id, external_id))
# It can take a _very_ long time for Rackspace 1.0 to save an image
manager.waitForImage(external_id, IMAGE_TIMEOUT)
# uploadImage is synchronous
external_id = manager.uploadImage(
ext_image_name, filename,
image_type=image_file.extension,
meta=image_meta)
if self.statsd:
dt = int((time.time() - start_time) * 1000)

View File

@ -145,7 +145,7 @@ def loadConfig(config_path):
p.region_name = provider.get('region-name')
p.max_servers = provider['max-servers']
p.keypair = provider.get('keypair', None)
p.pool = provider.get('pool')
p.pool = provider.get('pool', None)
p.rate = provider.get('rate', 1.0)
p.api_timeout = provider.get('api-timeout')
p.boot_timeout = provider.get('boot-timeout', 60)
@ -171,7 +171,8 @@ def loadConfig(config_path):
'template-hostname',
'template-{image.name}-{timestamp}'
)
p.image_type = provider.get('image-type', 'qcow2')
p.image_type = provider.get(
'image-type', p.cloud_config.config['image_format'])
p.images = {}
for image in provider['images']:
i = ProviderImage()
@ -195,7 +196,7 @@ def loadConfig(config_path):
# custom properties when the image is uploaded.
i.meta = image.get('meta', {})
# 5 elements, and no key or value can be > 255 chars
# per novaclient.servers.create() rules
# per Nova API rules
if i.meta:
if len(i.meta) > 5 or \
any([len(k) > 255 or len(v) > 255

View File

@ -39,3 +39,7 @@ class IPAddTimeoutException(TimeoutException):
class ServerDeleteException(TimeoutException):
statsd_key = 'error.serverdelete'
class ImageCreateException(TimeoutException):
statsd_key = 'error.imagetimeout'

View File

@ -16,27 +16,33 @@
import StringIO
import logging
import novaclient
import requests.exceptions
import threading
import time
import uuid
import exceptions
from jenkins import JenkinsException
import shade
import exceptions
class Dummy(object):
IMAGE = 'Image'
INSTANCE = 'Instance'
FLAVOR = 'Flavor'
KEYPAIR = 'Keypair'
def __init__(self, kind, **kw):
self.__kind = kind
self.__kw = kw
for k, v in kw.items():
setattr(self, k, v)
try:
if self.should_fail:
raise shade.OpenStackCloudException('This image has '
'SHOULD_FAIL set to True.')
except AttributeError:
pass
def __repr__(self):
args = []
@ -45,16 +51,17 @@ class Dummy(object):
args = ' '.join(args)
return '<%s %s %s>' % (self.__kind, id(self), args)
def delete(self):
self.manager.delete(self)
def __getitem__(self, key, default=None):
return getattr(self, key, default)
def update(self, data):
try:
if self.should_fail:
raise shade.OpenStackCloudException('This image has '
'SHOULD_FAIL set to True.')
except AttributeError:
pass
def __setitem__(self, key, value):
setattr(self, key, value)
def get(self, key, default=None):
return getattr(self, key, default)
def set(self, key, value):
setattr(self, key, value)
def fake_get_one_cloud(cloud_config, cloud_kwargs):
@ -62,47 +69,47 @@ def fake_get_one_cloud(cloud_config, cloud_kwargs):
return cloud_config.get_one_cloud(**cloud_kwargs)
class FakeList(object):
log = logging.getLogger("nodepool.FakeList")
class FakeOpenStackCloud(object):
log = logging.getLogger("nodepool.FakeOpenStackCloud")
def __init__(self, l):
self._list = l
def __init__(self, images=None, networks=None):
self._image_list = images
if self._image_list is None:
self._image_list = [
Dummy(
Dummy.IMAGE,
id='fake-image-id',
status='READY',
name='Fake Precise',
metadata={})
]
if networks is None:
networks = [dict(id='fake-public-network-uuid',
name='fake-public-network-name'),
dict(id='fake-private-network-uuid',
name='fake-private-network-name')]
self.networks = networks
self._flavor_list = [
Dummy(Dummy.FLAVOR, id='f1', ram=8192, name='Fake Flavor'),
Dummy(Dummy.FLAVOR, id='f2', ram=8192, name='Unreal Flavor'),
]
self._server_list = []
self._keypair_list = []
def list(self):
self.log.debug("List %s" % repr(self._list))
return self._list
def _get(self, name_or_id, instance_list):
self.log.debug("Get %s in %s" % (name_or_id, repr(instance_list)))
for instance in instance_list:
if instance.name == name_or_id or instance.id == name_or_id:
return instance
return None
def find(self, name):
for x in self._list:
if x.name == name:
return x
def get_network(self, name_or_id, filters=None):
return dict(id='fake-network-uuid',
name='fake-network-name')
def get(self, image=None):
if image:
id = image
self.log.debug("Get %s in %s" % (id, repr(self._list)))
for x in self._list:
if x.id == id:
return x
raise novaclient.exceptions.NotFound(404)
def _finish(self, obj, delay, status):
time.sleep(delay)
obj.status = status
def delete(self, *args, **kw):
self.log.debug("Delete from %s" % repr(self._list))
if 'image' in kw:
self._list.remove(self.get(kw['image']))
else:
obj = args[0]
if hasattr(obj, 'id'):
self._list.remove(obj)
else:
self._list.remove(self.get(obj))
self.log.debug("Deleted from %s" % repr(self._list))
def create(self, **kw):
def _create(
self, instance_list, instance_type=Dummy.INSTANCE,
done_status='ACTIVE', **kw):
should_fail = kw.get('SHOULD_FAIL', '').lower() == 'true'
nics = kw.get('nics', [])
addresses = None
@ -116,112 +123,114 @@ class FakeList(object):
dict(version=6, addr='fake_v6')],
private=[dict(version=4, addr='fake')]
)
public_v6 = 'fake_v6'
public_v4 = 'fake'
private_v4 = 'fake'
break
if not addresses:
addresses = dict(
public=[dict(version=4, addr='fake')],
private=[dict(version=4, addr='fake')]
)
s = Dummy(Dummy.INSTANCE,
public_v6 = ''
public_v4 = 'fake'
private_v4 = 'fake'
s = Dummy(instance_type,
id=uuid.uuid4().hex,
name=kw['name'],
status='BUILD',
adminPass='fake',
addresses=addresses,
public_v4=public_v4,
public_v6=public_v6,
private_v4=private_v4,
metadata=kw.get('meta', {}),
manager=self,
key_name=kw.get('key_name', None),
should_fail=should_fail)
self._list.append(s)
instance_list.append(s)
t = threading.Thread(target=self._finish,
name='FakeProvider create',
args=(s, 0.1, 'ACTIVE'))
args=(s, 0.1, done_status))
t.start()
return s
def create_image(self, server, image_name, metadata):
def _delete(self, name_or_id, instance_list):
self.log.debug("Delete from %s" % (repr(instance_list),))
instance = None
for maybe in instance_list:
if maybe.name == name_or_id or maybe.id == name_or_id:
instance = maybe
if instance:
instance_list.remove(instance)
self.log.debug("Deleted from %s" % (repr(instance_list),))
def _finish(self, obj, delay, status):
time.sleep(delay)
obj.status = status
def create_image(self, **kwargs):
return self._create(
self._image_list, instance_type=Dummy.IMAGE,
done_status='READY', **kwargs)
def get_image(self, name_or_id):
return self._get(name_or_id, self._image_list)
def list_images(self):
return self._image_list
def delete_image(self, name_or_id):
self._delete(name_or_id, self._image_list)
def create_image_snapshot(self, server, image_name, **metadata):
# XXX : validate metadata?
x = self.api.images.create(name=image_name)
return x.id
return self._create(
self._image_list, instance_type=Dummy.IMAGE,
name=image_name, **metadata)
def list_flavors(self):
return self._flavor_list
class FakeHTTPClient(object):
def get(self, path):
if path == '/extensions':
return None, dict(extensions=dict())
def create_keypair(self, name, public_key):
return self._create(
self._image_list, instance_type=Dummy.KEYPAIR,
name=name, public_key=public_key)
def list_keypairs(self):
return self._keypair_list
class BadHTTPClient(object):
'''Always raises a ProxyError'''
def get(self, path):
raise requests.exceptions.ProxyError
def delete_keypair(self, name):
self._delete(name, self._keypair_list)
def get_openstack_vars(self, server):
server.public_v4 = 'fake'
server.public_v6 = 'fake'
server.private_v4 = 'fake'
return server
class FakeClient(object):
def __init__(self, images, *args, **kwargs):
self.flavors = FakeList([
Dummy(Dummy.FLAVOR, id='f1', ram=8192, name='Fake Flavor'),
Dummy(Dummy.FLAVOR, id='f2', ram=8192, name='Unreal Flavor'),
])
self.images = images
self.client = FakeHTTPClient()
self.servers = FakeList([])
self.servers.api = self
def create_server(self, **kw):
return self._create(self._server_list, **kw)
def get_server(self, name_or_id):
result = self._get(name_or_id, self._server_list)
return result
class BadClient(FakeClient):
def __init__(self, images):
super(BadClient, self).__init__(images)
self.client = BadHTTPClient()
def wait_for_server(self, server, **kwargs):
server.status = 'ACTIVE'
return server
def list_servers(self):
return self._server_list
class BadOpenstackCloud(object):
def __init__(self, images=None):
if images is None:
images = FakeList([Dummy(Dummy.IMAGE,
id='fake-image-id',
status='READY',
name='Fake Precise',
metadata={})])
self.nova_client = BadClient(images)
class FakeGlanceClient(object):
def __init__(self, images, **kwargs):
self.kwargs = kwargs
self.images = images
class FakeNeutronClient(object):
def __init__(self, networks=None):
if networks is None:
networks = [dict(id='fake-public-network-uuid',
name='fake-public-network-name'),
dict(id='fake-private-network-uuid',
name='fake-private-network-name')]
self.networks = networks
def delete_server(self, name_or_id, delete_ips=True):
self._delete(name_or_id, self._server_list)
def list_networks(self):
return dict(networks=self.networks)
class FakeOpenStackCloud(object):
def __init__(self, images=None):
if images is None:
images = FakeList([Dummy(Dummy.IMAGE,
id='fake-image-id',
status='READY',
name='Fake Precise',
metadata={})])
self.nova_client = FakeClient(images)
self._glance_client = FakeGlanceClient(images)
self.neutron_client = FakeNeutronClient()
def create_image(self, **kwargs):
image = self._glance_client.images.create(**kwargs)
image.update('fake data')
return image
class FakeUploadFailCloud(FakeOpenStackCloud):
def create_image(self, **kwargs):
raise exceptions.BuilderError("Test fail image upload.")

View File

@ -470,18 +470,19 @@ class NodeLauncher(threading.Thread):
self.log.info("Creating server with hostname %s in %s from image %s "
"for node id: %s" % (hostname, self.provider.name,
self.image.name, self.node_id))
server_id = self.manager.createServer(
server = self.manager.createServer(
hostname, self.image.min_ram, snap_image.external_id,
name_filter=self.image.name_filter, az=self.node.az,
config_drive=self.image.config_drive,
nodepool_node_id=self.node_id,
nodepool_image_name=self.image.name)
server_id = server['id']
self.node.external_id = server_id
session.commit()
self.log.debug("Waiting for server %s for node id: %s" %
(server_id, self.node.id))
server = self.manager.waitForServer(server_id, self.launch_timeout)
server = self.manager.waitForServer(server, self.launch_timeout)
if server['status'] != 'ACTIVE':
raise LaunchStatusException("Server %s for node id: %s "
"status: %s" %
@ -496,9 +497,6 @@ class NodeLauncher(threading.Thread):
else:
self.log.warning('Preferred ipv6 not available, '
'falling back to ipv4.')
if not ip and self.manager.hasExtension('os-floating-ips'):
ip = self.manager.addPublicIP(server_id,
pool=self.provider.pool)
if not ip:
self.log.debug(
"Server data for failed IP: %s" % pprint.pformat(
@ -763,19 +761,20 @@ class SubNodeLauncher(threading.Thread):
"for subnode id: %s for node id: %s"
% (hostname, self.provider.name,
self.image.name, self.subnode_id, self.node_id))
server_id = self.manager.createServer(
server = self.manager.createServer(
hostname, self.image.min_ram, snap_image.external_id,
name_filter=self.image.name_filter, az=self.node_az,
config_drive=self.image.config_drive,
nodepool_node_id=self.node_id,
nodepool_image_name=self.image.name)
server_id = server['id']
self.subnode.external_id = server_id
session.commit()
self.log.debug("Waiting for server %s for subnode id: %s for "
"node id: %s" %
(server_id, self.subnode_id, self.node_id))
server = self.manager.waitForServer(server_id, self.launch_timeout)
server = self.manager.waitForServer(server, self.launch_timeout)
if server['status'] != 'ACTIVE':
raise LaunchStatusException("Server %s for subnode id: "
"%s for node id: %s "
@ -791,9 +790,6 @@ class SubNodeLauncher(threading.Thread):
else:
self.log.warning('Preferred ipv6 not available, '
'falling back to ipv4.')
if not ip and self.manager.hasExtension('os-floating-ips'):
ip = self.manager.addPublicIP(server_id,
pool=self.provider.pool)
if not ip:
raise LaunchNetworkException("Unable to find public IP of server")
@ -908,14 +904,15 @@ class SnapshotImageUpdater(ImageUpdater):
key_name = self.provider.keypair
key = None
use_password = False
elif self.manager.hasExtension('os-keypairs'):
key_name = hostname.split('.')[0]
key = self.manager.addKeypair(key_name)
use_password = False
else:
key_name = None
key = None
use_password = True
try:
key_name = hostname.split('.')[0]
key = self.manager.addKeypair(key_name)
use_password = False
except Exception:
key_name = None
key = None
use_password = True
uuid_pattern = 'hex{8}-(hex{4}-){3}hex{12}'.replace('hex',
'[0-9a-fA-F]')
@ -926,20 +923,18 @@ class SnapshotImageUpdater(ImageUpdater):
image_name = self.image.base_image
image_id = None
try:
server_id = self.manager.createServer(
server = self.manager.createServer(
hostname, self.image.min_ram, image_name=image_name,
key_name=key_name, name_filter=self.image.name_filter,
image_id=image_id, config_drive=self.image.config_drive,
nodepool_snapshot_image_id=self.snap_image.id)
server_id = server['id']
except Exception:
if (self.manager.hasExtension('os-keypairs') and
not self.provider.keypair):
for kp in self.manager.listKeypairs():
if kp['name'] == key_name:
self.log.debug(
'Deleting keypair for failed image build %s' %
self.snap_image.id)
self.manager.deleteKeypair(kp['name'])
if self.manager.deleteKeypair(key_name):
# Only log success - failure is logged inside of shade
self.log.debug(
'Deleted keypair for failed image build %s' %
self.snap_image.id)
raise
self.snap_image.hostname = hostname
@ -949,7 +944,7 @@ class SnapshotImageUpdater(ImageUpdater):
self.log.debug("Image id: %s waiting for server %s" %
(self.snap_image.id, server_id))
server = self.manager.waitForServer(server_id)
server = self.manager.waitForServer(server)
if server['status'] != 'ACTIVE':
raise Exception("Server %s for image id: %s status: %s" %
(server_id, self.snap_image.id, server['status']))
@ -962,24 +957,25 @@ class SnapshotImageUpdater(ImageUpdater):
else:
self.log.warning('Preferred ipv6 not available, '
'falling back to ipv4.')
if not ip and self.manager.hasExtension('os-floating-ips'):
ip = self.manager.addPublicIP(server_id,
pool=self.provider.pool)
if not ip:
self.log.error("Server dict {server}".format(
server=pprint.pformat(dict(server))))
raise Exception("Unable to find public IP of server")
server['public_ip'] = ip
self.bootstrapServer(server, key, use_password=use_password)
image_id = self.manager.createImage(server_id, hostname,
self.image.meta)
image_id = self.manager.createImage(server, hostname,
self.image.meta)['id']
self.snap_image.external_id = image_id
session.commit()
self.log.debug("Image id: %s building image %s" %
(self.snap_image.id, image_id))
# It can take a _very_ long time for Rackspace 1.0 to save an image
image = self.manager.waitForImage(image_id, IMAGE_TIMEOUT)
if image['status'] != 'ACTIVE':
# Throw exception here and not in waitForImage so that we can log
# the snap_image.id as well, which waitForImage does not know
if image['status'].lower() != 'active':
raise Exception("Image %s for image id: %s status: %s" %
(image_id, self.snap_image.id, image['status']))
@ -1865,23 +1861,26 @@ class NodePool(threading.Thread):
if snap_image.server_external_id:
try:
server = manager.getServer(snap_image.server_external_id)
self.log.debug('Deleting server %s for image id: %s' %
(snap_image.server_external_id,
snap_image.id))
manager.cleanupServer(server['id'])
manager.waitForServerDeletion(server['id'])
if server:
self.log.debug('Deleting server %s for image id: %s' %
(snap_image.server_external_id,
snap_image.id))
manager.cleanupServer(server['id'])
manager.waitForServerDeletion(server['id'])
else:
raise provider_manager.NotFound
except provider_manager.NotFound:
self.log.warning('Image server id %s not found' %
snap_image.server_external_id)
if snap_image.external_id:
try:
remote_image = manager.getImage(snap_image.external_id)
self.log.debug('Deleting image %s' % remote_image['id'])
manager.deleteImage(remote_image['id'])
except provider_manager.NotFound:
remote_image = manager.getImage(snap_image.external_id)
if remote_image is None:
self.log.warning('Image id %s not found' %
snap_image.external_id)
else:
self.log.debug('Deleting image %s' % remote_image['id'])
manager.deleteImage(remote_image['id'])
snap_image.delete()
self.log.info("Deleted image id: %s" % snap_image.id)

View File

@ -21,31 +21,16 @@ import logging
import paramiko
from contextlib import contextmanager
import threading
import time
import requests.exceptions
import sys
import shade
import novaclient
import exceptions
from nodeutils import iterate_timeout
from task_manager import Task, TaskManager, ManagerStoppedException
from task_manager import TaskManager, ManagerStoppedException
SERVER_LIST_AGE = 5 # How long to keep a cached copy of the server list
IPS_LIST_AGE = 5 # How long to keep a cached copy of the ip list
class ServerCreateException(exceptions.TimeoutException):
statsd_key = 'error.servertimeout'
class ImageCreateException(exceptions.TimeoutException):
statsd_key = 'error.imagetimeout'
def get_public_ip(server, provider, version=4):
for addr in server.addresses.get('public', []):
if type(addr) == type(u''): # Rackspace/openstack 1.0
@ -99,25 +84,6 @@ def get_private_ip(server):
return ret[0]
def make_server_dict(server, provider):
d = dict(id=str(server.id),
name=server.name,
status=server.status,
addresses=server.addresses)
if hasattr(server, 'adminPass'):
d['admin_pass'] = server.adminPass
if hasattr(server, 'key_name'):
d['key_name'] = server.key_name
if hasattr(server, 'progress'):
d['progress'] = server.progress
if hasattr(server, 'metadata'):
d['metadata'] = server.metadata
d['public_v4'] = get_public_ip(server, provider)
d['private_v4'] = get_private_ip(server)
d['public_v6'] = get_public_ip(server, provider, version=6)
return d
def make_image_dict(image):
d = dict(id=str(image.id), name=image.name, status=image.status,
metadata=image.metadata)
@ -139,146 +105,6 @@ class NotFound(Exception):
pass
class CreateServerTask(Task):
def main(self, client):
server = client.nova_client.servers.create(**self.args)
return str(server.id)
class GetServerTask(Task):
def main(self, client):
provider = self.args.pop('_nodepool_provider')
try:
server = client.nova_client.servers.get(self.args['server_id'])
except novaclient.exceptions.NotFound:
raise NotFound()
return make_server_dict(server, provider)
class DeleteServerTask(Task):
def main(self, client):
client.nova_client.servers.delete(self.args['server_id'])
class ListServersTask(Task):
def main(self, client):
provider = self.args.pop('_nodepool_provider')
servers = client.nova_client.servers.list()
return [make_server_dict(server, provider)
for server in servers]
class AddKeypairTask(Task):
def main(self, client):
client.nova_client.keypairs.create(**self.args)
class ListKeypairsTask(Task):
def main(self, client):
keys = client.nova_client.keypairs.list()
return [dict(id=str(key.id), name=key.name) for
key in keys]
class DeleteKeypairTask(Task):
def main(self, client):
client.nova_client.keypairs.delete(self.args['name'])
class CreateFloatingIPTask(Task):
def main(self, client):
ip = client.nova_client.floating_ips.create(**self.args)
return dict(id=str(ip.id), ip=ip.ip)
class AddFloatingIPTask(Task):
def main(self, client):
client.nova_client.servers.add_floating_ip(**self.args)
class GetFloatingIPTask(Task):
def main(self, client):
ip = client.nova_client.floating_ips.get(self.args['ip_id'])
return dict(id=str(ip.id), ip=ip.ip, instance_id=str(ip.instance_id))
class ListFloatingIPsTask(Task):
def main(self, client):
ips = client.nova_client.floating_ips.list()
return [dict(id=str(ip.id), ip=ip.ip,
instance_id=str(ip.instance_id)) for
ip in ips]
class RemoveFloatingIPTask(Task):
def main(self, client):
client.nova_client.servers.remove_floating_ip(**self.args)
class DeleteFloatingIPTask(Task):
def main(self, client):
client.nova_client.floating_ips.delete(self.args['ip_id'])
class CreateImageTask(Task):
def main(self, client):
# This returns an id
return str(client.nova_client.servers.create_image(**self.args))
class GetImageTask(Task):
def main(self, client):
try:
image = client.nova_client.images.get(**self.args)
except novaclient.exceptions.NotFound:
raise NotFound()
# HP returns 404, rackspace can return a 'DELETED' image.
if image.status == 'DELETED':
raise NotFound()
return make_image_dict(image)
class ListExtensionsTask(Task):
def main(self, client):
try:
resp, body = client.nova_client.client.get('/extensions')
return [x['alias'] for x in body['extensions']]
except novaclient.exceptions.NotFound:
# No extensions present.
return []
class ListFlavorsTask(Task):
def main(self, client):
flavors = client.nova_client.flavors.list()
return [dict(id=str(flavor.id), ram=flavor.ram, name=flavor.name)
for flavor in flavors]
class ListImagesTask(Task):
def main(self, client):
images = client.nova_client.images.list()
return [make_image_dict(image) for image in images]
class FindImageTask(Task):
def main(self, client):
image = client.nova_client.images.find(**self.args)
return dict(id=str(image.id))
class DeleteImageTask(Task):
def main(self, client):
client.nova_client.images.delete(**self.args)
class FindNetworkTask(Task):
def main(self, client):
for network in client.neutron_client.list_networks()['networks']:
if self.args['name'] == network['name']:
return dict(id=str(network['id']))
class ProviderManager(TaskManager):
log = logging.getLogger("nodepool.ProviderManager")
@ -310,52 +136,25 @@ class ProviderManager(TaskManager):
self.resetClient()
self._images = {}
self._networks = {}
self._cloud_metadata_read = False
self.__flavors = {}
self.__extensions = {}
self._servers = []
self._servers_time = 0
self._servers_lock = threading.Lock()
self._ips = []
self._ips_time = 0
self._ips_lock = threading.Lock()
@property
def _flavors(self):
if not self._cloud_metadata_read:
self._getCloudMetadata()
if not self.__flavors:
self.__flavors = self._getFlavors()
return self.__flavors
@property
def _extensions(self):
if not self._cloud_metadata_read:
self._getCloudMetadata()
return self.__extensions
def _getCloudMetadata(self):
self.__flavors = self._getFlavors()
self.__extensions = self.listExtensions()
self._cloud_metadata_read = True
def _getClient(self):
return shade.OpenStackCloud(
cloud_config=self.provider.cloud_config,
manager=self,
**self.provider.cloud_config.config)
def runTask(self, task):
try:
task.run(self._client)
except requests.exceptions.ProxyError:
# Try to get a new client object if we get a ProxyError
self.log.exception('Resetting client due to ProxyError')
self.resetClient()
try:
task.run(self._client)
except requests.exceptions.ProxyError as e:
# If we get a second ProxyError, then make sure it gets raised
# the same way all other Exceptions from the Task object do.
# This will move the Exception to the main thread.
task.exception(e, sys.exc_info()[2])
# Run the given task in the TaskManager passed to shade. It turns
# out that this provider manager is the TaskManager we pass, so
# this is a way of running each cloud operation in its own thread
task.run(self._client)
def resetClient(self):
self._client = self._getClient()
@ -365,13 +164,6 @@ class ProviderManager(TaskManager):
flavors.sort(lambda a, b: cmp(a['ram'], b['ram']))
return flavors
def hasExtension(self, extension):
# Note: this will throw an error if the provider is offline
# but all the callers are in threads so the mainloop won't be affected.
if extension in self._extensions:
return True
return False
def findFlavor(self, min_ram, name_filter=None):
# Note: this will throw an error if the provider is offline
# but all the callers are in threads (they call in via CreateServer) so
@ -385,33 +177,43 @@ class ProviderManager(TaskManager):
def findImage(self, name):
if name in self._images:
return self._images[name]
image = self.submitTask(FindImageTask(name=name))
with shade_inner_exceptions():
image = self._client.get_image(name)
self._images[name] = image
return image
def findNetwork(self, name):
if name in self._networks:
return self._networks[name]
network = self.submitTask(FindNetworkTask(name=name))
with shade_inner_exceptions():
network = self._client.get_network(name)
self._networks[name] = network
return network
def deleteImage(self, name):
if name in self._images:
del self._images[name]
return self.submitTask(DeleteImageTask(image=name))
with shade_inner_exceptions():
return self._client.delete_image(name)
def addKeypair(self, name):
key = paramiko.RSAKey.generate(2048)
public_key = key.get_name() + ' ' + key.get_base64()
self.submitTask(AddKeypairTask(name=name, public_key=public_key))
with shade_inner_exceptions():
self._client.create_keypair(name=name, public_key=public_key)
return key
def listKeypairs(self):
return self.submitTask(ListKeypairsTask())
with shade_inner_exceptions():
keypairs = self._client.list_keypairs()
return keypairs
def deleteKeypair(self, name):
return self.submitTask(DeleteKeypairTask(name=name))
with shade_inner_exceptions():
return self._client.delete_keypair(name=name)
def createServer(self, name, min_ram, image_id=None, image_name=None,
az=None, key_name=None, name_filter=None,
@ -437,7 +239,8 @@ class ProviderManager(TaskManager):
nics.append({'net-id': net_id})
else:
raise Exception("Invalid 'networks' configuration.")
create_args['nics'] = nics
if nics:
create_args['nics'] = nics
# Put provider.name and image_name in as groups so that ansible
# inventory can auto-create groups for us based on each of those
# qualities
@ -458,232 +261,120 @@ class ProviderManager(TaskManager):
nodepool=json.dumps(nodepool_meta)
)
return self.submitTask(CreateServerTask(**create_args))
with shade_inner_exceptions():
return self._client.create_server(wait=False, **create_args)
def getServer(self, server_id):
return self.submitTask(GetServerTask(server_id=server_id,
_nodepool_provider=self.provider))
with shade_inner_exceptions():
return self._client.get_server(server_id)
def getFloatingIP(self, ip_id):
return self.submitTask(GetFloatingIPTask(ip_id=ip_id))
def getServerFromList(self, server_id):
for s in self.listServers():
if s['id'] == server_id:
return s
raise NotFound()
def _waitForResource(self, resource_type, resource_id, timeout):
last_status = None
if resource_type == 'server':
exc = ServerCreateException
elif resource_type == 'image':
exc = ImageCreateException
for count in iterate_timeout(timeout, exc,
"%s creation" % resource_type):
try:
if resource_type == 'server':
resource = self.getServerFromList(resource_id)
elif resource_type == 'image':
resource = self.getImage(resource_id)
except NotFound:
continue
except ManagerStoppedException:
raise
except Exception:
self.log.exception('Unable to list %ss while waiting for '
'%s will retry' % (resource_type,
resource_id))
continue
status = resource.get('status')
if (last_status != status):
self.log.debug(
'Status of {type} in {provider} {id}: {status}'.format(
type=resource_type,
provider=self.provider.name,
id=resource_id,
status=status))
if status == 'ERROR' and 'fault' in resource:
self.log.debug(
'ERROR in {provider} on {id}: {resason}'.format(
provider=self.provider.name,
id=resource_id,
resason=resource['fault']['message']))
last_status = status
if status in ['ACTIVE', 'ERROR']:
return resource
def waitForServer(self, server_id, timeout=3600):
return self._waitForResource('server', server_id, timeout)
def waitForServer(self, server, timeout=3600):
with shade_inner_exceptions():
return self._client.wait_for_server(
server=server, auto_ip=True, reuse=False,
timeout=timeout)
def waitForServerDeletion(self, server_id, timeout=600):
for count in iterate_timeout(600,
exceptions.ServerDeleteException,
"server %s deletion " % server_id):
try:
self.getServerFromList(server_id)
except NotFound:
for count in iterate_timeout(
timeout, exceptions.ServerDeleteException,
"server %s deletion" % server_id):
if not self.getServer(server_id):
return
def waitForImage(self, image_id, timeout=3600):
# TODO(mordred): This should just be handled by the Fake, but we're
# not quite plumbed through for that yet
if image_id == 'fake-glance-id':
return True
return self._waitForResource('image', image_id, timeout)
def createFloatingIP(self, pool=None):
return self.submitTask(CreateFloatingIPTask(pool=pool))
def addFloatingIP(self, server_id, address):
self.submitTask(AddFloatingIPTask(server=server_id,
address=address))
def addPublicIP(self, server_id, pool=None):
ip = self.createFloatingIP(pool)
try:
self.addFloatingIP(server_id, ip['ip'])
except novaclient.exceptions.ClientException:
# Delete the floating IP here as cleanupServer will not
# have access to the ip -> server mapping preventing it
# from removing this IP.
self.deleteFloatingIP(ip['id'])
raise
for count in iterate_timeout(600,
exceptions.IPAddTimeoutException,
"ip to be added to %s" % server_id):
last_status = None
for count in iterate_timeout(
timeout, exceptions.ImageCreateException, "image creation"):
try:
newip = self.getFloatingIP(ip['id'])
image = self.getImage(image_id)
except NotFound:
continue
except ManagerStoppedException:
raise
except Exception:
self.log.exception('Unable to get IP details for server %s, '
'will retry' % (server_id))
self.log.exception('Unable to list images while waiting for '
'%s will retry' % (image_id))
continue
if newip['instance_id'] == server_id:
return newip['ip']
def createImage(self, server_id, image_name, meta):
return self.submitTask(CreateImageTask(server=server_id,
image_name=image_name,
metadata=meta))
# shade returns None when not found
if not image:
continue
status = image['status']
if (last_status != status):
self.log.debug(
'Status of image in {provider} {id}: {status}'.format(
provider=self.provider.name,
id=image_id,
status=status))
if status == 'ERROR' and 'fault' in image:
self.log.debug(
'ERROR in {provider} on {id}: {resason}'.format(
provider=self.provider.name,
id=image_id,
resason=image['fault']['message']))
last_status = status
# Glance client returns lower case statuses - but let's be sure
if status.lower() in ['active', 'error']:
return image
def createImage(self, server, image_name, meta):
    """Snapshot ``server`` into an image named ``image_name``.

    ``meta`` is expanded into keyword arguments of the client's
    ``create_image_snapshot`` call, whose result is returned.
    """
    with shade_inner_exceptions():
        snapshot = self._client.create_image_snapshot(
            image_name, server, **meta)
    return snapshot
def getImage(self, image_id):
    """Return the image identified by ``image_id`` from the shade client.

    The lookup runs inside ``shade_inner_exceptions()``, and whatever
    ``get_image`` returns is passed straight back to the caller.
    """
    with shade_inner_exceptions():
        return self._client.get_image(image_id)
def uploadImage(self, image_name, filename, image_type=None, meta=None):
    """Upload ``filename`` as a private image and return the new image id.

    :param image_name: name given to the uploaded image.
    :param filename: path of the image file, passed to the client as-is.
    :param image_type: optional disk format; when set it is sent as the
        ``disk_format`` property.
    :param meta: optional dict of extra properties, expanded into keyword
        arguments (custom glance properties). The caller's dict is never
        modified.
    :returns: the ``id`` attribute of the image object the client returns.
    """
    # configure glance and upload image. Note the meta flags
    # are provided as custom glance properties
    # NOTE: we have wait=True set here. This is not how we normally
    # do things in nodepool, preferring to poll ourselves thankyouverymuch.
    # However - two things to note:
    # - PUT has no async mechanism, so we have to handle it anyway
    # - v2 w/task waiting is very strange and complex - but we have to
    #   block for our v1 clouds anyway, so we might as well
    #   have the interface be the same and treat faking-out
    #   a shade-level fake-async interface later

    # Work on a copy so that adding disk_format below does not leak into
    # the dict the caller passed in.
    properties = dict(meta) if meta else {}
    if image_type:
        properties['disk_format'] = image_type
    with shade_inner_exceptions():
        image = self._client.create_image(
            name=image_name,
            filename=filename,
            is_public=False,
            wait=True,
            **properties)
    return image.id
def listExtensions(self):
    """Submit a ListExtensionsTask and return its result."""
    task = ListExtensionsTask()
    return self.submitTask(task)
def listImages(self):
    """Return the shade client's image listing.

    The call runs inside ``shade_inner_exceptions()``.
    """
    with shade_inner_exceptions():
        return self._client.list_images()
def listFlavors(self):
    """Return the shade client's flavor listing.

    The call runs inside ``shade_inner_exceptions()``.
    """
    with shade_inner_exceptions():
        return self._client.list_flavors()
def listFloatingIPs(self):
    """Return the cached floating IP list, refreshing it when stale.

    A refresh happens only when the cache is at least IPS_LIST_AGE seconds
    old and the non-blocking lock acquire succeeds; otherwise the
    previously stored list is returned unchanged.
    """
    stale = time.time() - self._ips_time >= IPS_LIST_AGE
    # acquire(False) is only attempted when the cache is stale.
    if stale and self._ips_lock.acquire(False):
        try:
            self._ips = self.submitTask(ListFloatingIPsTask())
            self._ips_time = time.time()
        finally:
            self._ips_lock.release()
    return self._ips
def removeFloatingIP(self, server_id, address):
    """Submit a RemoveFloatingIPTask for ``address`` on ``server_id``.

    Returns whatever the submitted task returns.
    """
    task = RemoveFloatingIPTask(server=server_id, address=address)
    return self.submitTask(task)
def deleteFloatingIP(self, ip_id):
    """Submit a DeleteFloatingIPTask for ``ip_id`` and return its result."""
    task = DeleteFloatingIPTask(ip_id=ip_id)
    return self.submitTask(task)
def listServers(self, cache=True):
    """Return the server list, refreshing the cached copy when needed.

    A refresh is wanted when ``cache`` is false or the cached list is at
    least SERVER_LIST_AGE seconds old. The refresh itself only happens if
    the non-blocking lock acquire succeeds; otherwise the stored list is
    returned as-is.
    """
    wants_refresh = (
        not cache or
        time.time() - self._servers_time >= SERVER_LIST_AGE)
    # Only one thread needs to submit the list-servers task. The first one
    # does so while holding the lock; the non-blocking acquire makes every
    # other thread skip the refresh and keep using the old data until the
    # refresh succeeds.
    if wants_refresh and self._servers_lock.acquire(False):
        try:
            task = ListServersTask(_nodepool_provider=self.provider)
            self._servers = self.submitTask(task)
            self._servers_time = time.time()
        finally:
            self._servers_lock.release()
    return self._servers
def listServers(self):
    """Return the shade client's server listing.

    No caching is done here: per the original note, shade's list_servers
    carries the nodepool server list caching logic.
    """
    with shade_inner_exceptions():
        servers = self._client.list_servers()
    return servers
def deleteServer(self, server_id):
    """Delete ``server_id`` through the shade client and return its result.

    The call runs inside ``shade_inner_exceptions()`` and passes
    ``delete_ips=True`` (presumably so shade also removes the server's
    floating IPs -- confirm against the shade documentation).
    """
    with shade_inner_exceptions():
        return self._client.delete_server(server_id, delete_ips=True)
# Delete a server together with the resources tied to it.
# Visible steps: look the server up; if any IPv4 interface is typed
# 'floating', delete every floating IP whose instance_id matches; if the
# 'os-keypairs' extension is present and the server's key_name differs from
# self.provider.keypair, delete that keypair; finally delete the server.
# NOTE(review): `server` is looked up three ways here (a retry loop over
# getServerFromList, getServer + NotFound, and getServerFromList again) and
# the keypair is deleted through two different calls. This looks like two
# revisions interleaved -- confirm which lines are live before editing.
def cleanupServer(self, server_id):
done = False
while not done:
try:
server = self.getServerFromList(server_id)
done = True
except NotFound:
# If we have old data, that's fine, it should only
# indicate that a server exists when it doesn't; we'll
# recover from that. However, if we have no data at
# all, wait until the first server list task
# completes.
if self._servers_time == 0:
time.sleep(SERVER_LIST_AGE + 1)
else:
done = True
server = self.getServer(server_id)
if not server:
raise NotFound()
# This will either get the server or raise an exception
server = self.getServerFromList(server_id)
has_floating_ip = False
# Only IPv4 interfaces are inspected for a floating address.
for (name, network) in server['addresses'].iteritems():
for interface_spec in network:
if interface_spec['version'] != 4:
continue
if ('OS-EXT-IPS:type' in interface_spec
and interface_spec['OS-EXT-IPS:type'] == 'floating'):
has_floating_ip = True
if has_floating_ip:
for ip in self.listFloatingIPs():
if ip['instance_id'] == server_id:
self.log.debug('Deleting floating ip for server %s' %
server_id)
self.deleteFloatingIP(ip['id'])
# The provider-wide keypair is left alone; only a different key is removed.
if (self.hasExtension('os-keypairs') and
server['key_name'] != self.provider.keypair):
for kp in self.listKeypairs():
if kp['name'] == server['key_name']:
self.log.debug('Deleting keypair for server %s' %
server_id)
self.deleteKeypair(kp['name'])
with shade_inner_exceptions():
self._client.delete_keypair(name=server['key_name'])
self.log.debug('Deleting server %s' % server_id)
self.deleteServer(server_id)

View File

@ -115,8 +115,9 @@ class TestNodepoolCMD(tests.DBTestCase):
def test_alien_list_fail(self):
def fail_list(self):
raise RuntimeError('Fake list error')
self.useFixture(fixtures.MonkeyPatch('nodepool.fakeprovider.FakeList'
'.list', fail_list))
self.useFixture(fixtures.MonkeyPatch(
'nodepool.fakeprovider.FakeOpenStackCloud.list_servers',
fail_list))
configfile = self.setup_config("node_cmd.yaml")
self.patch_argv("-c", configfile, "alien-list")
@ -125,8 +126,9 @@ class TestNodepoolCMD(tests.DBTestCase):
def test_alien_image_list_fail(self):
def fail_list(self):
raise RuntimeError('Fake list error')
self.useFixture(fixtures.MonkeyPatch('nodepool.fakeprovider.FakeList'
'.list', fail_list))
self.useFixture(fixtures.MonkeyPatch(
'nodepool.fakeprovider.FakeOpenStackCloud.list_servers',
fail_list))
configfile = self.setup_config("node_cmd.yaml")
self.patch_argv("-c", configfile, "alien-image-list")

View File

@ -25,9 +25,6 @@ from nodepool import nodedb
import nodepool.fakeprovider
import nodepool.nodepool
import requests.exceptions
from testtools import ExpectedException
class TestNodepool(tests.DBTestCase):
log = logging.getLogger("nodepool.TestNodepool")
@ -401,39 +398,6 @@ class TestNodepool(tests.DBTestCase):
self.assertEqual(len(deleted_nodes), 1)
self.assertEqual(node_id, deleted_nodes[0].id)
def test_proxy_timeout(self):
"""Test that we re-run a task after a ProxyError"""
configfile = self.setup_config('node.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
self.waitForNodes(pool)
provider = pool.config.providers['fake-provider']
manager = pool.getProviderManager(provider)
# Builds a BadOpenstackCloud from the manager's current nova image client.
def get_bad_client(manager):
return nodepool.fakeprovider.BadOpenstackCloud(
manager._client.nova_client.images)
# In order to test recovering from a ProxyError from the client
# we are going to manually set the client object to be a bad client that
# always raises a ProxyError. If our client reset works correctly
# then we will create a new client object, which in this case would
# be a new fake client in place of the bad client.
manager._client = get_bad_client(manager)
# The only implemented function for the fake and bad clients
# If we don't raise an uncaught exception, we pass
manager.listExtensions()
# Now let's do it again, but let's prevent the client object from being
# replaced and then assert that we raised the exception that we expect.
manager._client = get_bad_client(manager)
manager._getClient = lambda: get_bad_client(manager)
with ExpectedException(requests.exceptions.ProxyError):
manager.listExtensions()
def test_leaked_node(self):
"""Test that a leaked node is deleted"""
configfile = self.setup_config('leaked_node.yaml')
@ -447,7 +411,7 @@ class TestNodepool(tests.DBTestCase):
# Make sure we have a node built and ready
provider = pool.config.providers['fake-provider']
manager = pool.getProviderManager(provider)
servers = manager.listServers(cache=False)
servers = manager.listServers()
self.assertEqual(len(servers), 1)
with pool.getDB().getSession() as session:
@ -475,7 +439,7 @@ class TestNodepool(tests.DBTestCase):
self.log.debug("...done waiting for replacement pool.")
# Make sure we end up with only one server (the replacement)
servers = manager.listServers(cache=False)
servers = manager.listServers()
self.assertEqual(len(servers), 1)
with pool.getDB().getSession() as session:
nodes = session.getNodes(provider_name='fake-provider',

View File

@ -10,12 +10,11 @@ statsd>=3.0
apscheduler>=3.0
sqlalchemy>=0.8.2,<1.1.0
pyzmq>=13.1.0
python-novaclient>=2.21.0
PyMySQL
PrettyTable>=0.6,<0.8
# shade has a looser requirement on six than nodepool, so install six first
six>=1.7.0
os-client-config>=1.2.0
shade>=0.12.0
shade>=1.6.2
diskimage-builder
voluptuous

View File

@ -39,3 +39,14 @@ waitforimage ubuntu-dib
waitfornode trusty-server
# check dib image was bootable
waitfornode ubuntu-dib
set -o errexit
# Show the built nodes
$NODEPOOL list
# Try to delete the nodes that were just built
$NODEPOOL delete --now 1
$NODEPOOL delete --now 2
# show the deleted nodes (and their replacements may be building)
$NODEPOOL list