manual merge

This commit is contained in:
Sandy Walsh 2011-08-05 06:40:34 -07:00
commit d81b0d84c1
65 changed files with 4806 additions and 2018 deletions

5
.gitignore vendored
View File

@ -1,2 +1,5 @@
.coverage
*,cover
cover
*.pyc
.idea
.idea

View File

@ -1,87 +0,0 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
novaclient module.
"""
__version__ = '2.5'
from novaclient.accounts import Account, AccountManager
from novaclient.backup_schedules import (
BackupSchedule, BackupScheduleManager,
BACKUP_WEEKLY_DISABLED, BACKUP_WEEKLY_SUNDAY, BACKUP_WEEKLY_MONDAY,
BACKUP_WEEKLY_TUESDAY, BACKUP_WEEKLY_WEDNESDAY,
BACKUP_WEEKLY_THURSDAY, BACKUP_WEEKLY_FRIDAY, BACKUP_WEEKLY_SATURDAY,
BACKUP_DAILY_DISABLED, BACKUP_DAILY_H_0000_0200,
BACKUP_DAILY_H_0200_0400, BACKUP_DAILY_H_0400_0600,
BACKUP_DAILY_H_0600_0800, BACKUP_DAILY_H_0800_1000,
BACKUP_DAILY_H_1000_1200, BACKUP_DAILY_H_1200_1400,
BACKUP_DAILY_H_1400_1600, BACKUP_DAILY_H_1600_1800,
BACKUP_DAILY_H_1800_2000, BACKUP_DAILY_H_2000_2200,
BACKUP_DAILY_H_2200_0000)
from novaclient.client import OpenStackClient
from novaclient.exceptions import (OpenStackException, BadRequest,
Unauthorized, Forbidden, NotFound, OverLimit)
from novaclient.flavors import FlavorManager, Flavor
from novaclient.images import ImageManager, Image
from novaclient.ipgroups import IPGroupManager, IPGroup
from novaclient.servers import (ServerManager, Server, REBOOT_HARD,
REBOOT_SOFT)
from novaclient.zones import Zone, ZoneManager
class OpenStack(object):
"""
Top-level object to access the OpenStack Nova API.
Create an instance with your creds::
>>> os = OpenStack(USERNAME, API_KEY, PROJECT_ID, AUTH_URL)
Then call methods on its managers::
>>> os.servers.list()
...
>>> os.flavors.list()
...
&c.
"""
def __init__(self, username, apikey, projectid,
auth_url='https://auth.api.rackspacecloud.com/v1.0', timeout=None):
self.backup_schedules = BackupScheduleManager(self)
self.client = OpenStackClient(username, apikey, projectid, auth_url,
timeout=timeout)
self.flavors = FlavorManager(self)
self.images = ImageManager(self)
self.ipgroups = IPGroupManager(self)
self.servers = ServerManager(self)
self.zones = ZoneManager(self)
self.accounts = AccountManager(self)
def authenticate(self):
"""
Authenticate against the server.
Normally this is called automatically when you first access the API,
but you can call this method to force authentication right now.
Returns on success; raises :exc:`novaclient.Unauthorized` if the
credentials are wrong.
"""
self.client.authenticate()

View File

@ -19,7 +19,8 @@
Base utilities to build API operation managers and objects on top of.
"""
from novaclient.exceptions import NotFound
from novaclient import exceptions
# Python 2.4 compat
try:
@ -101,8 +102,8 @@ class ManagerWithFind(Manager):
try:
return rl[0]
except IndexError:
raise NotFound(404, "No %s matching %s." %
(self.resource_class.__name__, kwargs))
msg = "No %s matching %s." % (self.resource_class.__name__, kwargs)
raise exceptions.NotFound(404, msg)
def findall(self, **kwargs):
"""

View File

@ -19,18 +19,19 @@ if not hasattr(urlparse, 'parse_qsl'):
import cgi
urlparse.parse_qsl = cgi.parse_qsl
import novaclient
from novaclient import exceptions
_logger = logging.getLogger(__name__)
class OpenStackClient(httplib2.Http):
class HTTPClient(httplib2.Http):
USER_AGENT = 'python-novaclient/%s' % novaclient.__version__
USER_AGENT = 'python-novaclient'
def __init__(self, user, apikey, projectid, auth_url, timeout=None):
super(OpenStackClient, self).__init__(timeout=timeout)
super(HTTPClient, self).__init__(timeout=timeout)
self.user = user
self.apikey = apikey
self.projectid = projectid
@ -68,7 +69,7 @@ class OpenStackClient(httplib2.Http):
kwargs['headers']['Content-Type'] = 'application/json'
kwargs['body'] = json.dumps(kwargs['body'])
resp, body = super(OpenStackClient, self).request(*args, **kwargs)
resp, body = super(HTTPClient, self).request(*args, **kwargs)
self.http_log(args, kwargs, resp, body)
@ -144,7 +145,7 @@ class OpenStackClient(httplib2.Http):
"""
Munge GET URLs to always return uncached content.
The OpenStack Nova API caches data *very* agressively and doesn't
The OpenStack Compute API caches data *very* agressively and doesn't
respect cache headers. To avoid stale data, then, we append a little
bit of nonsense onto GET parameters; this appears to force the data not
to be cached.

View File

@ -4,7 +4,11 @@ Exception definitions.
"""
class OpenStackException(Exception):
class CommandError(Exception):
pass
class ClientException(Exception):
"""
The base exception class for all exceptions this library raises.
"""
@ -17,7 +21,7 @@ class OpenStackException(Exception):
return "%s (HTTP %s)" % (self.message, self.code)
class BadRequest(OpenStackException):
class BadRequest(ClientException):
"""
HTTP 400 - Bad request: you sent some malformed data.
"""
@ -25,7 +29,7 @@ class BadRequest(OpenStackException):
message = "Bad request"
class Unauthorized(OpenStackException):
class Unauthorized(ClientException):
"""
HTTP 401 - Unauthorized: bad credentials.
"""
@ -33,7 +37,7 @@ class Unauthorized(OpenStackException):
message = "Unauthorized"
class Forbidden(OpenStackException):
class Forbidden(ClientException):
"""
HTTP 403 - Forbidden: your credentials don't give you access to this
resource.
@ -42,7 +46,7 @@ class Forbidden(OpenStackException):
message = "Forbidden"
class NotFound(OpenStackException):
class NotFound(ClientException):
"""
HTTP 404 - Not found
"""
@ -50,7 +54,7 @@ class NotFound(OpenStackException):
message = "Not found"
class OverLimit(OpenStackException):
class OverLimit(ClientException):
"""
HTTP 413 - Over limit: you're over the API limits for this time period.
"""
@ -59,7 +63,7 @@ class OverLimit(OpenStackException):
# NotImplemented is a python keyword.
class HTTPNotImplemented(OpenStackException):
class HTTPNotImplemented(ClientException):
"""
HTTP 501 - Not Implemented: the server does not support this operation.
"""
@ -70,7 +74,7 @@ class HTTPNotImplemented(OpenStackException):
# In Python 2.4 Exception is old-style and thus doesn't have a __subclasses__()
# so we can do this:
# _code_map = dict((c.http_status, c)
# for c in OpenStackException.__subclasses__())
# for c in ClientException.__subclasses__())
#
# Instead, we have to hardcode it:
_code_map = dict((c.http_status, c) for c in [BadRequest, Unauthorized,
@ -79,7 +83,7 @@ _code_map = dict((c.http_status, c) for c in [BadRequest, Unauthorized,
def from_response(response, body):
"""
Return an instance of an OpenStackException or subclass
Return an instance of an ClientException or subclass
based on an httplib2 response.
Usage::
@ -88,7 +92,7 @@ def from_response(response, body):
if resp.status != 200:
raise exception_from_response(resp, body)
"""
cls = _code_map.get(response.status, OpenStackException)
cls = _code_map.get(response.status, ClientException)
if body:
message = "n/a"
details = "n/a"

View File

@ -20,56 +20,25 @@ Command-line interface to the OpenStack Nova API.
"""
import argparse
import novaclient
import getpass
import httplib2
import os
import prettytable
import sys
import textwrap
import uuid
# Choices for flags.
DAY_CHOICES = [getattr(novaclient, i).lower()
for i in dir(novaclient)
if i.startswith('BACKUP_WEEKLY_')]
HOUR_CHOICES = [getattr(novaclient, i).lower()
for i in dir(novaclient)
if i.startswith('BACKUP_DAILY_')]
def pretty_choice_list(l):
return ', '.join("'%s'" % i for i in l)
# Sentinal for boot --key
AUTO_KEY = object()
# Decorator for args
def arg(*args, **kwargs):
def _decorator(func):
# Because of the sematics of decorator composition if we just append
# to the options list positional options will appear to be backwards.
func.__dict__.setdefault('arguments', []).insert(0, (args, kwargs))
return func
return _decorator
class CommandError(Exception):
pass
from novaclient import exceptions
from novaclient import utils
from novaclient.v1_0 import shell as shell_v1_0
from novaclient.v1_1 import shell as shell_v1_1
def env(e):
return os.environ.get(e, '')
class OpenStackShell(object):
class OpenStackComputeShell(object):
# Hook for the test suite to inject a fake server.
_api_class = novaclient.OpenStack
def __init__(self):
self.parser = argparse.ArgumentParser(
def get_base_parser(self):
parser = argparse.ArgumentParser(
prog='nova',
description=__doc__.strip(),
epilog='See "nova help COMMAND" '\
@ -79,44 +48,62 @@ class OpenStackShell(object):
)
# Global arguments
self.parser.add_argument('-h', '--help',
parser.add_argument('-h', '--help',
action='help',
help=argparse.SUPPRESS,
)
self.parser.add_argument('--debug',
parser.add_argument('--debug',
default=False,
action='store_true',
help=argparse.SUPPRESS)
self.parser.add_argument('--username',
parser.add_argument('--username',
default=env('NOVA_USERNAME'),
help='Defaults to env[NOVA_USERNAME].')
self.parser.add_argument('--apikey',
parser.add_argument('--apikey',
default=env('NOVA_API_KEY'),
help='Defaults to env[NOVA_API_KEY].')
self.parser.add_argument('--projectid',
parser.add_argument('--projectid',
default=env('NOVA_PROJECT_ID'),
help='Defaults to env[NOVA_PROJECT_ID].')
auth_url = env('NOVA_URL')
if auth_url == '':
auth_url = 'https://auth.api.rackspacecloud.com/v1.0'
self.parser.add_argument('--url',
default=auth_url,
parser.add_argument('--url',
default=env('NOVA_URL'),
help='Defaults to env[NOVA_URL].')
# Subcommands
subparsers = self.parser.add_subparsers(metavar='<subcommand>')
self.subcommands = {}
parser.add_argument('--version',
default=env('NOVA_VERSION'),
help='Accepts 1.0 or 1.1, defaults to env[NOVA_VERSION].')
# Everything that's do_* is a subcommand.
for attr in (a for a in dir(self) if a.startswith('do_')):
return parser
def get_subcommand_parser(self, version):
parser = self.get_base_parser()
self.subcommands = {}
subparsers = parser.add_subparsers(metavar='<subcommand>')
try:
actions_module = {
'1.0': shell_v1_0,
'1.1': shell_v1_1,
}[version]
except KeyError:
actions_module = shell_v1_0
self._find_actions(subparsers, actions_module)
self._find_actions(subparsers, self)
return parser
def _find_actions(self, subparsers, actions_module):
for attr in (a for a in dir(actions_module) if a.startswith('do_')):
# I prefer to be hypen-separated instead of underscores.
command = attr[3:].replace('_', '-')
callback = getattr(self, attr)
callback = getattr(actions_module, attr)
desc = callback.__doc__ or ''
help = desc.strip().split('\n')[0]
arguments = getattr(callback, 'arguments', [])
@ -137,18 +124,26 @@ class OpenStackShell(object):
subparser.set_defaults(func=callback)
def main(self, argv):
# Parse args and call whatever callback was selected
args = self.parser.parse_args(argv)
# Parse args once to find version
parser = self.get_base_parser()
(options, args) = parser.parse_known_args(argv)
# build available subcommands based on version
subcommand_parser = self.get_subcommand_parser(options.version)
self.parser = subcommand_parser
# Parse args again and call whatever callback was selected
args = subcommand_parser.parse_args(argv)
# Deal with global arguments
if args.debug:
httplib2.debuglevel = 1
# Short-circuit and deal with help right away.
if args.func == self.do_help:
self.do_help(args)
return 0
# Deal with global arguments
if args.debug:
httplib2.debuglevel = 1
user, apikey, projectid, url = args.username, args.apikey, \
args.projectid, args.url
@ -156,21 +151,31 @@ class OpenStackShell(object):
# for username or apikey but for compatibility it is not.
if not user:
raise CommandError("You must provide a username, either via "
raise exceptions.CommandError("You must provide a username, either via "
"--username or via env[NOVA_USERNAME]")
if not apikey:
raise CommandError("You must provide an API key, either via "
raise exceptions.CommandError("You must provide an API key, either via "
"--apikey or via env[NOVA_API_KEY]")
self.cs = self._api_class(user, apikey, projectid, url)
self.cs = self.get_api_class(options.version)(user, apikey, projectid, url)
try:
self.cs.authenticate()
except novaclient.Unauthorized:
raise CommandError("Invalid OpenStack Nova credentials.")
except exceptions.Unauthorized:
raise exceptions.CommandError("Invalid OpenStack Nova credentials.")
args.func(args)
args.func(self.cs, args)
@arg('command', metavar='<subcommand>', nargs='?',
def get_api_class(self, version):
try:
return {
"1.0": shell_v1_0.CLIENT_CLASS,
"1.1": shell_v1_1.CLIENT_CLASS,
}[version]
except KeyError:
return shell_v1_0.CLIENT_CLASS
@utils.arg('command', metavar='<subcommand>', nargs='?',
help='Display help for <subcommand>')
def do_help(self, args):
"""
@ -180,682 +185,11 @@ class OpenStackShell(object):
if args.command in self.subcommands:
self.subcommands[args.command].print_help()
else:
raise CommandError("'%s' is not a valid subcommand." %
raise exceptions.CommandError("'%s' is not a valid subcommand." %
args.command)
else:
self.parser.print_help()
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('--enable', dest='enabled', default=None, action='store_true',
help='Enable backups.')
@arg('--disable', dest='enabled', action='store_false',
help='Disable backups.')
@arg('--weekly', metavar='<day>', choices=DAY_CHOICES,
help='Schedule a weekly backup for <day> (one of: %s).' %
pretty_choice_list(DAY_CHOICES))
@arg('--daily', metavar='<time-window>', choices=HOUR_CHOICES,
help='Schedule a daily backup during <time-window> (one of: %s).' %
pretty_choice_list(HOUR_CHOICES))
def do_backup_schedule(self, args):
"""
Show or edit the backup schedule for a server.
With no flags, the backup schedule will be shown. If flags are given,
the backup schedule will be modified accordingly.
"""
server = self._find_server(args.server)
# If we have some flags, update the backup
backup = {}
if args.daily:
backup['daily'] = getattr(novaclient, 'BACKUP_DAILY_%s' %
args.daily.upper())
if args.weekly:
backup['weekly'] = getattr(novaclient, 'BACKUP_WEEKLY_%s' %
args.weekly.upper())
if args.enabled is not None:
backup['enabled'] = args.enabled
if backup:
server.backup_schedule.update(**backup)
else:
print_dict(server.backup_schedule._info)
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_backup_schedule_delete(self, args):
"""
Delete the backup schedule for a server.
"""
server = self._find_server(args.server)
server.backup_schedule.delete()
def _boot(self, args, reservation_id=None, min_count=None, max_count=None):
"""Boot a new server."""
if min_count is None:
min_count = 1
if max_count is None:
max_count = min_count
if min_count > max_count:
raise CommandError("min_instances should be <= max_instances")
if not min_count or not max_count:
raise CommandError("min_instances nor max_instances should be 0")
flavor = args.flavor or self.cs.flavors.find(ram=256)
image = args.image or self.cs.images.find(name="Ubuntu 10.04 LTS "\
"(lucid)")
# Map --ipgroup <name> to an ID.
# XXX do this for flavor/image?
if args.ipgroup:
ipgroup = self._find_ipgroup(args.ipgroup)
else:
ipgroup = None
metadata = dict(v.split('=') for v in args.meta)
files = {}
for f in args.files:
dst, src = f.split('=', 1)
try:
files[dst] = open(src)
except IOError, e:
raise CommandError("Can't open '%s': %s" % (src, e))
if args.key is AUTO_KEY:
possible_keys = [os.path.join(os.path.expanduser('~'), '.ssh', k)
for k in ('id_dsa.pub', 'id_rsa.pub')]
for k in possible_keys:
if os.path.exists(k):
keyfile = k
break
else:
raise CommandError("Couldn't find a key file: tried "
"~/.ssh/id_dsa.pub or ~/.ssh/id_rsa.pub")
elif args.key:
keyfile = args.key
else:
keyfile = None
if keyfile:
try:
files['/root/.ssh/authorized_keys2'] = open(keyfile)
except IOError, e:
raise CommandError("Can't open '%s': %s" % (keyfile, e))
return (args.name, image, flavor, ipgroup, metadata, files,
reservation_id, min_count, max_count)
@arg('--flavor',
default=None,
metavar='<flavor>',
help="Flavor ID (see 'novaclient flavors'). "\
"Defaults to 256MB RAM instance.")
@arg('--image',
default=None,
metavar='<image>',
help="Image ID (see 'novaclient images'). "\
"Defaults to Ubuntu 10.04 LTS.")
@arg('--ipgroup',
default=None,
metavar='<group>',
help="IP group name or ID (see 'novaclient ipgroup-list').")
@arg('--meta',
metavar="<key=value>",
action='append',
default=[],
help="Record arbitrary key/value metadata. "\
"May be give multiple times.")
@arg('--file',
metavar="<dst-path=src-path>",
action='append',
dest='files',
default=[],
help="Store arbitrary files from <src-path> locally to <dst-path> "\
"on the new server. You may store up to 5 files.")
@arg('--key',
metavar='<path>',
nargs='?',
const=AUTO_KEY,
help="Key the server with an SSH keypair. "\
"Looks in ~/.ssh for a key, "\
"or takes an explicit <path> to one.")
@arg('name', metavar='<name>', help='Name for the new server')
def do_boot(self, args):
"""Boot a new server."""
name, image, flavor, ipgroup, metadata, files, reservation_id, \
min_count, max_count = self._boot(args)
server = self.cs.servers.create(args.name, image, flavor,
ipgroup=ipgroup,
meta=metadata,
files=files,
min_count=min_count,
max_count=max_count)
print_dict(server._info)
@arg('--flavor',
default=None,
metavar='<flavor>',
help="Flavor ID (see 'novaclient flavors'). "\
"Defaults to 256MB RAM instance.")
@arg('--image',
default=None,
metavar='<image>',
help="Image ID (see 'novaclient images'). "\
"Defaults to Ubuntu 10.04 LTS.")
@arg('--ipgroup',
default=None,
metavar='<group>',
help="IP group name or ID (see 'novaclient ipgroup-list').")
@arg('--meta',
metavar="<key=value>",
action='append',
default=[],
help="Record arbitrary key/value metadata. "\
"May be give multiple times.")
@arg('--file',
metavar="<dst-path=src-path>",
action='append',
dest='files',
default=[],
help="Store arbitrary files from <src-path> locally to <dst-path> "\
"on the new server. You may store up to 5 files.")
@arg('--key',
metavar='<path>',
nargs='?',
const=AUTO_KEY,
help="Key the server with an SSH keypair. "\
"Looks in ~/.ssh for a key, "\
"or takes an explicit <path> to one.")
@arg('account', metavar='<account>', help='Account to build this'\
' server for')
@arg('name', metavar='<name>', help='Name for the new server')
def do_boot_for_account(self, args):
"""Boot a new server in an account."""
name, image, flavor, ipgroup, metadata, files, reservation_id, \
min_count, max_count = self._boot(args)
server = self.cs.accounts.create_instance_for(args.account, args.name,
image, flavor,
ipgroup=ipgroup,
meta=metadata,
files=files)
print_dict(server._info)
@arg('--flavor',
default=None,
metavar='<flavor>',
help="Flavor ID (see 'novaclient flavors'). "\
"Defaults to 256MB RAM instance.")
@arg('--image',
default=None,
metavar='<image>',
help="Image ID (see 'novaclient images'). "\
"Defaults to Ubuntu 10.04 LTS.")
@arg('--ipgroup',
default=None,
metavar='<group>',
help="IP group name or ID (see 'novaclient ipgroup-list').")
@arg('--meta',
metavar="<key=value>",
action='append',
default=[],
help="Record arbitrary key/value metadata. "\
"May be give multiple times.")
@arg('--file',
metavar="<dst-path=src-path>",
action='append',
dest='files',
default=[],
help="Store arbitrary files from <src-path> locally to <dst-path> "\
"on the new server. You may store up to 5 files.")
@arg('--key',
metavar='<path>',
nargs='?',
const=AUTO_KEY,
help="Key the server with an SSH keypair. "\
"Looks in ~/.ssh for a key, "\
"or takes an explicit <path> to one.")
@arg('--reservation_id',
default=None,
metavar='<reservation_id>',
help="Reservation ID (a UUID). "\
"If unspecified will be generated by the server.")
@arg('--min_instances',
default=None,
type=int,
metavar='<number>',
help="The minimum number of instances to build. "\
"Defaults to 1.")
@arg('--max_instances',
default=None,
type=int,
metavar='<number>',
help="The maximum number of instances to build. "\
"Defaults to 'min_instances' setting.")
@arg('name', metavar='<name>', help='Name for the new server')
def do_zone_boot(self, args):
"""Boot a new server, potentially across Zones."""
reservation_id = args.reservation_id
min_count = args.min_instances
max_count = args.max_instances
name, image, flavor, ipgroup, metadata, \
files, reservation_id, min_count, max_count = \
self._boot(args,
reservation_id=reservation_id,
min_count=min_count,
max_count=max_count)
reservation_id = self.cs.zones.boot(args.name, image, flavor,
ipgroup=ipgroup,
meta=metadata,
files=files,
reservation_id=reservation_id,
min_count=min_count,
max_count=max_count)
print "Reservation ID=", reservation_id
def _translate_flavor_keys(self, collection):
convert = [('ram', 'memory_mb'), ('disk', 'local_gb')]
for item in collection:
keys = item.__dict__.keys()
for from_key, to_key in convert:
if from_key in keys and to_key not in keys:
setattr(item, to_key, item._info[from_key])
def do_flavor_list(self, args):
"""Print a list of available 'flavors' (sizes of servers)."""
flavors = self.cs.flavors.list()
self._translate_flavor_keys(flavors)
print_list(flavors, [
'ID',
'Name',
'Memory_MB',
'Swap',
'Local_GB',
'VCPUs',
'RXTX_Quota',
'RXTX_Cap'])
def do_image_list(self, args):
"""Print a list of available images to boot from."""
print_list(self.cs.images.list(), ['ID', 'Name', 'Status'])
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('name', metavar='<name>', help='Name of snapshot.')
def do_image_create(self, args):
"""Create a new image by taking a snapshot of a running server."""
server = self._find_server(args.server)
image = self.cs.images.create(server, args.name)
print_dict(image._info)
@arg('image', metavar='<image>', help='Name or ID of image.')
def do_image_delete(self, args):
"""
Delete an image.
It should go without saying, but you can only delete images you
created.
"""
image = self._find_image(args.image)
image.delete()
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('group', metavar='<group>', help='Name or ID of group.')
@arg('address', metavar='<address>', help='IP address to share.')
def do_ip_share(self, args):
"""Share an IP address from the given IP group onto a server."""
server = self._find_server(args.server)
group = self._find_ipgroup(args.group)
server.share_ip(group, args.address)
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('address', metavar='<address>',
help='Shared IP address to remove from the server.')
def do_ip_unshare(self, args):
"""Stop sharing an given address with a server."""
server = self._find_server(args.server)
server.unshare_ip(args.address)
def do_ipgroup_list(self, args):
"""Show IP groups."""
def pretty_server_list(ipgroup):
return ", ".join(self.cs.servers.get(id).name
for id in ipgroup.servers)
print_list(self.cs.ipgroups.list(),
fields=['ID', 'Name', 'Server List'],
formatters={'Server List': pretty_server_list})
@arg('group', metavar='<group>', help='Name or ID of group.')
def do_ipgroup_show(self, args):
"""Show details about a particular IP group."""
group = self._find_ipgroup(args.group)
print_dict(group._info)
@arg('name', metavar='<name>', help='What to name this new group.')
@arg('server', metavar='<server>', nargs='?',
help='Server (name or ID) to make a member of this new group.')
def do_ipgroup_create(self, args):
"""Create a new IP group."""
if args.server:
server = self._find_server(args.server)
else:
server = None
group = self.cs.ipgroups.create(args.name, server)
print_dict(group._info)
@arg('group', metavar='<group>', help='Name or ID of group.')
def do_ipgroup_delete(self, args):
"""Delete an IP group."""
self._find_ipgroup(args.group).delete()
@arg('--fixed_ip',
dest='fixed_ip',
metavar='<fixed_ip>',
default=None,
help='Only match against fixed IP.')
@arg('--reservation_id',
dest='reservation_id',
metavar='<reservation_id>',
default=None,
help='Only return instances that match reservation_id.')
@arg('--recurse_zones',
dest='recurse_zones',
metavar='<0|1>',
nargs='?',
type=int,
const=1,
default=0,
help='Recurse through all zones if set.')
@arg('--ip',
dest='ip',
metavar='<ip_regexp>',
default=None,
help='Search with regular expression match by IP address')
@arg('--ip6',
dest='ip6',
metavar='<ip6_regexp>',
default=None,
help='Search with regular expression match by IPv6 address')
@arg('--server_name',
dest='server_name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by server name')
@arg('--name',
dest='display_name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by display name')
@arg('--instance_name',
dest='name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by instance name')
def do_list(self, args):
"""List active servers."""
recurse_zones = args.recurse_zones
search_opts = {
'reservation_id': args.reservation_id,
'fixed_ip': args.fixed_ip,
'recurse_zones': recurse_zones,
'ip': args.ip,
'ip6': args.ip6,
'name': args.name,
'server_name': args.server_name,
'display_name': args.display_name}
if recurse_zones:
to_print = ['UUID', 'Name', 'Status', 'Public IP', 'Private IP']
else:
to_print = ['ID', 'Name', 'Status', 'Public IP', 'Private IP']
print_list(self.cs.servers.list(search_opts=search_opts),
to_print)
@arg('--hard',
dest='reboot_type',
action='store_const',
const=novaclient.REBOOT_HARD,
default=novaclient.REBOOT_SOFT,
help='Perform a hard reboot (instead of a soft one).')
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_reboot(self, args):
"""Reboot a server."""
self._find_server(args.server).reboot(args.reboot_type)
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('image', metavar='<image>', help="Name or ID of new image.")
def do_rebuild(self, args):
"""Shutdown, re-image, and re-boot a server."""
server = self._find_server(args.server)
image = self._find_image(args.image)
server.rebuild(image)
@arg('server', metavar='<server>', help='Name (old name) or ID of server.')
@arg('name', metavar='<name>', help='New name for the server.')
def do_rename(self, args):
"""Rename a server."""
self._find_server(args.server).update(name=args.name)
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('flavor', metavar='<flavor>', help="Name or ID of new flavor.")
def do_resize(self, args):
"""Resize a server."""
server = self._find_server(args.server)
flavor = self._find_flavor(args.flavor)
server.resize(flavor)
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('name', metavar='<name>', help='Name of snapshot.')
@arg('backup_type', metavar='<daily|weekly>', help='type of backup')
@arg('rotation', type=int, metavar='<rotation>',
help="Number of backups to retain. Used for backup image_type.")
def do_backup(self, args):
"""Resize a server."""
server = self._find_server(args.server)
server.backup(args.name, args.backup_type, args.rotation)
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_migrate(self, args):
"""Migrate a server."""
self._find_server(args.server).migrate()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_pause(self, args):
"""Pause a server."""
self._find_server(args.server).pause()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_unpause(self, args):
"""Unpause a server."""
self._find_server(args.server).unpause()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_suspend(self, args):
"""Suspend a server."""
self._find_server(args.server).suspend()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_resume(self, args):
"""Resume a server."""
self._find_server(args.server).resume()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_rescue(self, args):
"""Rescue a server."""
self._find_server(args.server).rescue()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_unrescue(self, args):
"""Unrescue a server."""
self._find_server(args.server).unrescue()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_diagnostics(self, args):
"""Retrieve server diagnostics."""
print_dict(self.cs.servers.diagnostics(args.server)[1])
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_actions(self, args):
"""Retrieve server actions."""
print_list(
self.cs.servers.actions(args.server),
["Created_At", "Action", "Error"])
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_resize_confirm(self, args):
"""Confirm a previous resize."""
self._find_server(args.server).confirm_resize()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_resize_revert(self, args):
"""Revert a previous resize (and return to the previous VM)."""
self._find_server(args.server).revert_resize()
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_root_password(self, args):
"""
Change the root password for a server.
"""
server = self._find_server(args.server)
p1 = getpass.getpass('New password: ')
p2 = getpass.getpass('Again: ')
if p1 != p2:
raise CommandError("Passwords do not match.")
server.update(password=p1)
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_show(self, args):
"""Show details about the given server."""
s = self._find_server(args.server)
info = s._info.copy()
addresses = info.pop('addresses')
for addrtype in addresses:
info['%s ip' % addrtype] = ', '.join(addresses[addrtype])
flavorId = info.get('flavorId', None)
if flavorId:
info['flavor'] = self._find_flavor(info.pop('flavorId')).name
imageId = info.get('imageId', None)
if imageId:
info['image'] = self._find_image(info.pop('imageId')).name
print_dict(info)
@arg('server', metavar='<server>', help='Name or ID of server.')
def do_delete(self, args):
"""Immediately shut down and delete a server."""
self._find_server(args.server).delete()
# --zone_username is required since --username is already used.
@arg('zone', metavar='<zone_id>', help='ID of the zone', default=None)
@arg('--api_url', dest='api_url', default=None, help='New URL.')
@arg('--zone_username', dest='zone_username', default=None,
help='New zone username.')
@arg('--password', dest='password', default=None, help='New password.')
@arg('--weight_offset', dest='weight_offset', default=None,
help='Child Zone weight offset.')
@arg('--weight_scale', dest='weight_scale', default=None,
help='Child Zone weight scale.')
def do_zone(self, args):
"""Show or edit a child zone. No zone arg for this zone."""
zone = self.cs.zones.get(args.zone)
# If we have some flags, update the zone
zone_delta = {}
if args.api_url:
zone_delta['api_url'] = args.api_url
if args.zone_username:
zone_delta['username'] = args.zone_username
if args.password:
zone_delta['password'] = args.password
if args.weight_offset:
zone_delta['weight_offset'] = args.weight_offset
if args.weight_scale:
zone_delta['weight_scale'] = args.weight_scale
if zone_delta:
zone.update(**zone_delta)
else:
print_dict(zone._info)
def do_zone_info(self, args):
"""Get this zones name and capabilities."""
zone = self.cs.zones.info()
print_dict(zone._info)
@arg('api_url', metavar='<api_url>', help="URL for the Zone's API")
@arg('zone_username', metavar='<zone_username>',
help='Authentication username.')
@arg('password', metavar='<password>', help='Authentication password.')
@arg('weight_offset', metavar='<weight_offset>',
help='Child Zone weight offset (typically 0.0).')
@arg('weight_scale', metavar='<weight_scale>',
help='Child Zone weight scale (typically 1.0).')
def do_zone_add(self, args):
"""Add a new child zone."""
zone = self.cs.zones.create(args.api_url, args.zone_username,
args.password, args.weight_offset,
args.weight_scale)
print_dict(zone._info)
@arg('zone', metavar='<zone>', help='Name or ID of the zone')
def do_zone_delete(self, args):
"""Delete a zone."""
self.cs.zones.delete(args.zone)
def do_zone_list(self, args):
"""List the children of a zone."""
print_list(self.cs.zones.list(), ['ID', 'Name', 'Is Active', \
'API URL', 'Weight Offset', 'Weight Scale'])
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('network_id', metavar='<network_id>', help='Network ID.')
def do_add_fixed_ip(self, args):
"""Add new IP address to network."""
server = self._find_server(args.server)
server.add_fixed_ip(args.network_id)
@arg('server', metavar='<server>', help='Name or ID of server.')
@arg('address', metavar='<address>', help='IP Address.')
def do_remove_fixed_ip(self, args):
"""Remove an IP address from a server."""
server = self._find_server(args.server)
server.remove_fixed_ip(args.address)
def _find_server(self, server):
"""Get a server by name or ID."""
return self._find_resource(self.cs.servers, server)
def _find_ipgroup(self, group):
"""Get an IP group by name or ID."""
return self._find_resource(self.cs.ipgroups, group)
def _find_image(self, image):
"""Get an image by name or ID."""
return self._find_resource(self.cs.images, image)
def _find_flavor(self, flavor):
"""Get a flavor by name, ID, or RAM size."""
try:
return self._find_resource(self.cs.flavors, flavor)
except novaclient.NotFound:
return self.cs.flavors.find(ram=flavor)
def _find_resource(self, manager, name_or_id):
"""Helper for the _find_* methods."""
try:
if isinstance(name_or_id, int) or name_or_id.isdigit():
return manager.get(int(name_or_id))
try:
uuid.UUID(name_or_id)
return manager.get(name_or_id)
except ValueError:
return manager.find(name=name_or_id)
except novaclient.NotFound:
raise CommandError("No %s with a name or ID of '%s' exists." %
(manager.resource_class.__name__.lower(), name_or_id))
# I'm picky about my shell help.
class OpenStackHelpFormatter(argparse.HelpFormatter):
@ -865,35 +199,9 @@ class OpenStackHelpFormatter(argparse.HelpFormatter):
super(OpenStackHelpFormatter, self).start_section(heading)
# Helpers
def print_list(objs, fields, formatters={}):
pt = prettytable.PrettyTable([f for f in fields], caching=False)
pt.aligns = ['l' for f in fields]
for o in objs:
row = []
for field in fields:
if field in formatters:
row.append(formatters[field](o))
else:
field_name = field.lower().replace(' ', '_')
data = getattr(o, field_name, '')
row.append(data)
pt.add_row(row)
pt.printt(sortby=fields[0])
def print_dict(d):
pt = prettytable.PrettyTable(['Property', 'Value'], caching=False)
pt.aligns = ['l', 'l']
[pt.add_row(list(r)) for r in d.iteritems()]
pt.printt(sortby='Property')
def main():
try:
OpenStackShell().main(sys.argv[1:])
OpenStackComputeShell().main(sys.argv[1:])
except Exception, e:
if httplib2.debuglevel == 1:

41
novaclient/utils.py Normal file
View File

@ -0,0 +1,41 @@
import prettytable
# Decorator for cli-args
def arg(*args, **kwargs):
def _decorator(func):
# Because of the sematics of decorator composition if we just append
# to the options list positional options will appear to be backwards.
func.__dict__.setdefault('arguments', []).insert(0, (args, kwargs))
return func
return _decorator
def pretty_choice_list(l):
return ', '.join("'%s'" % i for i in l)
def print_list(objs, fields, formatters={}):
pt = prettytable.PrettyTable([f for f in fields], caching=False)
pt.aligns = ['l' for f in fields]
for o in objs:
row = []
for field in fields:
if field in formatters:
row.append(formatters[field](o))
else:
field_name = field.lower().replace(' ', '_')
data = getattr(o, field_name, '')
row.append(data)
pt.add_row(row)
pt.printt(sortby=fields[0])
def print_dict(d):
pt = prettytable.PrettyTable(['Property', 'Value'], caching=False)
pt.aligns = ['l', 'l']
[pt.add_row(list(r)) for r in d.iteritems()]
pt.printt(sortby='Property')

View File

@ -0,0 +1 @@
from novaclient.v1_0.client import Client

View File

@ -1,11 +1,14 @@
from novaclient import base
from novaclient.v1_0 import base as local_base
class Account(base.Resource):
pass
class AccountManager(base.BootingManagerWithFind):
class AccountManager(local_base.BootingManagerWithFind):
resource_class = Account
def create_instance_for(self, account_id, name, image, flavor,

View File

@ -5,6 +5,7 @@ Backup Schedule interface.
from novaclient import base
BACKUP_WEEKLY_DISABLED = 'DISABLED'
BACKUP_WEEKLY_SUNDAY = 'SUNDAY'
BACKUP_WEEKLY_MONDAY = 'MONDAY'

100
novaclient/v1_0/base.py Normal file
View File

@ -0,0 +1,100 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base utilities to build API operation managers and objects on top of.
"""
from novaclient import base
from novaclient import exceptions
# Python 2.4 compat
try:
all
except NameError:
def all(iterable):
return True not in (not x for x in iterable)
class BootingManagerWithFind(base.ManagerWithFind):
"""Like a `ManagerWithFind`, but has the ability to boot servers."""
def _boot(self, resource_url, response_key, name, image, flavor,
ipgroup=None, meta=None, files=None, zone_blob=None,
reservation_id=None, return_raw=False, min_count=None,
max_count=None):
"""
Create (boot) a new server.
:param name: Something to name the server.
:param image: The :class:`Image` to boot with.
:param flavor: The :class:`Flavor` to boot onto.
:param ipgroup: An initial :class:`IPGroup` for this server.
:param meta: A dict of arbitrary key/value metadata to store for this
server. A maximum of five entries is allowed, and both
keys and values must be 255 characters or less.
:param files: A dict of files to overrwrite on the server upon boot.
Keys are file names (i.e. ``/etc/passwd``) and values
are the file contents (either as a string or as a
file-like object). A maximum of five entries is allowed,
and each file must be 10k or less.
:param zone_blob: a single (encrypted) string which is used internally
by Nova for routing between Zones. Users cannot populate
this field.
:param reservation_id: a UUID for the set of servers being requested.
:param return_raw: If True, don't try to coearse the result into
a Resource object.
"""
body = {"server": {
"name": name,
"imageId": base.getid(image),
"flavorId": base.getid(flavor),
}}
if ipgroup:
body["server"]["sharedIpGroupId"] = base.getid(ipgroup)
if meta:
body["server"]["metadata"] = meta
if reservation_id:
body["server"]["reservation_id"] = reservation_id
if zone_blob:
body["server"]["zone_blob"] = zone_blob
if not min_count:
min_count = 1
if not max_count:
max_count = min_count
body["server"]["min_count"] = min_count
body["server"]["max_count"] = max_count
# Files are a slight bit tricky. They're passed in a "personality"
# list to the POST. Each item is a dict giving a file name and the
# base64-encoded contents of the file. We want to allow passing
# either an open file *or* some contents as files here.
if files:
personality = body['server']['personality'] = []
for filepath, file_or_string in files.items():
if hasattr(file_or_string, 'read'):
data = file_or_string.read()
else:
data = file_or_string
personality.append({
'path': filepath,
'contents': data.encode('base64'),
})
return self._create(resource_url, body, response_key,
return_raw=return_raw)

60
novaclient/v1_0/client.py Normal file
View File

@ -0,0 +1,60 @@
from novaclient import client
from novaclient.v1_0 import accounts
from novaclient.v1_0 import backup_schedules
from novaclient.v1_0 import flavors
from novaclient.v1_0 import images
from novaclient.v1_0 import ipgroups
from novaclient.v1_0 import servers
from novaclient.v1_0 import zones
class Client(object):
"""
Top-level object to access the OpenStack Compute API.
Create an instance with your creds::
>>> client = Client(USERNAME, API_KEY, PROJECT_ID, AUTH_URL)
Then call methods on its managers::
>>> client.servers.list()
...
>>> client.flavors.list()
...
"""
def __init__(self, username, api_key, project_id, auth_url=None,
timeout=None):
self.accounts = accounts.AccountManager(self)
self.backup_schedules = backup_schedules.BackupScheduleManager(self)
self.flavors = flavors.FlavorManager(self)
self.images = images.ImageManager(self)
self.ipgroups = ipgroups.IPGroupManager(self)
self.servers = servers.ServerManager(self)
self.zones = zones.ZoneManager(self)
_auth_url = auth_url or 'https://auth.api.rackspacecloud.com/v1.0'
self.client = client.HTTPClient(username,
api_key,
project_id,
_auth_url,
timeout=timeout)
def authenticate(self):
"""
Authenticate against the server.
Normally this is called automatically when you first access the API,
but you can call this method to force authentication right now.
Returns on success; raises :exc:`exceptions.Unauthorized` if the
credentials are wrong.
"""
self.client.authenticate()

View File

@ -3,7 +3,6 @@
Flavor interface.
"""
from novaclient import base

View File

@ -20,7 +20,10 @@ Server interface.
"""
import urllib
from novaclient import base
from novaclient.v1_0 import base as local_base
REBOOT_SOFT, REBOOT_HARD = 'SOFT', 'HARD'
@ -210,7 +213,7 @@ class Server(base.Resource):
return self.addresses['private']
class ServerManager(base.BootingManagerWithFind):
class ServerManager(local_base.BootingManagerWithFind):
resource_class = Server
def get(self, server):

714
novaclient/v1_0/shell.py Normal file
View File

@ -0,0 +1,714 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import getpass
import os
import uuid
from novaclient import exceptions
from novaclient import utils
from novaclient.v1_0 import client
from novaclient.v1_0 import backup_schedules
from novaclient.v1_0 import servers
CLIENT_CLASS = client.Client
# Choices for flags.
DAY_CHOICES = [getattr(backup_schedules, i).lower()
for i in dir(backup_schedules)
if i.startswith('BACKUP_WEEKLY_')]
HOUR_CHOICES = [getattr(backup_schedules, i).lower()
for i in dir(backup_schedules)
if i.startswith('BACKUP_DAILY_')]
# Sentinal for boot --key
AUTO_KEY = object()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('--enable', dest='enabled', default=None, action='store_true',
help='Enable backups.')
@utils.arg('--disable', dest='enabled', action='store_false',
help='Disable backups.')
@utils.arg('--weekly', metavar='<day>', choices=DAY_CHOICES,
help='Schedule a weekly backup for <day> (one of: %s).' %
utils.pretty_choice_list(DAY_CHOICES))
@utils.arg('--daily', metavar='<time-window>', choices=HOUR_CHOICES,
help='Schedule a daily backup during <time-window> (one of: %s).' %
utils.pretty_choice_list(HOUR_CHOICES))
def do_backup_schedule(cs, args):
"""
Show or edit the backup schedule for a server.
With no flags, the backup schedule will be shown. If flags are given,
the backup schedule will be modified accordingly.
"""
server = _find_server(cs, args.server)
# If we have some flags, update the backup
backup = {}
if args.daily:
backup['daily'] = getattr(backup_schedules, 'BACKUP_DAILY_%s' %
args.daily.upper())
if args.weekly:
backup['weekly'] = getattr(backup_schedules, 'BACKUP_WEEKLY_%s' %
args.weekly.upper())
if args.enabled is not None:
backup['enabled'] = args.enabled
if backup:
server.backup_schedule.update(**backup)
else:
utils.print_dict(server.backup_schedule._info)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_backup_schedule_delete(cs, args):
"""
Delete the backup schedule for a server.
"""
server = _find_server(cs, args.server)
server.backup_schedule.delete()
def _boot(cs, args, reservation_id=None, min_count=None, max_count=None):
"""Boot a new server."""
if min_count is None:
min_count = 1
if max_count is None:
max_count = min_count
if min_count > max_count:
raise exceptions.CommandError("min_instances should be <= max_instances")
if not min_count or not max_count:
raise exceptions.CommandError("min_instances nor max_instances should be 0")
flavor = args.flavor or cs.flavors.find(ram=256)
image = args.image or cs.images.find(name="Ubuntu 10.04 LTS "\
"(lucid)")
# Map --ipgroup <name> to an ID.
# XXX do this for flavor/image?
if args.ipgroup:
ipgroup = _find_ipgroup(cs, args.ipgroup)
else:
ipgroup = None
metadata = dict(v.split('=') for v in args.meta)
files = {}
for f in args.files:
dst, src = f.split('=', 1)
try:
files[dst] = open(src)
except IOError, e:
raise exceptions.CommandError("Can't open '%s': %s" % (src, e))
if args.key is AUTO_KEY:
possible_keys = [os.path.join(os.path.expanduser('~'), '.ssh', k)
for k in ('id_dsa.pub', 'id_rsa.pub')]
for k in possible_keys:
if os.path.exists(k):
keyfile = k
break
else:
raise exceptions.CommandError("Couldn't find a key file: tried "
"~/.ssh/id_dsa.pub or ~/.ssh/id_rsa.pub")
elif args.key:
keyfile = args.key
else:
keyfile = None
if keyfile:
try:
files['/root/.ssh/authorized_keys2'] = open(keyfile)
except IOError, e:
raise exceptions.CommandError("Can't open '%s': %s" % (keyfile, e))
return (args.name, image, flavor, ipgroup, metadata, files,
reservation_id, min_count, max_count)
@utils.arg('--flavor',
default=None,
metavar='<flavor>',
help="Flavor ID (see 'nova flavors'). "\
"Defaults to 256MB RAM instance.")
@utils.arg('--image',
default=None,
metavar='<image>',
help="Image ID (see 'nova images'). "\
"Defaults to Ubuntu 10.04 LTS.")
@utils.arg('--ipgroup',
default=None,
metavar='<group>',
help="IP group name or ID (see 'nova ipgroup-list').")
@utils.arg('--meta',
metavar="<key=value>",
action='append',
default=[],
help="Record arbitrary key/value metadata. "\
"May be give multiple times.")
@utils.arg('--file',
metavar="<dst-path=src-path>",
action='append',
dest='files',
default=[],
help="Store arbitrary files from <src-path> locally to <dst-path> "\
"on the new server. You may store up to 5 files.")
@utils.arg('--key',
metavar='<path>',
nargs='?',
const=AUTO_KEY,
help="Key the server with an SSH keypair. "\
"Looks in ~/.ssh for a key, "\
"or takes an explicit <path> to one.")
@utils.arg('name', metavar='<name>', help='Name for the new server')
def do_boot(cs, args):
"""Boot a new server."""
name, image, flavor, ipgroup, metadata, files, reservation_id, \
min_count, max_count = _boot(cs, args)
server = cs.servers.create(args.name, image, flavor,
ipgroup=ipgroup,
meta=metadata,
files=files,
min_count=min_count,
max_count=max_count)
utils.print_dict(server._info)
@utils.arg('--flavor',
default=None,
metavar='<flavor>',
help="Flavor ID (see 'nova flavors'). "\
"Defaults to 256MB RAM instance.")
@utils.arg('--image',
default=None,
metavar='<image>',
help="Image ID (see 'nova images'). "\
"Defaults to Ubuntu 10.04 LTS.")
@utils.arg('--ipgroup',
default=None,
metavar='<group>',
help="IP group name or ID (see 'nova ipgroup-list').")
@utils.arg('--meta',
metavar="<key=value>",
action='append',
default=[],
help="Record arbitrary key/value metadata. "\
"May be give multiple times.")
@utils.arg('--file',
metavar="<dst-path=src-path>",
action='append',
dest='files',
default=[],
help="Store arbitrary files from <src-path> locally to <dst-path> "\
"on the new server. You may store up to 5 files.")
@utils.arg('--key',
metavar='<path>',
nargs='?',
const=AUTO_KEY,
help="Key the server with an SSH keypair. "\
"Looks in ~/.ssh for a key, "\
"or takes an explicit <path> to one.")
@utils.arg('account', metavar='<account>', help='Account to build this'\
' server for')
@utils.arg('name', metavar='<name>', help='Name for the new server')
def do_boot_for_account(cs, args):
"""Boot a new server in an account."""
name, image, flavor, ipgroup, metadata, files, reservation_id, \
min_count, max_count = _boot(cs, args)
server = cs.accounts.create_instance_for(args.account, args.name,
image, flavor,
ipgroup=ipgroup,
meta=metadata,
files=files)
utils.print_dict(server._info)
@utils.arg('--flavor',
default=None,
metavar='<flavor>',
help="Flavor ID (see 'nova flavors'). "\
"Defaults to 256MB RAM instance.")
@utils.arg('--image',
default=None,
metavar='<image>',
help="Image ID (see 'nova images'). "\
"Defaults to Ubuntu 10.04 LTS.")
@utils.arg('--ipgroup',
default=None,
metavar='<group>',
help="IP group name or ID (see 'nova ipgroup-list').")
@utils.arg('--meta',
metavar="<key=value>",
action='append',
default=[],
help="Record arbitrary key/value metadata. "\
"May be give multiple times.")
@utils.arg('--file',
metavar="<dst-path=src-path>",
action='append',
dest='files',
default=[],
help="Store arbitrary files from <src-path> locally to <dst-path> "\
"on the new server. You may store up to 5 files.")
@utils.arg('--key',
metavar='<path>',
nargs='?',
const=AUTO_KEY,
help="Key the server with an SSH keypair. "\
"Looks in ~/.ssh for a key, "\
"or takes an explicit <path> to one.")
@utils.arg('--reservation_id',
default=None,
metavar='<reservation_id>',
help="Reservation ID (a UUID). "\
"If unspecified will be generated by the server.")
@utils.arg('--min_instances',
default=None,
type=int,
metavar='<number>',
help="The minimum number of instances to build. "\
"Defaults to 1.")
@utils.arg('--max_instances',
default=None,
type=int,
metavar='<number>',
help="The maximum number of instances to build. "\
"Defaults to 'min_instances' setting.")
@utils.arg('name', metavar='<name>', help='Name for the new server')
def do_zone_boot(cs, args):
"""Boot a new server, potentially across Zones."""
reservation_id = args.reservation_id
min_count = args.min_instances
max_count = args.max_instances
name, image, flavor, ipgroup, metadata, \
files, reservation_id, min_count, max_count = \
_boot(cs, args,
reservation_id=reservation_id,
min_count=min_count,
max_count=max_count)
reservation_id = cs.zones.boot(args.name, image, flavor,
ipgroup=ipgroup,
meta=metadata,
files=files,
reservation_id=reservation_id,
min_count=min_count,
max_count=max_count)
print "Reservation ID=", reservation_id
def _translate_flavor_keys(collection):
convert = [('ram', 'memory_mb'), ('disk', 'local_gb')]
for item in collection:
keys = item.__dict__.keys()
for from_key, to_key in convert:
if from_key in keys and to_key not in keys:
setattr(item, to_key, item._info[from_key])
def do_flavor_list(cs, args):
"""Print a list of available 'flavors' (sizes of servers)."""
flavors = cs.flavors.list()
_translate_flavor_keys(flavors)
utils.print_list(flavors, [
'ID',
'Name',
'Memory_MB',
'Swap',
'Local_GB',
'VCPUs',
'RXTX_Quota',
'RXTX_Cap'])
def do_image_list(cs, args):
"""Print a list of available images to boot from."""
utils.print_list(cs.images.list(), ['ID', 'Name', 'Status'])
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('name', metavar='<name>', help='Name of snapshot.')
def do_image_create(cs, args):
"""Create a new image by taking a snapshot of a running server."""
server = _find_server(cs, args.server)
image = cs.images.create(server, args.name)
utils.print_dict(image._info)
@utils.arg('image', metavar='<image>', help='Name or ID of image.')
def do_image_delete(cs, args):
"""
Delete an image.
It should go without saying, but you can only delete images you
created.
"""
image = _find_image(cs, args.image)
image.delete()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('group', metavar='<group>', help='Name or ID of group.')
@utils.arg('address', metavar='<address>', help='IP address to share.')
def do_ip_share(cs, args):
"""Share an IP address from the given IP group onto a server."""
server = _find_server(cs, args.server)
group = _find_ipgroup(cs, args.group)
server.share_ip(group, args.address)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('address', metavar='<address>',
help='Shared IP address to remove from the server.')
def do_ip_unshare(cs, args):
"""Stop sharing an given address with a server."""
server = _find_server(cs, args.server)
server.unshare_ip(args.address)
def do_ipgroup_list(cs, args):
"""Show IP groups."""
def pretty_server_list(ipgroup):
return ", ".join(cs.servers.get(id).name
for id in ipgroup.servers)
utils.print_list(cs.ipgroups.list(),
fields=['ID', 'Name', 'Server List'],
formatters={'Server List': pretty_server_list})
@utils.arg('group', metavar='<group>', help='Name or ID of group.')
def do_ipgroup_show(cs, args):
"""Show details about a particular IP group."""
group = _find_ipgroup(cs, args.group)
utils.print_dict(group._info)
@utils.arg('name', metavar='<name>', help='What to name this new group.')
@utils.arg('server', metavar='<server>', nargs='?',
help='Server (name or ID) to make a member of this new group.')
def do_ipgroup_create(cs, args):
"""Create a new IP group."""
if args.server:
server = _find_server(cs, args.server)
else:
server = None
group = cs.ipgroups.create(args.name, server)
utils.print_dict(group._info)
@utils.arg('group', metavar='<group>', help='Name or ID of group.')
def do_ipgroup_delete(cs, args):
"""Delete an IP group."""
_find_ipgroup(cs, args.group).delete()
@utils.arg('--fixed_ip',
dest='fixed_ip',
metavar='<fixed_ip>',
default=None,
help='Only match against fixed IP.')
@utils.arg('--reservation_id',
dest='reservation_id',
metavar='<reservation_id>',
default=None,
help='Only return instances that match reservation_id.')
@utils.arg('--recurse_zones',
dest='recurse_zones',
metavar='<0|1>',
nargs='?',
type=int,
const=1,
default=0,
help='Recurse through all zones if set.')
@utils.arg('--ip',
dest='ip',
metavar='<ip_regexp>',
default=None,
help='Search with regular expression match by IP address')
@utils.arg('--ip6',
dest='ip6',
metavar='<ip6_regexp>',
default=None,
help='Search with regular expression match by IPv6 address')
@utils.arg('--server_name',
dest='server_name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by server name')
@utils.arg('--name',
dest='display_name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by display name')
@utils.arg('--instance_name',
dest='name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by instance name')
def do_list(cs, args):
"""List active servers."""
recurse_zones = args.recurse_zones
search_opts = {
'reservation_id': args.reservation_id,
'fixed_ip': args.fixed_ip,
'recurse_zones': recurse_zones,
'ip': args.ip,
'ip6': args.ip6,
'name': args.name,
'server_name': args.server_name,
'display_name': args.display_name}
if recurse_zones:
to_print = ['UUID', 'Name', 'Status', 'Public IP', 'Private IP']
else:
to_print = ['ID', 'Name', 'Status', 'Public IP', 'Private IP']
utils.print_list(cs.servers.list(search_opts=search_opts),
to_print)
@utils.arg('--hard',
dest='reboot_type',
action='store_const',
const=servers.REBOOT_HARD,
default=servers.REBOOT_SOFT,
help='Perform a hard reboot (instead of a soft one).')
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_reboot(cs, args):
"""Reboot a server."""
_find_server(cs, args.server).reboot(args.reboot_type)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('image', metavar='<image>', help="Name or ID of new image.")
def do_rebuild(cs, args):
"""Shutdown, re-image, and re-boot a server."""
server = _find_server(cs, args.server)
image = _find_image(cs, args.image)
server.rebuild(image)
@utils.arg('server', metavar='<server>', help='Name (old name) or ID of server.')
@utils.arg('name', metavar='<name>', help='New name for the server.')
def do_rename(cs, args):
"""Rename a server."""
_find_server(cs, args.server).update(name=args.name)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('flavor', metavar='<flavor>', help="Name or ID of new flavor.")
def do_resize(cs, args):
"""Resize a server."""
server = _find_server(cs, args.server)
flavor = _find_flavor(cs, args.flavor)
server.resize(flavor)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('name', metavar='<name>', help='Name of snapshot.')
@utils.arg('backup_type', metavar='<daily|weekly>', help='type of backup')
@utils.arg('rotation', type=int, metavar='<rotation>',
help="Number of backups to retain. Used for backup image_type.")
def do_backup(cs, args):
"""Resize a server."""
server = _find_server(cs, args.server)
server.backup(args.name, args.backup_type, args.rotation)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_migrate(cs, args):
"""Migrate a server."""
_find_server(cs, args.server).migrate()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_pause(cs, args):
"""Pause a server."""
_find_server(cs, args.server).pause()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_unpause(cs, args):
"""Unpause a server."""
_find_server(cs, args.server).unpause()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_suspend(cs, args):
"""Suspend a server."""
_find_server(cs, args.server).suspend()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_resume(cs, args):
"""Resume a server."""
_find_server(cs, args.server).resume()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_rescue(cs, args):
"""Rescue a server."""
_find_server(cs, args.server).rescue()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_unrescue(cs, args):
"""Unrescue a server."""
_find_server(cs, args.server).unrescue()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_diagnostics(cs, args):
"""Retrieve server diagnostics."""
utils.print_dict(cs.servers.diagnostics(args.server)[1])
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_actions(cs, args):
"""Retrieve server actions."""
utils.print_list(
cs.servers.actions(args.server),
["Created_At", "Action", "Error"])
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_resize_confirm(cs, args):
"""Confirm a previous resize."""
_find_server(cs, args.server).confirm_resize()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_resize_revert(cs, args):
"""Revert a previous resize (and return to the previous VM)."""
_find_server(cs, args.server).revert_resize()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_root_password(cs, args):
"""
Change the root password for a server.
"""
server = _find_server(cs, args.server)
p1 = getpass.getpass('New password: ')
p2 = getpass.getpass('Again: ')
if p1 != p2:
raise exceptions.CommandError("Passwords do not match.")
server.update(password=p1)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_show(cs, args):
"""Show details about the given server."""
s = _find_server(cs, args.server)
info = s._info.copy()
addresses = info.pop('addresses')
for addrtype in addresses:
info['%s ip' % addrtype] = ', '.join(addresses[addrtype])
flavorId = info.get('flavorId', None)
if flavorId:
info['flavor'] = _find_flavor(cs, info.pop('flavorId')).name
imageId = info.get('imageId', None)
if imageId:
info['image'] = _find_image(cs, info.pop('imageId')).name
utils.print_dict(info)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_delete(cs, args):
"""Immediately shut down and delete a server."""
_find_server(cs, args.server).delete()
# --zone_username is required since --username is already used.
@utils.arg('zone', metavar='<zone_id>', help='ID of the zone', default=None)
@utils.arg('--api_url', dest='api_url', default=None, help='New URL.')
@utils.arg('--zone_username', dest='zone_username', default=None,
help='New zone username.')
@utils.arg('--password', dest='password', default=None, help='New password.')
@utils.arg('--weight_offset', dest='weight_offset', default=None,
help='Child Zone weight offset.')
@utils.arg('--weight_scale', dest='weight_scale', default=None,
help='Child Zone weight scale.')
def do_zone(cs, args):
"""Show or edit a child zone. No zone arg for this zone."""
zone = cs.zones.get(args.zone)
# If we have some flags, update the zone
zone_delta = {}
if args.api_url:
zone_delta['api_url'] = args.api_url
if args.zone_username:
zone_delta['username'] = args.zone_username
if args.password:
zone_delta['password'] = args.password
if args.weight_offset:
zone_delta['weight_offset'] = args.weight_offset
if args.weight_scale:
zone_delta['weight_scale'] = args.weight_scale
if zone_delta:
zone.update(**zone_delta)
else:
utils.print_dict(zone._info)
def do_zone_info(cs, args):
"""Get this zones name and capabilities."""
zone = cs.zones.info()
utils.print_dict(zone._info)
@utils.arg('api_url', metavar='<api_url>', help="URL for the Zone's API")
@utils.arg('zone_username', metavar='<zone_username>',
help='Authentication username.')
@utils.arg('password', metavar='<password>', help='Authentication password.')
@utils.arg('weight_offset', metavar='<weight_offset>',
help='Child Zone weight offset (typically 0.0).')
@utils.arg('weight_scale', metavar='<weight_scale>',
help='Child Zone weight scale (typically 1.0).')
def do_zone_add(cs, args):
"""Add a new child zone."""
zone = cs.zones.create(args.api_url, args.zone_username,
args.password, args.weight_offset,
args.weight_scale)
utils.print_dict(zone._info)
@utils.arg('zone', metavar='<zone>', help='Name or ID of the zone')
def do_zone_delete(cs, args):
"""Delete a zone."""
cs.zones.delete(args.zone)
def do_zone_list(cs, args):
"""List the children of a zone."""
utils.print_list(cs.zones.list(), ['ID', 'Name', 'Is Active', \
'API URL', 'Weight Offset', 'Weight Scale'])
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('network_id', metavar='<network_id>', help='Network ID.')
def do_add_fixed_ip(cs, args):
"""Add new IP address to network."""
server = _find_server(cs, args.server)
server.add_fixed_ip(args.network_id)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('address', metavar='<address>', help='IP Address.')
def do_remove_fixed_ip(cs, args):
"""Remove an IP address from a server."""
server = _find_server(cs, args.server)
server.remove_fixed_ip(args.address)
def _find_server(cs, server):
"""Get a server by name or ID."""
return _find_resource(cs.servers, server)
def _find_ipgroup(cs, group):
"""Get an IP group by name or ID."""
return _find_resource(cs.ipgroups, group)
def _find_image(cs, image):
"""Get an image by name or ID."""
return _find_resource(cs.images, image)
def _find_flavor(cs, flavor):
"""Get a flavor by name, ID, or RAM size."""
try:
return _find_resource(cs.flavors, flavor)
except exceptions.NotFound:
return cs.flavors.find(ram=flavor)
def _find_resource(manager, name_or_id):
"""Helper for the _find_* methods."""
try:
if isinstance(name_or_id, int) or name_or_id.isdigit():
return manager.get(int(name_or_id))
try:
uuid.UUID(name_or_id)
return manager.get(name_or_id)
except ValueError:
return manager.find(name=name_or_id)
except exceptions.NotFound:
raise exceptions.CommandError("No %s with a name or ID of '%s' exists." %
(manager.resource_class.__name__.lower(), name_or_id))

View File

@ -18,6 +18,7 @@ Zone interface.
"""
from novaclient import base
from novaclient.v1_0 import base as local_base
class Weighting(base.Resource):
@ -64,7 +65,7 @@ class Zone(base.Resource):
weight_offset, weight_scale)
class ZoneManager(base.BootingManagerWithFind):
class ZoneManager(local_base.BootingManagerWithFind):
resource_class = Zone
def info(self):

View File

@ -0,0 +1 @@
from novaclient.v1_1.client import Client

231
novaclient/v1_1/base.py Normal file
View File

@ -0,0 +1,231 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base utilities to build API operation managers and objects on top of.
"""
from novaclient import exceptions
# Python 2.4 compat
try:
all
except NameError:
def all(iterable):
return True not in (not x for x in iterable)
def getid(obj):
"""
Abstracts the common pattern of allowing both an object or an object's ID
(UUID) as a parameter when dealing with relationships.
"""
# Try to return the object's UUID first, if we have a UUID.
try:
if obj.uuid:
return obj.uuid
except AttributeError:
pass
try:
return obj.id
except AttributeError:
return obj
class Manager(object):
"""
Managers interact with a particular type of API (servers, flavors, images,
etc.) and provide CRUD operations for them.
"""
resource_class = None
def __init__(self, api):
self.api = api
def _list(self, url, response_key, obj_class=None, body=None):
resp = None
if body:
resp, body = self.api.client.post(url, body=body)
else:
resp, body = self.api.client.get(url)
if obj_class is None:
obj_class = self.resource_class
return [obj_class(self, res)
for res in body[response_key] if res]
def _get(self, url, response_key):
resp, body = self.api.client.get(url)
return self.resource_class(self, body[response_key])
def _create(self, url, body, response_key, return_raw=False):
resp, body = self.api.client.post(url, body=body)
if return_raw:
return body[response_key]
return self.resource_class(self, body[response_key])
def _delete(self, url):
resp, body = self.api.client.delete(url)
def _update(self, url, body):
resp, body = self.api.client.put(url, body=body)
class ManagerWithFind(Manager):
"""
Like a `Manager`, but with additional `find()`/`findall()` methods.
"""
def find(self, **kwargs):
"""
Find a single item with attributes matching ``**kwargs``.
This isn't very efficient: it loads the entire list then filters on
the Python side.
"""
rl = self.findall(**kwargs)
try:
return rl[0]
except IndexError:
raise exceptions.NotFound(404, "No %s matching %s." %
(self.resource_class.__name__, kwargs))
def findall(self, **kwargs):
"""
Find all items with attributes matching ``**kwargs``.
This isn't very efficient: it loads the entire list then filters on
the Python side.
"""
found = []
searches = kwargs.items()
for obj in self.list():
try:
if all(getattr(obj, attr) == value
for (attr, value) in searches):
found.append(obj)
except AttributeError:
continue
return found
class BootingManagerWithFind(ManagerWithFind):
"""Like a `ManagerWithFind`, but has the ability to boot servers."""
def _boot(self, resource_url, response_key, name, image, flavor,
meta=None, files=None, zone_blob=None,
reservation_id=None, return_raw=False, min_count=None,
max_count=None):
"""
Create (boot) a new server.
:param name: Something to name the server.
:param image: The :class:`Image` to boot with.
:param flavor: The :class:`Flavor` to boot onto.
:param meta: A dict of arbitrary key/value metadata to store for this
server. A maximum of five entries is allowed, and both
keys and values must be 255 characters or less.
:param files: A dict of files to overrwrite on the server upon boot.
Keys are file names (i.e. ``/etc/passwd``) and values
are the file contents (either as a string or as a
file-like object). A maximum of five entries is allowed,
and each file must be 10k or less.
:param zone_blob: a single (encrypted) string which is used internally
by Nova for routing between Zones. Users cannot populate
this field.
:param reservation_id: a UUID for the set of servers being requested.
:param return_raw: If True, don't try to coearse the result into
a Resource object.
"""
body = {"server": {
"name": name,
"imageRef": getid(image),
"flavorRef": getid(flavor),
}}
if meta:
body["server"]["metadata"] = meta
if reservation_id:
body["server"]["reservation_id"] = reservation_id
if zone_blob:
body["server"]["zone_blob"] = zone_blob
if not min_count:
min_count = 1
if not max_count:
max_count = min_count
body["server"]["min_count"] = min_count
body["server"]["max_count"] = max_count
# Files are a slight bit tricky. They're passed in a "personality"
# list to the POST. Each item is a dict giving a file name and the
# base64-encoded contents of the file. We want to allow passing
# either an open file *or* some contents as files here.
if files:
personality = body['server']['personality'] = []
for filepath, file_or_string in files.items():
if hasattr(file_or_string, 'read'):
data = file_or_string.read()
else:
data = file_or_string
personality.append({
'path': filepath,
'contents': data.encode('base64'),
})
return self._create(resource_url, body, response_key,
return_raw=return_raw)
class Resource(object):
"""
A resource represents a particular instance of an object (server, flavor,
etc). This is pretty much just a bag for attributes.
"""
def __init__(self, manager, info):
self.manager = manager
self._info = info
self._add_details(info)
def _add_details(self, info):
for (k, v) in info.iteritems():
setattr(self, k, v)
def __getattr__(self, k):
self.get()
if k not in self.__dict__:
raise AttributeError(k)
else:
return self.__dict__[k]
def __repr__(self):
reprkeys = sorted(k for k in self.__dict__.keys() if k[0] != '_' and
k != 'manager')
info = ", ".join("%s=%s" % (k, getattr(self, k)) for k in reprkeys)
return "<%s %s>" % (self.__class__.__name__, info)
def get(self):
new = self.manager.get(self.id)
if new:
self._add_details(new._info)
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if hasattr(self, 'id') and hasattr(other, 'id'):
return self.id == other.id
return self._info == other._info

50
novaclient/v1_1/client.py Normal file
View File

@ -0,0 +1,50 @@
from novaclient import client
from novaclient.v1_1 import flavors
from novaclient.v1_1 import images
from novaclient.v1_1 import servers
from novaclient.v1_1 import zones
class Client(object):
"""
Top-level object to access the OpenStack Compute API.
Create an instance with your creds::
>>> client = Client(USERNAME, API_KEY, PROJECT_ID, AUTH_URL)
Then call methods on its managers::
>>> client.servers.list()
...
>>> client.flavors.list()
...
"""
def __init__(self, username, api_key, project_id, auth_url, timeout=None):
self.flavors = flavors.FlavorManager(self)
self.images = images.ImageManager(self)
self.servers = servers.ServerManager(self)
self.zones = zones.ZoneManager(self)
self.client = client.HTTPClient(username,
api_key,
project_id,
auth_url,
timeout=timeout)
def authenticate(self):
"""
Authenticate against the server.
Normally this is called automatically when you first access the API,
but you can call this method to force authentication right now.
Returns on success; raises :exc:`exceptions.Unauthorized` if the
credentials are wrong.
"""
self.client.authenticate()

View File

@ -0,0 +1,41 @@
# Copyright 2010 Jacob Kaplan-Moss
"""
Flavor interface.
"""
from novaclient import base
class Flavor(base.Resource):
"""
A flavor is an available hardware configuration for a server.
"""
def __repr__(self):
return "<Flavor: %s>" % self.name
class FlavorManager(base.ManagerWithFind):
"""
Manage :class:`Flavor` resources.
"""
resource_class = Flavor
def list(self, detailed=True):
"""
Get a list of all flavors.
:rtype: list of :class:`Flavor`.
"""
if detailed is True:
return self._list("/flavors/detail", "flavors")
else:
return self._list("/flavors", "flavors")
def get(self, flavor):
"""
Get a specific flavor.
:param flavor: The ID of the :class:`Flavor` to get.
:rtype: :class:`Flavor`
"""
return self._get("/flavors/%s" % base.getid(flavor), "flavor")

58
novaclient/v1_1/images.py Normal file
View File

@ -0,0 +1,58 @@
# Copyright 2010 Jacob Kaplan-Moss
"""
Image interface.
"""
from novaclient import base
class Image(base.Resource):
"""
An image is a collection of files used to create or rebuild a server.
"""
def __repr__(self):
return "<Image: %s>" % self.name
def delete(self):
"""
Delete this image.
"""
return self.manager.delete(self)
class ImageManager(base.ManagerWithFind):
"""
Manage :class:`Image` resources.
"""
resource_class = Image
def get(self, image):
"""
Get an image.
:param image: The ID of the image to get.
:rtype: :class:`Image`
"""
return self._get("/images/%s" % base.getid(image), "image")
def list(self, detailed=True):
"""
Get a list of all images.
:rtype: list of :class:`Image`
"""
if detailed is True:
return self._list("/images/detail", "images")
else:
return self._list("/images", "images")
def delete(self, image):
"""
Delete an image.
It should go without saying that you can't delete an image
that you didn't create.
:param image: The :class:`Image` (or its ID) to delete.
"""
self._delete("/images/%s" % base.getid(image))

509
novaclient/v1_1/servers.py Normal file
View File

@ -0,0 +1,509 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Server interface.
"""
import urllib
from novaclient import base
from novaclient.v1_1 import base as local_base
REBOOT_SOFT, REBOOT_HARD = 'SOFT', 'HARD'
class Server(base.Resource):
def __repr__(self):
return "<Server: %s>" % self.name
def delete(self):
"""
Delete (i.e. shut down and delete the image) this server.
"""
self.manager.delete(self)
def update(self, name=None):
"""
Update the name or the password for this server.
:param name: Update the server's name.
:param password: Update the root password.
"""
self.manager.update(self, name=name)
def add_fixed_ip(self, network_id):
"""
Add an IP address on a network.
:param network_id: The ID of the network the IP should be on.
"""
self.manager.add_fixed_ip(self, network_id)
def pause(self):
"""
Pause -- Pause the running server.
"""
self.manager.pause(self)
def unpause(self):
"""
Unpause -- Unpause the paused server.
"""
self.manager.unpause(self)
def suspend(self):
"""
Suspend -- Suspend the running server.
"""
self.manager.suspend(self)
def resume(self):
"""
Resume -- Resume the suspended server.
"""
self.manager.resume(self)
def rescue(self):
"""
Rescue -- Rescue the problematic server.
"""
self.manager.rescue(self)
def unrescue(self):
"""
Unrescue -- Unrescue the rescued server.
"""
self.manager.unrescue(self)
def diagnostics(self):
"""Diagnostics -- Retrieve server diagnostics."""
self.manager.diagnostics(self)
def actions(self):
"""Actions -- Retrieve server actions."""
self.manager.actions(self)
def migrate(self):
"""
Migrate a server to a new host in the same zone.
"""
self.manager.migrate(self)
def remove_fixed_ip(self, address):
"""
Remove an IP address.
:param address: The IP address to remove.
"""
self.manager.remove_fixed_ip(self, address)
def change_password(self, password):
"""
Update the password for a server.
"""
self.manager.change_password(self, password)
def reboot(self, type=REBOOT_SOFT):
"""
Reboot the server.
:param type: either :data:`REBOOT_SOFT` for a software-level reboot,
or `REBOOT_HARD` for a virtual power cycle hard reboot.
"""
self.manager.reboot(self, type)
def rebuild(self, image):
"""
Rebuild -- shut down and then re-image -- this server.
:param image: the :class:`Image` (or its ID) to re-image with.
"""
self.manager.rebuild(self, image)
def resize(self, flavor):
"""
Resize the server's resources.
:param flavor: the :class:`Flavor` (or its ID) to resize to.
Until a resize event is confirmed with :meth:`confirm_resize`, the old
server will be kept around and you'll be able to roll back to the old
flavor quickly with :meth:`revert_resize`. All resizes are
automatically confirmed after 24 hours.
"""
self.manager.resize(self, flavor)
def create_image(self, image_name, metadata):
"""
Create an image based on this server.
:param image_name: The name to assign the newly create image.
:param metadata: Metadata to assign to the image.
"""
self.manager.create_image(self, image_name, metadata)
def confirm_resize(self):
"""
Confirm that the resize worked, thus removing the original server.
"""
self.manager.confirm_resize(self)
def revert_resize(self):
"""
Revert a previous resize, switching back to the old server.
"""
self.manager.revert_resize(self)
@property
def networks(self):
"""
Generate a simplified list of addresses
"""
networks = {}
try:
for network_label, address_list in self.addresses.items():
networks[network_label] = [a['addr'] for a in address_list]
return networks
except Exception:
return {}
class ServerManager(local_base.BootingManagerWithFind):
resource_class = Server
def get(self, server):
"""
Get a server.
:param server: ID of the :class:`Server` to get.
:rtype: :class:`Server`
"""
return self._get("/servers/%s" % base.getid(server), "server")
def list(self, detailed=True, search_opts=None):
"""
Get a list of servers.
Optional detailed returns details server info.
Optional reservation_id only returns instances with that
reservation_id.
:rtype: list of :class:`Server`
"""
if search_opts is None:
search_opts = {}
qparams = {}
for opt, val in search_opts.iteritems():
if val:
qparams[opt] = val
query_string = "?%s" % urllib.urlencode(qparams) if qparams else ""
detail = ""
if detailed:
detail = "/detail"
return self._list("/servers%s%s" % (detail, query_string), "servers")
def add_fixed_ip(self, server, network_id):
"""
Add an IP address on a network.
:param server: The :class:`Server` (or its ID) to add an IP to.
:param network_id: The ID of the network the IP should be on.
"""
self._action('addFixedIp', server, {'networkId': network_id})
def remove_fixed_ip(self, server, address):
"""
Remove an IP address.
:param server: The :class:`Server` (or its ID) to add an IP to.
:param address: The IP address to remove.
"""
self._action('removeFixedIp', server, {'address': address})
def pause(self, server):
"""
Pause the server.
"""
self.api.client.post('/servers/%s/pause' % base.getid(server), body={})
def unpause(self, server):
"""
Unpause the server.
"""
self.api.client.post('/servers/%s/unpause' % base.getid(server),
body={})
def suspend(self, server):
"""
Suspend the server.
"""
self.api.client.post('/servers/%s/suspend' % base.getid(server),
body={})
def resume(self, server):
"""
Resume the server.
"""
self.api.client.post('/servers/%s/resume' % base.getid(server),
body={})
def rescue(self, server):
"""
Rescue the server.
"""
self.api.client.post('/servers/%s/rescue' % base.getid(server),
body={})
def unrescue(self, server):
"""
Unrescue the server.
"""
self.api.client.post('/servers/%s/unrescue' % base.getid(server),
body={})
def diagnostics(self, server):
"""Retrieve server diagnostics."""
return self.api.client.get("/servers/%s/diagnostics" %
base.getid(server))
def actions(self, server):
"""Retrieve server actions."""
return self._list("/servers/%s/actions" % base.getid(server),
"actions")
def create(self, name, image, flavor, meta=None, files=None,
zone_blob=None, reservation_id=None, min_count=None,
max_count=None):
"""
Create (boot) a new server.
:param name: Something to name the server.
:param image: The :class:`Image` to boot with.
:param flavor: The :class:`Flavor` to boot onto.
:param meta: A dict of arbitrary key/value metadata to store for this
server. A maximum of five entries is allowed, and both
keys and values must be 255 characters or less.
:param files: A dict of files to overrwrite on the server upon boot.
Keys are file names (i.e. ``/etc/passwd``) and values
are the file contents (either as a string or as a
file-like object). A maximum of five entries is allowed,
and each file must be 10k or less.
:param zone_blob: a single (encrypted) string which is used internally
by Nova for routing between Zones. Users cannot populate
this field.
:param reservation_id: a UUID for the set of servers being requested.
"""
if not min_count:
min_count = 1
if not max_count:
max_count = min_count
if min_count > max_count:
min_count = max_count
return self._boot("/servers", "server", name, image, flavor,
meta=meta, files=files,
zone_blob=zone_blob, reservation_id=reservation_id,
min_count=min_count, max_count=max_count)
def update(self, server, name=None):
"""
Update the name or the password for a server.
:param server: The :class:`Server` (or its ID) to update.
:param name: Update the server's name.
"""
if name is None:
return
body = {
"server": {
"name": name,
},
}
self._update("/servers/%s" % base.getid(server), body)
def change_password(self, server, password):
"""
Update the password for a server.
"""
self._action("changePassword", server, {"adminPass": password})
def delete(self, server):
"""
Delete (i.e. shut down and delete the image) this server.
"""
self._delete("/servers/%s" % base.getid(server))
def reboot(self, server, type=REBOOT_SOFT):
"""
Reboot a server.
:param server: The :class:`Server` (or its ID) to share onto.
:param type: either :data:`REBOOT_SOFT` for a software-level reboot,
or `REBOOT_HARD` for a virtual power cycle hard reboot.
"""
self._action('reboot', server, {'type': type})
def rebuild(self, server, image):
"""
Rebuild -- shut down and then re-image -- a server.
:param server: The :class:`Server` (or its ID) to share onto.
:param image: the :class:`Image` (or its ID) to re-image with.
"""
self._action('rebuild', server, {'imageRef': base.getid(image)})
def migrate(self, server):
"""
Migrate a server to a new host in the same zone.
:param server: The :class:`Server` (or its ID).
"""
self._action('migrate', server)
def resize(self, server, flavor):
"""
Resize a server's resources.
:param server: The :class:`Server` (or its ID) to share onto.
:param flavor: the :class:`Flavor` (or its ID) to resize to.
Until a resize event is confirmed with :meth:`confirm_resize`, the old
server will be kept around and you'll be able to roll back to the old
flavor quickly with :meth:`revert_resize`. All resizes are
automatically confirmed after 24 hours.
"""
<<<<<<< HEAD:novaclient/servers.py
self._action('resize', server, {'flavorId': base.getid(flavor)})
def backup(self, server, image_name, backup_type, rotation):
"""
Create a server backup.
:param server: The :class:`Server` (or its ID).
:param image_name: The name to assign the newly create image.
:param backup_type: 'daily' or 'weekly'
:param rotation: number of backups of type 'backup_type' to keep
:returns Newly created :class:`Image` object
"""
if not rotation:
raise Exception("rotation is required for backups")
elif not backup_type:
raise Exception("backup_type required for backups")
elif backup_type not in ("daily", "weekly"):
raise Exception("Invalid backup_type: must be daily or weekly")
data = {
"name": image_name,
"rotation": rotation,
"backup_type": backup_type,
}
self._action('createBackup', server, data)
def pause(self, server):
"""
Pause the server.
"""
self.api.client.post('/servers/%s/pause' % base.getid(server), body={})
def unpause(self, server):
"""
Unpause the server.
"""
self.api.client.post('/servers/%s/unpause' % base.getid(server),
body={})
def suspend(self, server):
"""
Suspend the server.
"""
self.api.client.post('/servers/%s/suspend' % base.getid(server),
body={})
def resume(self, server):
"""
Resume the server.
"""
self.api.client.post('/servers/%s/resume' % base.getid(server),
body={})
def rescue(self, server):
"""
Rescue the server.
"""
self.api.client.post('/servers/%s/rescue' % base.getid(server),
body={})
def unrescue(self, server):
"""
Unrescue the server.
"""
self.api.client.post('/servers/%s/unrescue' % base.getid(server),
body={})
def diagnostics(self, server):
"""Retrieve server diagnostics."""
return self.api.client.get("/servers/%s/diagnostics" %
base.getid(server))
def actions(self, server):
"""Retrieve server actions."""
return self._list("/servers/%s/actions" % base.getid(server),
"actions")
=======
self._action('resize', server, {'flavorRef': base.getid(flavor)})
>>>>>>> blamar/v1.1-split-and-support:novaclient/v1_1/servers.py
def confirm_resize(self, server):
"""
Confirm that the resize worked, thus removing the original server.
:param server: The :class:`Server` (or its ID) to share onto.
"""
self._action('confirmResize', server)
def revert_resize(self, server):
"""
Revert a previous resize, switching back to the old server.
:param server: The :class:`Server` (or its ID) to share onto.
"""
self._action('revertResize', server)
def create_image(self, server, image_name, metadata=None):
"""
Snapshot a server.
:param server: The :class:`Server` (or its ID) to share onto.
:param image_name: Name to give the snapshot image
:param meta: Metadata to give newly-created image entity
"""
self._action('createImage', server,
{'name': image_name, 'metadata': metadata or {}})
def _action(self, action, server, info=None):
"""
Perform a server "action" -- reboot/rebuild/resize/etc.
"""
self.api.client.post('/servers/%s/action' % base.getid(server),
body={action: info})

555
novaclient/v1_1/shell.py Normal file
View File

@ -0,0 +1,555 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import getpass
import os
import uuid
from novaclient import exceptions
from novaclient import utils
from novaclient.v1_1 import client
from novaclient.v1_1 import servers
CLIENT_CLASS = client.Client
AUTO_KEY = object()
def _boot(cs, args, reservation_id=None, min_count=None, max_count=None):
"""Boot a new server."""
if min_count is None:
min_count = 1
if max_count is None:
max_count = min_count
if min_count > max_count:
raise exceptions.CommandError("min_instances should be <= max_instances")
if not min_count or not max_count:
raise exceptions.CommandError("min_instances nor max_instances should be 0")
flavor = args.flavor or cs.flavors.find(ram=256)
image = args.image or cs.images.find(name="Ubuntu 10.04 LTS "\
"(lucid)")
metadata = dict(v.split('=') for v in args.meta)
files = {}
for f in args.files:
dst, src = f.split('=', 1)
try:
files[dst] = open(src)
except IOError, e:
raise exceptions.CommandError("Can't open '%s': %s" % (src, e))
if args.key is AUTO_KEY:
possible_keys = [os.path.join(os.path.expanduser('~'), '.ssh', k)
for k in ('id_dsa.pub', 'id_rsa.pub')]
for k in possible_keys:
if os.path.exists(k):
keyfile = k
break
else:
raise exceptions.CommandError("Couldn't find a key file: tried "
"~/.ssh/id_dsa.pub or ~/.ssh/id_rsa.pub")
elif args.key:
keyfile = args.key
else:
keyfile = None
if keyfile:
try:
files['/root/.ssh/authorized_keys2'] = open(keyfile)
except IOError, e:
raise exceptions.CommandError("Can't open '%s': %s" % (keyfile, e))
return (args.name, image, flavor, metadata, files,
reservation_id, min_count, max_count)
@utils.arg('--flavor',
default=None,
metavar='<flavor>',
help="Flavor ID (see 'nova flavors'). "\
"Defaults to 256MB RAM instance.")
@utils.arg('--image',
default=None,
metavar='<image>',
help="Image ID (see 'nova images'). "\
"Defaults to Ubuntu 10.04 LTS.")
@utils.arg('--meta',
metavar="<key=value>",
action='append',
default=[],
help="Record arbitrary key/value metadata. "\
"May be give multiple times.")
@utils.arg('--file',
metavar="<dst-path=src-path>",
action='append',
dest='files',
default=[],
help="Store arbitrary files from <src-path> locally to <dst-path> "\
"on the new server. You may store up to 5 files.")
@utils.arg('--key',
metavar='<path>',
nargs='?',
const=AUTO_KEY,
help="Key the server with an SSH keypair. "\
"Looks in ~/.ssh for a key, "\
"or takes an explicit <path> to one.")
@utils.arg('name', metavar='<name>', help='Name for the new server')
def do_boot(cs, args):
"""Boot a new server."""
name, image, flavor, metadata, files, reservation_id, \
min_count, max_count = _boot(cs, args)
server = cs.servers.create(args.name, image, flavor,
meta=metadata,
files=files,
min_count=min_count,
max_count=max_count)
utils.print_dict(server._info)
@utils.arg('--flavor',
default=None,
metavar='<flavor>',
help="Flavor ID (see 'nova flavors'). "\
"Defaults to 256MB RAM instance.")
@utils.arg('--image',
default=None,
metavar='<image>',
help="Image ID (see 'nova images'). "\
"Defaults to Ubuntu 10.04 LTS.")
@utils.arg('--meta',
metavar="<key=value>",
action='append',
default=[],
help="Record arbitrary key/value metadata. "\
"May be give multiple times.")
@utils.arg('--file',
metavar="<dst-path=src-path>",
action='append',
dest='files',
default=[],
help="Store arbitrary files from <src-path> locally to <dst-path> "\
"on the new server. You may store up to 5 files.")
@utils.arg('--key',
metavar='<path>',
nargs='?',
const=AUTO_KEY,
help="Key the server with an SSH keypair. "\
"Looks in ~/.ssh for a key, "\
"or takes an explicit <path> to one.")
@utils.arg('--reservation_id',
default=None,
metavar='<reservation_id>',
help="Reservation ID (a UUID). "\
"If unspecified will be generated by the server.")
@utils.arg('--min_instances',
default=None,
type=int,
metavar='<number>',
help="The minimum number of instances to build. "\
"Defaults to 1.")
@utils.arg('--max_instances',
default=None,
type=int,
metavar='<number>',
help="The maximum number of instances to build. "\
"Defaults to 'min_instances' setting.")
@utils.arg('name', metavar='<name>', help='Name for the new server')
def do_zone_boot(cs, args):
"""Boot a new server, potentially across Zones."""
reservation_id = args.reservation_id
min_count = args.min_instances
max_count = args.max_instances
name, image, flavor, metadata, \
files, reservation_id, min_count, max_count = \
_boot(cs, args,
reservation_id=reservation_id,
min_count=min_count,
max_count=max_count)
reservation_id = cs.zones.boot(args.name, image, flavor,
meta=metadata,
files=files,
reservation_id=reservation_id,
min_count=min_count,
max_count=max_count)
print "Reservation ID=", reservation_id
def _translate_flavor_keys(collection):
convert = [('ram', 'memory_mb'), ('disk', 'local_gb')]
for item in collection:
keys = item.__dict__.keys()
for from_key, to_key in convert:
if from_key in keys and to_key not in keys:
setattr(item, to_key, item._info[from_key])
def do_flavor_list(cs, args):
"""Print a list of available 'flavors' (sizes of servers)."""
flavors = cs.flavors.list()
_translate_flavor_keys(flavors)
utils.print_list(flavors, [
'ID',
'Name',
'Memory_MB',
'Swap',
'Local_GB',
'VCPUs',
'RXTX_Quota',
'RXTX_Cap'])
def do_image_list(cs, args):
"""Print a list of available images to boot from."""
utils.print_list(cs.images.list(), ['ID', 'Name', 'Status'])
@utils.arg('image', metavar='<image>', help='Name or ID of image.')
def do_image_delete(cs, args):
"""
Delete an image.
It should go without saying, but you can only delete images you
created.
"""
image = _find_image(cs, args.image)
image.delete()
@utils.arg('--fixed_ip',
dest='fixed_ip',
metavar='<fixed_ip>',
default=None,
help='Only match against fixed IP.')
@utils.arg('--reservation_id',
dest='reservation_id',
metavar='<reservation_id>',
default=None,
help='Only return instances that match reservation_id.')
@utils.arg('--recurse_zones',
dest='recurse_zones',
metavar='<0|1>',
nargs='?',
type=int,
const=1,
default=0,
help='Recurse through all zones if set.')
@utils.arg('--ip',
dest='ip',
metavar='<ip_regexp>',
default=None,
help='Search with regular expression match by IP address')
@utils.arg('--ip6',
dest='ip6',
metavar='<ip6_regexp>',
default=None,
help='Search with regular expression match by IPv6 address')
@utils.arg('--server_name',
dest='server_name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by server name')
@utils.arg('--name',
dest='display_name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by display name')
@utils.arg('--instance_name',
dest='name',
metavar='<name_regexp>',
default=None,
help='Search with regular expression match by instance name')
def do_list(cs, args):
"""List active servers."""
recurse_zones = args.recurse_zones
search_opts = {
'reservation_id': args.reservation_id,
'fixed_ip': args.fixed_ip,
'recurse_zones': recurse_zones,
'ip': args.ip,
'ip6': args.ip6,
'name': args.name,
'server_name': args.server_name,
'display_name': args.display_name}
if recurse_zones:
id_col = 'UUID'
else:
id_col = 'ID'
columns = [id_col, 'Name', 'Status', 'Networks']
formatters = {'Networks': _format_servers_list_networks}
utils.print_list(cs.servers.list(search_opts=search_opts), columns, formatters)
def _format_servers_list_networks(server):
output = []
for (network, addresses) in server.networks.items():
if len(addresses) == 0:
continue
addresses_csv = ', '.join(addresses)
group = "%s=%s" % (network, addresses_csv)
output.append(group)
return '; '.join(output)
@utils.arg('--hard',
dest='reboot_type',
action='store_const',
const=servers.REBOOT_HARD,
default=servers.REBOOT_SOFT,
help='Perform a hard reboot (instead of a soft one).')
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_reboot(cs, args):
"""Reboot a server."""
_find_server(cs, args.server).reboot(args.reboot_type)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('image', metavar='<image>', help="Name or ID of new image.")
def do_rebuild(cs, args):
"""Shutdown, re-image, and re-boot a server."""
server = _find_server(cs, args.server)
image = _find_image(cs, args.image)
server.rebuild(image)
@utils.arg('server', metavar='<server>', help='Name (old name) or ID of server.')
@utils.arg('name', metavar='<name>', help='New name for the server.')
def do_rename(cs, args):
"""Rename a server."""
_find_server(cs, args.server).update(name=args.name)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('flavor', metavar='<flavor>', help="Name or ID of new flavor.")
def do_resize(cs, args):
"""Resize a server."""
server = _find_server(cs, args.server)
flavor = _find_flavor(cs, args.flavor)
server.resize(flavor)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_resize_confirm(cs, args):
"""Confirm a previous resize."""
_find_server(cs, args.server).confirm_resize()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_resize_revert(cs, args):
"""Revert a previous resize (and return to the previous VM)."""
_find_server(cs, args.server).revert_resize()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_migrate(cs, args):
"""Migrate a server."""
_find_server(cs, args.server).migrate()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_pause(cs, args):
"""Pause a server."""
_find_server(cs, args.server).pause()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_unpause(cs, args):
"""Unpause a server."""
_find_server(cs, args.server).unpause()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_suspend(cs, args):
"""Suspend a server."""
_find_server(cs, args.server).suspend()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_resume(cs, args):
"""Resume a server."""
_find_server(cs, args.server).resume()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_rescue(cs, args):
"""Rescue a server."""
_find_server(cs, args.server).rescue()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_unrescue(cs, args):
"""Unrescue a server."""
_find_server(cs, args.server).unrescue()
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_diagnostics(cs, args):
"""Retrieve server diagnostics."""
utils.print_dict(cs.servers.diagnostics(args.server)[1])
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_actions(cs, args):
"""Retrieve server actions."""
utils.print_list(
cs.servers.actions(args.server),
["Created_At", "Action", "Error"])
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_root_password(cs, args):
"""
Change the root password for a server.
"""
server = _find_server(cs, args.server)
p1 = getpass.getpass('New password: ')
p2 = getpass.getpass('Again: ')
if p1 != p2:
raise exceptions.CommandError("Passwords do not match.")
server.change_password(p1)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('name', metavar='<name>', help='Name of snapshot.')
def do_image_create(cs, args):
"""Create a new image by taking a snapshot of a running server."""
server = _find_server(cs, args.server)
cs.servers.create_image(server, args.name)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_show(cs, args):
"""Show details about the given server."""
s = _find_server(cs, args.server)
networks = s.networks
info = s._info.copy()
for network_label, address_list in networks.items():
info['%s network' % network_label] = ', '.join(address_list)
flavor = info.get('flavor', {})
flavor_id = flavor.get('id', '')
info['flavor'] = _find_flavor(cs, flavor_id).name
image = info.get('image', {})
image_id = image.get('id', '')
info['image'] = _find_image(cs, image_id).name
info.pop('links', None)
info.pop('addresses', None)
utils.print_dict(info)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
def do_delete(cs, args):
"""Immediately shut down and delete a server."""
_find_server(cs, args.server).delete()
def _find_server(cs, server):
"""Get a server by name or ID."""
return _find_resource(cs.servers, server)
def _find_image(cs, image):
"""Get an image by name or ID."""
return _find_resource(cs.images, image)
def _find_flavor(cs, flavor):
"""Get a flavor by name, ID, or RAM size."""
try:
return _find_resource(cs.flavors, flavor)
except exceptions.NotFound:
return cs.flavors.find(ram=flavor)
def _find_resource(manager, name_or_id):
"""Helper for the _find_* methods."""
try:
if isinstance(name_or_id, int) or name_or_id.isdigit():
return manager.get(int(name_or_id))
try:
uuid.UUID(name_or_id)
return manager.get(name_or_id)
except ValueError:
return manager.find(name=name_or_id)
except exceptions.NotFound:
raise exceptions.CommandError("No %s with a name or ID of '%s' exists." %
(manager.resource_class.__name__.lower(), name_or_id))
# --zone_username is required since --username is already used.
@utils.arg('zone', metavar='<zone_id>', help='ID of the zone', default=None)
@utils.arg('--api_url', dest='api_url', default=None, help='New URL.')
@utils.arg('--zone_username', dest='zone_username', default=None,
help='New zone username.')
@utils.arg('--password', dest='password', default=None, help='New password.')
@utils.arg('--weight_offset', dest='weight_offset', default=None,
help='Child Zone weight offset.')
@utils.arg('--weight_scale', dest='weight_scale', default=None,
help='Child Zone weight scale.')
def do_zone(cs, args):
"""Show or edit a child zone. No zone arg for this zone."""
zone = cs.zones.get(args.zone)
# If we have some flags, update the zone
zone_delta = {}
if args.api_url:
zone_delta['api_url'] = args.api_url
if args.zone_username:
zone_delta['username'] = args.zone_username
if args.password:
zone_delta['password'] = args.password
if args.weight_offset:
zone_delta['weight_offset'] = args.weight_offset
if args.weight_scale:
zone_delta['weight_scale'] = args.weight_scale
if zone_delta:
zone.update(**zone_delta)
else:
utils.print_dict(zone._info)
def do_zone_info(cs, args):
"""Get this zones name and capabilities."""
zone = cs.zones.info()
utils.print_dict(zone._info)
@utils.arg('api_url', metavar='<api_url>', help="URL for the Zone's API")
@utils.arg('zone_username', metavar='<zone_username>',
help='Authentication username.')
@utils.arg('password', metavar='<password>', help='Authentication password.')
@utils.arg('weight_offset', metavar='<weight_offset>',
help='Child Zone weight offset (typically 0.0).')
@utils.arg('weight_scale', metavar='<weight_scale>',
help='Child Zone weight scale (typically 1.0).')
def do_zone_add(cs, args):
"""Add a new child zone."""
zone = cs.zones.create(args.api_url, args.zone_username,
args.password, args.weight_offset,
args.weight_scale)
utils.print_dict(zone._info)
@utils.arg('zone', metavar='<zone>', help='Name or ID of the zone')
def do_zone_delete(cs, args):
"""Delete a zone."""
cs.zones.delete(args.zone)
def do_zone_list(cs, args):
"""List the children of a zone."""
utils.print_list(cs.zones.list(), ['ID', 'Name', 'Is Active', \
'API URL', 'Weight Offset', 'Weight Scale'])
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('network_id', metavar='<network_id>', help='Network ID.')
def do_add_fixed_ip(cs, args):
"""Add new IP address to network."""
server = _find_server(cs, args.server)
server.add_fixed_ip(args.network_id)
@utils.arg('server', metavar='<server>', help='Name or ID of server.')
@utils.arg('address', metavar='<address>', help='IP Address.')
def do_remove_fixed_ip(cs, args):
"""Remove an IP address from a server."""
server = _find_server(cs, args.server)
server.remove_fixed_ip(args.address)

195
novaclient/v1_1/zones.py Normal file
View File

@ -0,0 +1,195 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Zone interface.
"""
from novaclient.v1_1 import base
class Weighting(base.Resource):
def __init__(self, manager, info):
self.name = "n/a"
super(Weighting, self).__init__(manager, info)
def __repr__(self):
return "<Weighting: %s>" % self.name
def to_dict(self):
"""Return the original info setting, which is a dict."""
return self._info
class Zone(base.Resource):
def __init__(self, manager, info):
self.name = "n/a"
self.is_active = "n/a"
self.capabilities = "n/a"
super(Zone, self).__init__(manager, info)
def __repr__(self):
return "<Zone: %s>" % self.api_url
def delete(self):
"""
Delete a child zone.
"""
self.manager.delete(self)
def update(self, api_url=None, username=None, password=None,
weight_offset=None, weight_scale=None):
"""
Update the name for this child zone.
:param api_url: Update the child zone's API URL.
:param username: Update the child zone's username.
:param password: Update the child zone's password.
:param weight_offset: Update the child zone's weight offset.
:param weight_scale: Update the child zone's weight scale.
"""
self.manager.update(self, api_url, username, password,
weight_offset, weight_scale)
class ZoneManager(base.BootingManagerWithFind):
resource_class = Zone
def info(self):
"""
Get info on this zone.
:rtype: :class:`Zone`
"""
return self._get("/zones/info", "zone")
def get(self, zone):
"""
Get a child zone.
:param server: ID of the :class:`Zone` to get.
:rtype: :class:`Zone`
"""
return self._get("/zones/%s" % base.getid(zone), "zone")
def list(self, detailed=True):
"""
Get a list of child zones.
:rtype: list of :class:`Zone`
"""
detail = ""
if detailed:
detail = "/detail"
return self._list("/zones%s" % detail, "zones")
def create(self, api_url, username, password,
weight_offset=0.0, weight_scale=1.0):
"""
Create a new child zone.
:param api_url: The child zone's API URL.
:param username: The child zone's username.
:param password: The child zone's password.
:param weight_offset: The child zone's weight offset.
:param weight_scale: The child zone's weight scale.
"""
body = {"zone": {
"api_url": api_url,
"username": username,
"password": password,
"weight_offset": weight_offset,
"weight_scale": weight_scale
}}
return self._create("/zones", body, "zone")
def boot(self, name, image, flavor, meta=None, files=None,
zone_blob=None, reservation_id=None, min_count=None,
max_count=None):
"""
Create (boot) a new server while being aware of Zones.
:param name: Something to name the server.
:param image: The :class:`Image` to boot with.
:param flavor: The :class:`Flavor` to boot onto.
:param meta: A dict of arbitrary key/value metadata to store for this
server. A maximum of five entries is allowed, and both
keys and values must be 255 characters or less.
:param files: A dict of files to overrwrite on the server upon boot.
Keys are file names (i.e. ``/etc/passwd``) and values
are the file contents (either as a string or as a
file-like object). A maximum of five entries is allowed,
and each file must be 10k or less.
:param zone_blob: a single (encrypted) string which is used internally
by Nova for routing between Zones. Users cannot populate
this field.
:param reservation_id: a UUID for the set of servers being requested.
:param min_count: minimum number of servers to create.
:param max_count: maximum number of servers to create.
"""
if not min_count:
min_count = 1
if not max_count:
max_count = min_count
return self._boot("/zones/boot", "reservation_id", name, image, flavor,
meta=meta, files=files,
zone_blob=zone_blob, reservation_id=reservation_id,
return_raw=True, min_count=min_count,
max_count=max_count)
def select(self, *args, **kwargs):
"""
Given requirements for a new instance, select hosts
in this zone that best match those requirements.
"""
# 'specs' may be passed in as None, so change to an empty string.
specs = kwargs.get("specs") or ""
url = "/zones/select"
weighting_list = self._list(url, "weights", Weighting, body=specs)
return [wt.to_dict() for wt in weighting_list]
def delete(self, zone):
"""
Delete a child zone.
"""
self._delete("/zones/%s" % base.getid(zone))
def update(self, zone, api_url=None, username=None, password=None,
weight_offset=None, weight_scale=None):
"""
Update the name or the api_url for a zone.
:param zone: The :class:`Zone` (or its ID) to update.
:param api_url: Update the API URL.
:param username: Update the username.
:param password: Update the password.
:param weight_offset: Update the child zone's weight offset.
:param weight_scale: Update the child zone's weight scale.
"""
body = {"zone": {}}
if api_url:
body["zone"]["api_url"] = api_url
if username:
body["zone"]["username"] = username
if password:
body["zone"]["password"] = password
if weight_offset:
body["zone"]["weight_offset"] = weight_offset
if weight_scale:
body["zone"]["weight_scale"] = weight_scale
if not len(body["zone"]):
return
self._update("/zones/%s" % base.getid(zone), body)

View File

@ -11,16 +11,16 @@ if sys.version_info < (2, 6):
requirements.append('simplejson')
setup(
name="python-novaclient",
version="2.5.9",
description="Client library for OpenStack Nova API",
long_description=read('README.rst'),
url='https://github.com/rackspace/python-novaclient',
license='Apache',
author='Rackspace, based on work by Jacob Kaplan-Moss',
author_email='github@racklabs.com',
packages=find_packages(exclude=['tests']),
classifiers=[
name = "python-novaclient",
version = "2.6.0",
description = "Client library for OpenStack Nova API",
long_description = read('README.rst'),
url = 'https://github.com/rackspace/python-novaclient',
license = 'Apache',
author = 'Rackspace, based on work by Jacob Kaplan-Moss',
author_email = 'github@racklabs.com',
packages = find_packages(exclude=['tests']),
classifiers = [
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Intended Audience :: Developers',
@ -29,12 +29,12 @@ setup(
'Operating System :: OS Independent',
'Programming Language :: Python',
],
install_requires=requirements,
install_requires = requirements,
tests_require=["nose", "mock"],
test_suite="nose.collector",
tests_require = ["nose", "mock"],
test_suite = "nose.collector",
entry_points={
entry_points = {
'console_scripts': ['nova = novaclient.shell:main']
}
)

73
tests/fakes.py Normal file
View File

@ -0,0 +1,73 @@
"""
A fake server that "responds" to API methods with pre-canned responses.
All of these responses come from the spec, so if for some reason the spec's
wrong the tests might raise AssertionError. I've indicated in comments the places where actual
behavior differs from the spec.
"""
import novaclient.client
def assert_has_keys(dict, required=[], optional=[]):
keys = dict.keys()
for k in required:
try:
assert k in keys
except AssertionError:
allowed_keys = set(required) | set(optional)
extra_keys = set(keys).difference(set(required + optional))
raise AssertionError("found unexpected keys: %s" % list(extra_keys))
class FakeClient(object):
def assert_called(self, method, url, body=None):
"""
Assert than an API method was just called.
"""
expected = (method, url)
called = self.client.callstack[-1][0:2]
assert self.client.callstack, \
"Expected %s %s but no calls were made." % expected
assert expected == called, 'Expected %s %s; got %s %s' % \
(expected + called)
if body is not None:
assert self.client.callstack[-1][2] == body
self.client.callstack = []
def assert_called_anytime(self, method, url, body=None):
"""
Assert than an API method was called anytime in the test.
"""
expected = (method, url)
assert self.client.callstack, \
"Expected %s %s but no calls were made." % expected
found = False
for entry in self.client.callstack:
called = entry[0:2]
if expected == entry[0:2]:
found = True
break
assert found, 'Expected %s %s; got %s' % \
(expected, self.client.callstack)
if body is not None:
try:
assert entry[2] == body
except AssertionError:
print entry[2]
print "!="
print body
raise
self.client.callstack = []
def authenticate(self):
pass

View File

@ -1,23 +0,0 @@
import StringIO
from nose.tools import assert_equal
from fakeserver import FakeServer
from novaclient import Account
cs = FakeServer()
def test_instance_creation_for_account():
s = cs.accounts.create_instance_for(
account_id='test_account',
name="My server",
image=1,
flavor=1,
meta={'foo': 'bar'},
ipgroup=1,
files={
'/etc/passwd': 'some data', # a file
'/tmp/foo.txt': StringIO.StringIO('data') # a stream
})
cs.assert_called('POST', '/accounts/test_account/create_instance')

View File

@ -1,69 +0,0 @@
import mock
import novaclient
import httplib2
from nose.tools import assert_raises, assert_equal
def test_authenticate_success():
cs = novaclient.OpenStack("username", "apikey", "project_id")
auth_response = httplib2.Response({
'status': 204,
'x-server-management-url':
'https://servers.api.rackspacecloud.com/v1.0/443470',
'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1',
})
mock_request = mock.Mock(return_value=(auth_response, None))
@mock.patch.object(httplib2.Http, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
mock_request.assert_called_with(cs.client.auth_url, 'GET',
headers={
'X-Auth-User': 'username',
'X-Auth-Key': 'apikey',
'X-Auth-Project-Id': 'project_id',
'User-Agent': cs.client.USER_AGENT
})
assert_equal(cs.client.management_url,
auth_response['x-server-management-url'])
assert_equal(cs.client.auth_token, auth_response['x-auth-token'])
test_auth_call()
def test_authenticate_failure():
cs = novaclient.OpenStack("username", "apikey", "project_id")
auth_response = httplib2.Response({'status': 401})
mock_request = mock.Mock(return_value=(auth_response, None))
@mock.patch.object(httplib2.Http, "request", mock_request)
def test_auth_call():
assert_raises(novaclient.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_auth_automatic():
client = novaclient.OpenStack("username", "apikey", "project_id").client
client.management_url = ''
mock_request = mock.Mock(return_value=(None, None))
@mock.patch.object(client, 'request', mock_request)
@mock.patch.object(client, 'authenticate')
def test_auth_call(m):
client.get('/')
m.assert_called()
mock_request.assert_called()
test_auth_call()
def test_auth_manual():
cs = novaclient.OpenStack("username", "apikey", "project_id")
@mock.patch.object(cs.client, 'authenticate')
def test_auth_call(m):
cs.authenticate()
m.assert_called()
test_auth_call()

View File

@ -1,58 +0,0 @@
from novaclient.backup_schedules import *
from fakeserver import FakeServer
from utils import assert_isinstance
cs = FakeServer()
def test_get_backup_schedule():
s = cs.servers.get(1234)
# access via manager
b = cs.backup_schedules.get(server=s)
assert_isinstance(b, BackupSchedule)
cs.assert_called('GET', '/servers/1234/backup_schedule')
b = cs.backup_schedules.get(server=1234)
assert_isinstance(b, BackupSchedule)
cs.assert_called('GET', '/servers/1234/backup_schedule')
# access via instance
assert_isinstance(s.backup_schedule, BackupSchedule)
cs.assert_called('GET', '/servers/1234/backup_schedule')
# Just for coverage's sake
b = s.backup_schedule.get()
cs.assert_called('GET', '/servers/1234/backup_schedule')
def test_create_update_backup_schedule():
s = cs.servers.get(1234)
# create/update via manager
cs.backup_schedules.update(
server=s,
enabled=True,
weekly=BACKUP_WEEKLY_THURSDAY,
daily=BACKUP_DAILY_H_1000_1200
)
cs.assert_called('POST', '/servers/1234/backup_schedule')
# and via instance
s.backup_schedule.update(enabled=False)
cs.assert_called('POST', '/servers/1234/backup_schedule')
def test_delete_backup_schedule():
s = cs.servers.get(1234)
# delete via manager
cs.backup_schedules.delete(s)
cs.assert_called('DELETE', '/servers/1234/backup_schedule')
cs.backup_schedules.delete(1234)
cs.assert_called('DELETE', '/servers/1234/backup_schedule')
# and via instance
s.backup_schedule.delete()
cs.assert_called('DELETE', '/servers/1234/backup_schedule')

View File

@ -1,59 +1,60 @@
import mock
import novaclient.base
from novaclient import Flavor
from novaclient.exceptions import NotFound
from novaclient.base import Resource
from nose.tools import assert_equal, assert_not_equal, assert_raises
from fakeserver import FakeServer
cs = FakeServer()
from novaclient import base
from novaclient import exceptions
from novaclient.v1_0 import flavors
from tests.v1_0 import fakes
from tests import utils
def test_resource_repr():
r = Resource(None, dict(foo="bar", baz="spam"))
assert_equal(repr(r), "<Resource baz=spam, foo=bar>")
cs = fakes.FakeClient()
def test_getid():
assert_equal(novaclient.base.getid(4), 4)
class BaseTest(utils.TestCase):
class O(object):
id = 4
assert_equal(novaclient.base.getid(O), 4)
def test_resource_repr(self):
r = base.Resource(None, dict(foo="bar", baz="spam"))
self.assertEqual(repr(r), "<Resource baz=spam, foo=bar>")
def test_getid(self):
self.assertEqual(base.getid(4), 4)
def test_resource_lazy_getattr():
f = Flavor(cs.flavors, {'id': 1})
assert_equal(f.name, '256 MB Server')
cs.assert_called('GET', '/flavors/1')
class TmpObject(object):
id = 4
self.assertEqual(base.getid(TmpObject), 4)
# Missing stuff still fails after a second get
assert_raises(AttributeError, getattr, f, 'blahblah')
cs.assert_called('GET', '/flavors/1')
def test_resource_lazy_getattr(self):
f = flavors.Flavor(cs.flavors, {'id': 1})
self.assertEqual(f.name, '256 MB Server')
cs.assert_called('GET', '/flavors/1')
# Missing stuff still fails after a second get
self.assertRaises(AttributeError, getattr, f, 'blahblah')
cs.assert_called('GET', '/flavors/1')
def test_eq():
# Two resources of the same type with the same id: equal
r1 = Resource(None, {'id': 1, 'name': 'hi'})
r2 = Resource(None, {'id': 1, 'name': 'hello'})
assert_equal(r1, r2)
def test_eq(self):
# Two resources of the same type with the same id: equal
r1 = base.Resource(None, {'id': 1, 'name': 'hi'})
r2 = base.Resource(None, {'id': 1, 'name': 'hello'})
self.assertEqual(r1, r2)
# Two resoruces of different types: never equal
r1 = Resource(None, {'id': 1})
r2 = Flavor(None, {'id': 1})
assert_not_equal(r1, r2)
# Two resoruces of different types: never equal
r1 = base.Resource(None, {'id': 1})
r2 = flavors.Flavor(None, {'id': 1})
self.assertNotEqual(r1, r2)
# Two resources with no ID: equal if their info is equal
r1 = Resource(None, {'name': 'joe', 'age': 12})
r2 = Resource(None, {'name': 'joe', 'age': 12})
assert_equal(r1, r2)
# Two resources with no ID: equal if their info is equal
r1 = base.Resource(None, {'name': 'joe', 'age': 12})
r2 = base.Resource(None, {'name': 'joe', 'age': 12})
self.assertEqual(r1, r2)
def test_findall_invalid_attribute(self):
# Make sure findall with an invalid attribute doesn't cause errors.
# The following should not raise an exception.
cs.flavors.findall(vegetable='carrot')
def test_findall_invalid_attribute():
# Make sure findall with an invalid attribute doesn't cause errors.
# The following should not raise an exception.
cs.flavors.findall(vegetable='carrot')
# However, find() should raise an error
assert_raises(NotFound, cs.flavors.find, vegetable='carrot')
# However, find() should raise an error
self.assertRaises(exceptions.NotFound,
cs.flavors.find,
vegetable='carrot')

View File

@ -1,51 +0,0 @@
import mock
import httplib2
from novaclient.client import OpenStackClient
from nose.tools import assert_equal
fake_response = httplib2.Response({"status": 200})
fake_body = '{"hi": "there"}'
mock_request = mock.Mock(return_value=(fake_response, fake_body))
def client():
cl = OpenStackClient("username", "apikey", "project_id", "auth_test")
cl.management_url = "http://example.com"
cl.auth_token = "token"
return cl
def test_get():
cl = client()
@mock.patch.object(httplib2.Http, "request", mock_request)
@mock.patch('time.time', mock.Mock(return_value=1234))
def test_get_call():
resp, body = cl.get("/hi")
mock_request.assert_called_with("http://example.com/hi?fresh=1234",
"GET",
headers={"X-Auth-Token": "token",
"X-Auth-Project-Id": "project_id",
"User-Agent": cl.USER_AGENT})
# Automatic JSON parsing
assert_equal(body, {"hi": "there"})
test_get_call()
def test_post():
cl = client()
@mock.patch.object(httplib2.Http, "request", mock_request)
def test_post_call():
cl.post("/hi", body=[1, 2, 3])
mock_request.assert_called_with("http://example.com/hi", "POST",
headers={
"X-Auth-Token": "token",
"X-Auth-Project-Id": "project_id",
"Content-Type": "application/json",
"User-Agent": cl.USER_AGENT},
body='[1, 2, 3]'
)
test_post_call()

View File

@ -1,37 +0,0 @@
from novaclient import Flavor, NotFound
from fakeserver import FakeServer
from utils import assert_isinstance
from nose.tools import assert_raises, assert_equal
cs = FakeServer()
def test_list_flavors():
fl = cs.flavors.list()
cs.assert_called('GET', '/flavors/detail')
[assert_isinstance(f, Flavor) for f in fl]
def test_list_flavors_undetailed():
fl = cs.flavors.list(detailed=False)
cs.assert_called('GET', '/flavors')
[assert_isinstance(f, Flavor) for f in fl]
def test_get_flavor_details():
f = cs.flavors.get(1)
cs.assert_called('GET', '/flavors/1')
assert_isinstance(f, Flavor)
assert_equal(f.ram, 256)
assert_equal(f.disk, 10)
def test_find():
f = cs.flavors.find(ram=256)
cs.assert_called('GET', '/flavors/detail')
assert_equal(f.name, '256 MB Server')
f = cs.flavors.find(disk=20)
assert_equal(f.name, '512 MB Server')
assert_raises(NotFound, cs.flavors.find, disk=12345)

58
tests/test_http.py Normal file
View File

@ -0,0 +1,58 @@
import httplib2
import mock
from novaclient import client
from tests import utils
fake_response = httplib2.Response({"status": 200})
fake_body = '{"hi": "there"}'
mock_request = mock.Mock(return_value=(fake_response, fake_body))
def get_client():
cl = client.HTTPClient("username", "apikey",
"project_id", "auth_test")
cl.management_url = "http://example.com"
cl.auth_token = "token"
return cl
class ClientTest(utils.TestCase):
def test_get(self):
cl = get_client()
@mock.patch.object(httplib2.Http, "request", mock_request)
@mock.patch('time.time', mock.Mock(return_value=1234))
def test_get_call():
resp, body = cl.get("/hi")
headers={"X-Auth-Token": "token",
"X-Auth-Project-Id": "project_id",
"User-Agent": cl.USER_AGENT,
}
mock_request.assert_called_with("http://example.com/hi?fresh=1234",
"GET", headers=headers)
# Automatic JSON parsing
self.assertEqual(body, {"hi": "there"})
test_get_call()
def test_post(self):
cl = get_client()
@mock.patch.object(httplib2.Http, "request", mock_request)
def test_post_call():
cl.post("/hi", body=[1, 2, 3])
headers={
"X-Auth-Token": "token",
"X-Auth-Project-Id": "project_id",
"Content-Type": "application/json",
"User-Agent": cl.USER_AGENT
}
mock_request.assert_called_with("http://example.com/hi", "POST",
headers=headers, body='[1, 2, 3]')
test_post_call()

View File

@ -1,47 +0,0 @@
from novaclient import Image
from fakeserver import FakeServer
from utils import assert_isinstance
from nose.tools import assert_equal
cs = FakeServer()
def test_list_images():
il = cs.images.list()
cs.assert_called('GET', '/images/detail')
[assert_isinstance(i, Image) for i in il]
def test_list_images_undetailed():
il = cs.images.list(detailed=False)
cs.assert_called('GET', '/images')
[assert_isinstance(i, Image) for i in il]
def test_get_image_details():
i = cs.images.get(1)
cs.assert_called('GET', '/images/1')
assert_isinstance(i, Image)
assert_equal(i.id, 1)
assert_equal(i.name, 'CentOS 5.2')
def test_create_image():
i = cs.images.create(server=1234, name="Just in case")
cs.assert_called('POST', '/images')
assert_isinstance(i, Image)
def test_delete_image():
cs.images.delete(1)
cs.assert_called('DELETE', '/images/1')
def test_find():
i = cs.images.find(name="CentOS 5.2")
assert_equal(i.id, 1)
cs.assert_called('GET', '/images/detail')
iml = cs.images.findall(status='SAVING')
assert_equal(len(iml), 1)
assert_equal(iml[0].name, 'My Server Backup')

View File

@ -1,48 +0,0 @@
from novaclient import IPGroup
from fakeserver import FakeServer
from utils import assert_isinstance
from nose.tools import assert_equal
cs = FakeServer()
def test_list_ipgroups():
ipl = cs.ipgroups.list()
cs.assert_called('GET', '/shared_ip_groups/detail')
[assert_isinstance(ipg, IPGroup) for ipg in ipl]
def test_list_ipgroups_undetailed():
ipl = cs.ipgroups.list(detailed=False)
cs.assert_called('GET', '/shared_ip_groups')
[assert_isinstance(ipg, IPGroup) for ipg in ipl]
def test_get_ipgroup():
ipg = cs.ipgroups.get(1)
cs.assert_called('GET', '/shared_ip_groups/1')
assert_isinstance(ipg, IPGroup)
def test_create_ipgroup():
ipg = cs.ipgroups.create("My group", 1234)
cs.assert_called('POST', '/shared_ip_groups')
assert_isinstance(ipg, IPGroup)
def test_delete_ipgroup():
ipg = cs.ipgroups.get(1)
ipg.delete()
cs.assert_called('DELETE', '/shared_ip_groups/1')
cs.ipgroups.delete(ipg)
cs.assert_called('DELETE', '/shared_ip_groups/1')
cs.ipgroups.delete(1)
cs.assert_called('DELETE', '/shared_ip_groups/1')
def test_find():
ipg = cs.ipgroups.find(name='group1')
cs.assert_called('GET', '/shared_ip_groups/detail')
assert_equal(ipg.name, 'group1')
ipgl = cs.ipgroups.findall(id=1)
assert_equal(ipgl, [IPGroup(None, {'id': 1})])

View File

@ -1,182 +0,0 @@
import StringIO
from nose.tools import assert_equal
from fakeserver import FakeServer
from utils import assert_isinstance
from novaclient import Server
cs = FakeServer()
def test_list_servers():
sl = cs.servers.list()
cs.assert_called('GET', '/servers/detail')
[assert_isinstance(s, Server) for s in sl]
def test_list_servers_undetailed():
sl = cs.servers.list(detailed=False)
cs.assert_called('GET', '/servers')
[assert_isinstance(s, Server) for s in sl]
def test_get_server_details():
s = cs.servers.get(1234)
cs.assert_called('GET', '/servers/1234')
assert_isinstance(s, Server)
assert_equal(s.id, 1234)
assert_equal(s.status, 'BUILD')
def test_create_server():
s = cs.servers.create(
name="My server",
image=1,
flavor=1,
meta={'foo': 'bar'},
ipgroup=1,
files={
'/etc/passwd': 'some data', # a file
'/tmp/foo.txt': StringIO.StringIO('data') # a stream
}
)
cs.assert_called('POST', '/servers')
assert_isinstance(s, Server)
def test_update_server():
s = cs.servers.get(1234)
# Update via instance
s.update(name='hi')
cs.assert_called('PUT', '/servers/1234')
s.update(name='hi', password='there')
cs.assert_called('PUT', '/servers/1234')
# Silly, but not an error
s.update()
# Update via manager
cs.servers.update(s, name='hi')
cs.assert_called('PUT', '/servers/1234')
cs.servers.update(1234, password='there')
cs.assert_called('PUT', '/servers/1234')
cs.servers.update(s, name='hi', password='there')
cs.assert_called('PUT', '/servers/1234')
def test_delete_server():
s = cs.servers.get(1234)
s.delete()
cs.assert_called('DELETE', '/servers/1234')
cs.servers.delete(1234)
cs.assert_called('DELETE', '/servers/1234')
cs.servers.delete(s)
cs.assert_called('DELETE', '/servers/1234')
def test_find():
s = cs.servers.find(name='sample-server')
cs.assert_called('GET', '/servers/detail')
assert_equal(s.name, 'sample-server')
# Find with multiple results arbitraility returns the first item
s = cs.servers.find(flavorId=1)
sl = cs.servers.findall(flavorId=1)
assert_equal(sl[0], s)
assert_equal([s.id for s in sl], [1234, 5678])
def test_share_ip():
s = cs.servers.get(1234)
# Share via instance
s.share_ip(ipgroup=1, address='1.2.3.4')
cs.assert_called('PUT', '/servers/1234/ips/public/1.2.3.4')
# Share via manager
cs.servers.share_ip(s, ipgroup=1, address='1.2.3.4', configure=False)
cs.assert_called('PUT', '/servers/1234/ips/public/1.2.3.4')
def test_unshare_ip():
s = cs.servers.get(1234)
# Unshare via instance
s.unshare_ip('1.2.3.4')
cs.assert_called('DELETE', '/servers/1234/ips/public/1.2.3.4')
# Unshare via manager
cs.servers.unshare_ip(s, '1.2.3.4')
cs.assert_called('DELETE', '/servers/1234/ips/public/1.2.3.4')
def test_reboot_server():
s = cs.servers.get(1234)
s.reboot()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.reboot(s, type='HARD')
cs.assert_called('POST', '/servers/1234/action')
def test_rebuild_server():
s = cs.servers.get(1234)
s.rebuild(image=1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.rebuild(s, image=1)
cs.assert_called('POST', '/servers/1234/action')
def test_resize_server():
s = cs.servers.get(1234)
s.resize(flavor=1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.resize(s, flavor=1)
cs.assert_called('POST', '/servers/1234/action')
def test_confirm_resized_server():
s = cs.servers.get(1234)
s.confirm_resize()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.confirm_resize(s)
cs.assert_called('POST', '/servers/1234/action')
def test_revert_resized_server():
s = cs.servers.get(1234)
s.revert_resize()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.revert_resize(s)
cs.assert_called('POST', '/servers/1234/action')
def test_backup_server():
s = cs.servers.get(1234)
s.backup("ImageName", "daily", 10)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.backup(s, "ImageName", "daily", 10)
cs.assert_called('POST', '/servers/1234/action')
def test_migrate_server():
s = cs.servers.get(1234)
s.migrate()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.migrate(s)
cs.assert_called('POST', '/servers/1234/action')
def test_add_fixed_ip():
s = cs.servers.get(1234)
s.add_fixed_ip(1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.add_fixed_ip(s, 1)
cs.assert_called('POST', '/servers/1234/action')
def test_remove_fixed_ip():
s = cs.servers.get(1234)
s.remove_fixed_ip('10.0.0.1')
cs.assert_called('POST', '/servers/1234/action')
cs.servers.remove_fixed_ip(s, '10.0.0.1')
cs.assert_called('POST', '/servers/1234/action')

View File

@ -1,372 +1,39 @@
import os
import mock
import httplib2
from nose.tools import assert_raises, assert_equal
from novaclient.shell import OpenStackShell, CommandError
from fakeserver import FakeServer
from utils import assert_in
from novaclient.shell import OpenStackComputeShell
from novaclient import exceptions
from tests import utils
# Patch os.environ to avoid required auth info.
def setup():
global _old_env
fake_env = {
'NOVA_USERNAME': 'username',
'NOVA_API_KEY': 'password',
'NOVA_PROJECT_ID': 'project_id'
}
_old_env, os.environ = os.environ, fake_env.copy()
class ShellTest(utils.TestCase):
# Make a fake shell object, a helping wrapper to call it, and a quick way
# of asserting that certain API calls were made.
global shell, _shell, assert_called, assert_called_anytime
_shell = OpenStackShell()
_shell._api_class = FakeServer
assert_called = lambda m, u, b=None: _shell.cs.assert_called(m, u, b)
assert_called_anytime = lambda m, u, b=None: \
_shell.cs.assert_called_anytime(m, u, b)
shell = lambda cmd: _shell.main(cmd.split())
def teardown():
global _old_env
os.environ = _old_env
def test_backup_schedule():
shell('backup-schedule 1234')
assert_called('GET', '/servers/1234/backup_schedule')
shell('backup-schedule sample-server --weekly monday')
assert_called(
'POST', '/servers/1234/backup_schedule',
{'backupSchedule': {'enabled': True, 'daily': 'DISABLED',
'weekly': 'MONDAY'}}
)
shell('backup-schedule sample-server '
'--weekly disabled --daily h_0000_0200')
assert_called(
'POST', '/servers/1234/backup_schedule',
{'backupSchedule': {'enabled': True, 'daily': 'H_0000_0200',
'weekly': 'DISABLED'}}
)
shell('backup-schedule sample-server --disable')
assert_called(
'POST', '/servers/1234/backup_schedule',
{'backupSchedule': {'enabled': False, 'daily': 'DISABLED',
'weekly': 'DISABLED'}}
)
def test_backup_schedule_delete():
shell('backup-schedule-delete 1234')
assert_called('DELETE', '/servers/1234/backup_schedule')
def test_boot():
shell('boot --image 1 some-server')
assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1}}
)
shell('boot --image 1 --meta foo=bar --meta spam=eggs some-server ')
assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1,
'metadata': {'foo': 'bar', 'spam': 'eggs'}}}
)
def test_boot_files():
testfile = os.path.join(os.path.dirname(__file__), 'testfile.txt')
expected_file_data = open(testfile).read().encode('base64')
shell('boot some-server --image 1 --file /tmp/foo=%s --file /tmp/bar=%s' %
(testfile, testfile))
assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1,
'personality': [
{'path': '/tmp/bar', 'contents': expected_file_data},
{'path': '/tmp/foo', 'contents': expected_file_data}
]}
# Patch os.environ to avoid required auth info.
def setUp(self):
global _old_env
fake_env = {
'NOVA_USERNAME': 'username',
'NOVA_API_KEY': 'password',
'NOVA_PROJECT_ID': 'project_id',
}
)
_old_env, os.environ = os.environ, fake_env.copy()
# Make a fake shell object, a helping wrapper to call it, and a quick way
# of asserting that certain API calls were made.
global shell, _shell, assert_called, assert_called_anytime
_shell = OpenStackComputeShell()
shell = lambda cmd: _shell.main(cmd.split())
def test_boot_invalid_file():
invalid_file = os.path.join(os.path.dirname(__file__), 'asdfasdfasdfasdf')
assert_raises(CommandError, shell, 'boot some-server --image 1 '
'--file /foo=%s' % invalid_file)
def tearDown(self):
global _old_env
os.environ = _old_env
def test_help_unknown_command(self):
self.assertRaises(exceptions.CommandError, shell, 'help foofoo')
def test_boot_key_auto():
mock_exists = mock.Mock(return_value=True)
mock_open = mock.Mock()
mock_open.return_value = mock.Mock()
mock_open.return_value.read = mock.Mock(return_value='SSHKEY')
@mock.patch('os.path.exists', mock_exists)
@mock.patch('__builtin__.open', mock_open)
def test_shell_call():
shell('boot some-server --image 1 --key')
assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1,
'personality': [{
'path': '/root/.ssh/authorized_keys2',
'contents': ('SSHKEY').encode('base64')},
]}
}
)
test_shell_call()
def test_boot_key_auto_no_keys():
mock_exists = mock.Mock(return_value=False)
@mock.patch('os.path.exists', mock_exists)
def test_shell_call():
assert_raises(CommandError, shell, 'boot some-server --image 1 --key')
test_shell_call()
def test_boot_key_file():
testfile = os.path.join(os.path.dirname(__file__), 'testfile.txt')
expected_file_data = open(testfile).read().encode('base64')
shell('boot some-server --image 1 --key %s' % testfile)
assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1,
'personality': [
{'path': '/root/.ssh/authorized_keys2', 'contents':
expected_file_data},
]}
}
)
def test_boot_invalid_keyfile():
invalid_file = os.path.join(os.path.dirname(__file__), 'asdfasdfasdfasdf')
assert_raises(CommandError, shell, 'boot some-server '
'--image 1 --key %s' % invalid_file)
def test_boot_ipgroup():
shell('boot --image 1 --ipgroup 1 some-server')
assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'sharedIpGroupId': 1, 'min_count': 1, 'max_count': 1}}
)
def test_boot_ipgroup_name():
shell('boot --image 1 --ipgroup group1 some-server')
assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'sharedIpGroupId': 1, 'min_count': 1, 'max_count': 1}}
)
def test_flavor_list():
shell('flavor-list')
assert_called_anytime('GET', '/flavors/detail')
def test_image_list():
shell('image-list')
assert_called('GET', '/images/detail')
def test_snapshot_create():
shell('image-create sample-server mysnapshot')
assert_called(
'POST', '/images',
{'image': {'name': 'mysnapshot', 'serverId': 1234}}
)
def test_image_delete():
shell('image-delete 1')
assert_called('DELETE', '/images/1')
def test_ip_share():
shell('ip-share sample-server 1 1.2.3.4')
assert_called(
'PUT', '/servers/1234/ips/public/1.2.3.4',
{'shareIp': {'sharedIpGroupId': 1, 'configureServer': True}}
)
def test_ip_unshare():
shell('ip-unshare sample-server 1.2.3.4')
assert_called('DELETE', '/servers/1234/ips/public/1.2.3.4')
def test_ipgroup_list():
shell('ipgroup-list')
assert_in(('GET', '/shared_ip_groups/detail', None),
_shell.cs.client.callstack)
assert_called('GET', '/servers/5678')
def test_ipgroup_show():
shell('ipgroup-show 1')
assert_called('GET', '/shared_ip_groups/1')
shell('ipgroup-show group2')
# does a search, not a direct GET
assert_called('GET', '/shared_ip_groups/detail')
def test_ipgroup_create():
shell('ipgroup-create a-group')
assert_called(
'POST', '/shared_ip_groups',
{'sharedIpGroup': {'name': 'a-group'}}
)
shell('ipgroup-create a-group sample-server')
assert_called(
'POST', '/shared_ip_groups',
{'sharedIpGroup': {'name': 'a-group', 'server': 1234}}
)
def test_ipgroup_delete():
shell('ipgroup-delete group1')
assert_called('DELETE', '/shared_ip_groups/1')
def test_list():
shell('list')
assert_called('GET', '/servers/detail')
def test_reboot():
shell('reboot sample-server')
assert_called('POST', '/servers/1234/action', {'reboot': {'type': 'SOFT'}})
shell('reboot sample-server --hard')
assert_called('POST', '/servers/1234/action', {'reboot': {'type': 'HARD'}})
def test_rebuild():
shell('rebuild sample-server 1')
assert_called('POST', '/servers/1234/action', {'rebuild': {'imageId': 1}})
def test_rename():
shell('rename sample-server newname')
assert_called('PUT', '/servers/1234', {'server': {'name': 'newname'}})
def test_resize():
shell('resize sample-server 1')
assert_called('POST', '/servers/1234/action', {'resize': {'flavorId': 1}})
def test_resize_confirm():
shell('resize-confirm sample-server')
assert_called('POST', '/servers/1234/action', {'confirmResize': None})
def test_resize_revert():
shell('resize-revert sample-server')
assert_called('POST', '/servers/1234/action', {'revertResize': None})
def test_backup():
shell('backup sample-server mybackup daily 1')
assert_called(
'POST', '/servers/1234/action',
{'createBackup': {'name': 'mybackup', 'backup_type': 'daily',
'rotation': 1}}
)
@mock.patch('getpass.getpass', mock.Mock(return_value='p'))
def test_root_password():
shell('root-password sample-server')
assert_called('PUT', '/servers/1234', {'server': {'adminPass': 'p'}})
def test_show():
shell('show 1234')
# XXX need a way to test multiple calls
# assert_called('GET', '/servers/1234')
assert_called('GET', '/images/2')
def test_delete():
shell('delete 1234')
assert_called('DELETE', '/servers/1234')
shell('delete sample-server')
assert_called('DELETE', '/servers/1234')
def test_zone():
shell('zone 1')
assert_called('GET', '/zones/1')
shell('zone 1 --api_url=http://zzz --zone_username=frank --password=xxx')
assert_called(
'PUT', '/zones/1',
{'zone': {'api_url': 'http://zzz', 'username': 'frank',
'password': 'xxx'}}
)
def test_zone_add():
shell('zone-add http://zzz frank xxx 0.0 1.0')
assert_called(
'POST', '/zones',
{'zone': {'api_url': 'http://zzz', 'username': 'frank',
'password': 'xxx',
'weight_offset': '0.0', 'weight_scale': '1.0'}}
)
def test_zone_delete():
shell('zone-delete 1')
assert_called('DELETE', '/zones/1')
def test_zone_list():
shell('zone-list')
assert_in(('GET', '/zones/detail', None),
_shell.cs.client.callstack)
def test_help():
@mock.patch.object(_shell.parser, 'print_help')
def test_help(m):
shell('help')
m.assert_called()
@mock.patch.object(_shell.subcommands['delete'], 'print_help')
def test_help_delete(m):
shell('help delete')
m.assert_called()
test_help()
test_help_delete()
assert_raises(CommandError, shell, 'help foofoo')
def test_debug():
httplib2.debuglevel = 0
shell('--debug list')
assert httplib2.debuglevel == 1
def test_debug(self):
httplib2.debuglevel = 0
shell('--debug help')
assert httplib2.debuglevel == 1

View File

@ -1,78 +0,0 @@
import StringIO
from nose.tools import assert_equal
from fakeserver import FakeServer
from utils import assert_isinstance
from novaclient import Zone
os = FakeServer()
def test_list_zones():
sl = os.zones.list()
os.assert_called('GET', '/zones/detail')
[assert_isinstance(s, Zone) for s in sl]
def test_list_zones_undetailed():
sl = os.zones.list(detailed=False)
os.assert_called('GET', '/zones')
[assert_isinstance(s, Zone) for s in sl]
def test_get_zone_details():
s = os.zones.get(1)
os.assert_called('GET', '/zones/1')
assert_isinstance(s, Zone)
assert_equal(s.id, 1)
assert_equal(s.api_url, 'http://foo.com')
def test_create_zone():
s = os.zones.create(api_url="http://foo.com", username='bob',
password='xxx')
os.assert_called('POST', '/zones')
assert_isinstance(s, Zone)
def test_update_zone():
s = os.zones.get(1)
# Update via instance
s.update(api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
s.update(api_url='http://blah.com', username='alice', password='xxx')
os.assert_called('PUT', '/zones/1')
# Silly, but not an error
s.update()
# Update via manager
os.zones.update(s, api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
os.zones.update(1, api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
os.zones.update(s, api_url='http://blah.com', username='fred',
password='zip')
os.assert_called('PUT', '/zones/1')
def test_delete_zone():
s = os.zones.get(1)
s.delete()
os.assert_called('DELETE', '/zones/1')
os.zones.delete(1)
os.assert_called('DELETE', '/zones/1')
os.zones.delete(s)
os.assert_called('DELETE', '/zones/1')
def test_find_zone():
s = os.zones.find(password='qwerty')
os.assert_called('GET', '/zones/detail')
assert_equal(s.username, 'bob')
# Find with multiple results returns the first item
s = os.zones.find(api_url='http://foo.com')
sl = os.zones.findall(api_url='http://foo.com')
assert_equal(sl[0], s)
assert_equal([s.id for s in sl], [1, 2])

View File

@ -1 +0,0 @@
OH HAI!

View File

@ -1,29 +1,5 @@
from nose.tools import ok_
import unittest
def fail(msg):
raise AssertionError(msg)
def assert_in(thing, seq, msg=None):
msg = msg or "'%s' not found in %s" % (thing, seq)
ok_(thing in seq, msg)
def assert_not_in(thing, seq, msg=None):
msg = msg or "unexpected '%s' found in %s" % (thing, seq)
ok_(thing not in seq, msg)
def assert_has_keys(dict, required=[], optional=[]):
keys = dict.keys()
for k in required:
assert_in(k, keys, "required key %s missing from %s" % (k, dict))
allowed_keys = set(required) | set(optional)
extra_keys = set(keys).difference(set(required + optional))
if extra_keys:
fail("found unexpected keys: %s" % list(extra_keys))
def assert_isinstance(thing, kls):
ok_(isinstance(thing, kls), "%s is not an instance of %s" % (thing, kls))
class TestCase(unittest.TestCase):
pass

0
tests/v1_0/__init__.py Normal file
View File

View File

@ -1,74 +1,22 @@
"""
A fake server that "responds" to API methods with pre-canned responses.
All of these responses come from the spec, so if for some reason the spec's
wrong the tests might fail. I've indicated in comments the places where actual
behavior differs from the spec.
"""
import httplib2
import urlparse
import urllib
from nose.tools import assert_equal
from novaclient import OpenStack
from novaclient.client import OpenStackClient
from utils import fail, assert_in, assert_not_in, assert_has_keys
import urlparse
from novaclient import client as base_client
from novaclient.v1_0 import client
from tests import fakes
class FakeServer(OpenStack):
def __init__(self, username=None, password=None, project_id=None,
auth_url=None):
super(FakeServer, self).__init__('username', 'apikey',
'project_id', 'auth_url')
self.client = FakeClient()
class FakeClient(fakes.FakeClient, client.Client):
def assert_called(self, method, url, body=None):
"""
Assert than an API method was just called.
"""
expected = (method, url)
called = self.client.callstack[-1][0:2]
assert self.client.callstack, \
"Expected %s %s but no calls were made." % expected
assert expected == called, 'Expected %s %s; got %s %s' % \
(expected + called)
if body is not None:
assert_equal(self.client.callstack[-1][2], body)
self.client.callstack = []
def assert_called_anytime(self, method, url, body=None):
"""
Assert than an API method was called anytime in the test.
"""
expected = (method, url)
assert self.client.callstack, \
"Expected %s %s but no calls were made." % expected
found = False
for entry in self.client.callstack:
called = entry[0:2]
if expected == entry[0:2]:
found = True
break
assert found, 'Expected %s %s; got %s' % \
(expected, self.client.callstack)
if body is not None:
assert_equal(entry[2], body)
self.client.callstack = []
def authenticate(self):
pass
def __init__(self, *args, **kwargs):
client.Client.__init__(self, 'username', 'apikey',
'project_id', 'auth_url')
self.client = FakeHTTPClient(**kwargs)
class FakeClient(OpenStackClient):
def __init__(self):
class FakeHTTPClient(base_client.HTTPClient):
def __init__(self, **kwargs):
self.username = 'username'
self.apikey = 'apikey'
self.auth_url = 'auth_url'
@ -77,15 +25,15 @@ class FakeClient(OpenStackClient):
def _cs_request(self, url, method, **kwargs):
# Check that certain things are called correctly
if method in ['GET', 'DELETE']:
assert_not_in('body', kwargs)
assert 'body' not in kwargs
elif method in ['PUT', 'POST']:
assert_in('body', kwargs)
assert 'body' in kwargs
# Call the method
munged_url = url.strip('/').replace('/', '_').replace('.', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
fail('Called unknown API method: %s %s' % (method, url))
raise AssertionError('Called unknown API method: %s %s' % (method, url))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
@ -202,14 +150,14 @@ class FakeClient(OpenStackClient):
]})
def post_servers(self, body, **kw):
assert_equal(body.keys(), ['server'])
assert_has_keys(body['server'],
required=['name', 'imageId', 'flavorId'],
optional=['sharedIpGroupId', 'metadata',
'personality', 'min_count', 'max_count'])
assert body.keys() == ['server']
fakes.assert_has_keys(body['server'],
required=['name', 'imageId', 'flavorId'],
optional=['sharedIpGroupId', 'metadata',
'personality', 'min_count', 'max_count'])
if 'personality' in body['server']:
for pfile in body['server']['personality']:
assert_has_keys(pfile, required=['path', 'contents'])
fakes.assert_has_keys(pfile, required=['path', 'contents'])
return (202, self.get_servers_1234()[1])
def get_servers_1234(self, **kw):
@ -221,8 +169,8 @@ class FakeClient(OpenStackClient):
return (200, r)
def put_servers_1234(self, body, **kw):
assert_equal(body.keys(), ['server'])
assert_has_keys(body['server'], optional=['name', 'adminPass'])
assert body.keys() == ['server']
fakes.assert_has_keys(body['server'], optional=['name', 'adminPass'])
return (204, None)
def delete_servers_1234(self, **kw):
@ -245,8 +193,8 @@ class FakeClient(OpenStackClient):
self.get_servers_1234_ips()[1]['addresses']['private']})
def put_servers_1234_ips_public_1_2_3_4(self, body, **kw):
assert_equal(body.keys(), ['shareIp'])
assert_has_keys(body['shareIp'], required=['sharedIpGroupId',
assert body.keys() == ['shareIp']
fakes.assert_has_keys(body['shareIp'], required=['sharedIpGroupId',
'configureServer'])
return (202, None)
@ -258,32 +206,32 @@ class FakeClient(OpenStackClient):
#
def post_servers_1234_action(self, body, **kw):
assert_equal(len(body.keys()), 1)
assert len(body.keys()) == 1
action = body.keys()[0]
if action == 'reboot':
assert_equal(body[action].keys(), ['type'])
assert_in(body[action]['type'], ['HARD', 'SOFT'])
assert body[action].keys() == ['type']
assert body[action]['type'] in ['HARD', 'SOFT']
elif action == 'rebuild':
assert_equal(body[action].keys(), ['imageId'])
assert body[action].keys() == ['imageId']
elif action == 'resize':
assert_equal(body[action].keys(), ['flavorId'])
assert body[action].keys() == ['flavorId']
elif action == 'createBackup':
assert_equal(set(body[action].keys()),
set(['name', 'rotation', 'backup_type']))
assert set(body[action].keys()) == \
set(['name', 'rotation', 'backup_type'])
elif action == 'confirmResize':
assert_equal(body[action], None)
assert body[action] is None
# This one method returns a different response code
return (204, None)
elif action == 'revertResize':
assert_equal(body[action], None)
assert body[action] is None
elif action == 'migrate':
assert_equal(body[action], None)
assert body[action] is None
elif action == 'addFixedIp':
assert_equal(body[action].keys(), ['networkId'])
assert body[action].keys() == ['networkId']
elif action == 'removeFixedIp':
assert_equal(body[action].keys(), ['address'])
assert body[action].keys() == ['address']
else:
fail("Unexpected server action: %s" % action)
raise AssertionError("Unexpected server action: %s" % action)
return (202, None)
#
@ -344,8 +292,8 @@ class FakeClient(OpenStackClient):
return (200, {'image': self.get_images_detail()[1]['images'][1]})
def post_images(self, body, **kw):
assert_equal(body.keys(), ['image'])
assert_has_keys(body['image'], required=['serverId', 'name'])
assert body.keys() == ['image']
fakes.assert_has_keys(body['image'], required=['serverId', 'name'])
return (202, self.get_images_1()[1])
def delete_images_1(self, **kw):
@ -362,8 +310,8 @@ class FakeClient(OpenStackClient):
}})
def post_servers_1234_backup_schedule(self, body, **kw):
assert_equal(body.keys(), ['backupSchedule'])
assert_has_keys(body['backupSchedule'], required=['enabled'],
assert body.keys() == ['backupSchedule']
fakes.assert_has_keys(body['backupSchedule'], required=['enabled'],
optional=['weekly', 'daily'])
return (204, None)
@ -390,8 +338,8 @@ class FakeClient(OpenStackClient):
self.get_shared_ip_groups_detail()[1]['sharedIpGroups'][0]})
def post_shared_ip_groups(self, body, **kw):
assert_equal(body.keys(), ['sharedIpGroup'])
assert_has_keys(body['sharedIpGroup'], required=['name'],
assert body.keys() == ['sharedIpGroup']
fakes.assert_has_keys(body['sharedIpGroup'], required=['name'],
optional=['server'])
return (201, {'sharedIpGroup': {
'id': 10101,
@ -429,16 +377,16 @@ class FakeClient(OpenStackClient):
return (200, r)
def post_zones(self, body, **kw):
assert_equal(body.keys(), ['zone'])
assert_has_keys(body['zone'],
assert body.keys() == ['zone']
fakes.assert_has_keys(body['zone'],
required=['api_url', 'username', 'password'],
optional=['weight_offset', 'weight_scale'])
return (202, self.get_zones_1()[1])
def put_zones_1(self, body, **kw):
assert_equal(body.keys(), ['zone'])
assert_has_keys(body['zone'], optional=['api_url', 'username',
assert body.keys() == ['zone']
fakes.assert_has_keys(body['zone'], optional=['api_url', 'username',
'password',
'weight_offset',
'weight_scale'])
@ -451,12 +399,14 @@ class FakeClient(OpenStackClient):
# Accounts
#
def post_accounts_test_account_create_instance(self, body, **kw):
assert_equal(body.keys(), ['server'])
assert_has_keys(body['server'],
assert body.keys() == ['server']
fakes.assert_has_keys(body['server'],
required=['name', 'imageId', 'flavorId'],
optional=['sharedIpGroupId', 'metadata',
'personality', 'min_count', 'max_count'])
if 'personality' in body['server']:
for pfile in body['server']['personality']:
assert_has_keys(pfile, required=['path', 'contents'])
fakes.assert_has_keys(pfile, required=['path', 'contents'])
return (202, self.get_servers_1234()[1])

View File

@ -0,0 +1,25 @@
import StringIO
from tests.v1_0 import fakes
from tests import utils
cs = fakes.FakeClient()
class AccountsTest(utils.TestCase):
def test_instance_creation_for_account(self):
s = cs.accounts.create_instance_for(
account_id='test_account',
name="My server",
image=1,
flavor=1,
meta={'foo': 'bar'},
ipgroup=1,
files={
'/etc/passwd': 'some data', # a file
'/tmp/foo.txt': StringIO.StringIO('data') # a stream
})
cs.assert_called('POST', '/accounts/test_account/create_instance')

74
tests/v1_0/test_auth.py Normal file
View File

@ -0,0 +1,74 @@
import httplib2
import mock
from novaclient.v1_0 import client
from novaclient import exceptions
from tests import utils
class AuthenticationTests(utils.TestCase):
def test_authenticate_success(self):
cs = client.Client("username", "apikey", "project_id")
management_url = 'https://servers.api.rackspacecloud.com/v1.0/443470'
auth_response = httplib2.Response({
'status': 204,
'x-server-management-url': management_url,
'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1',
})
mock_request = mock.Mock(return_value=(auth_response, None))
@mock.patch.object(httplib2.Http, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers={
'X-Auth-User': 'username',
'X-Auth-Key': 'apikey',
'X-Auth-Project-Id': 'project_id',
'User-Agent': cs.client.USER_AGENT
}
mock_request.assert_called_with(cs.client.auth_url, 'GET',
headers=headers)
self.assertEqual(cs.client.management_url,
auth_response['x-server-management-url'])
self.assertEqual(cs.client.auth_token,
auth_response['x-auth-token'])
test_auth_call()
def test_authenticate_failure(self):
cs = client.Client("username", "apikey", "project_id")
auth_response = httplib2.Response({'status': 401})
mock_request = mock.Mock(return_value=(auth_response, None))
@mock.patch.object(httplib2.Http, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_auth_automatic(self):
cs = client.Client("username", "apikey", "project_id")
http_client = cs.client
http_client.management_url = ''
mock_request = mock.Mock(return_value=(None, None))
@mock.patch.object(http_client, 'request', mock_request)
@mock.patch.object(http_client, 'authenticate')
def test_auth_call(m):
http_client.get('/')
m.assert_called()
mock_request.assert_called()
test_auth_call()
def test_auth_manual(self):
cs = client.Client("username", "apikey", "project_id")
@mock.patch.object(cs.client, 'authenticate')
def test_auth_call(m):
cs.authenticate()
m.assert_called()
test_auth_call()

View File

@ -0,0 +1,60 @@
from novaclient.v1_0 import backup_schedules
from tests.v1_0 import fakes
from tests import utils
cs = fakes.FakeClient()
class BackupSchedulesTest(utils.TestCase):
def test_get_backup_schedule(self):
s = cs.servers.get(1234)
# access via manager
b = cs.backup_schedules.get(server=s)
self.assertTrue(isinstance(b, backup_schedules.BackupSchedule))
cs.assert_called('GET', '/servers/1234/backup_schedule')
b = cs.backup_schedules.get(server=1234)
self.assertTrue(isinstance(b, backup_schedules.BackupSchedule))
cs.assert_called('GET', '/servers/1234/backup_schedule')
# access via instance
self.assertTrue(isinstance(s.backup_schedule,
backup_schedules.BackupSchedule))
cs.assert_called('GET', '/servers/1234/backup_schedule')
# Just for coverage's sake
b = s.backup_schedule.get()
cs.assert_called('GET', '/servers/1234/backup_schedule')
def test_create_update_backup_schedule(self):
s = cs.servers.get(1234)
# create/update via manager
cs.backup_schedules.update(
server=s,
enabled=True,
weekly=backup_schedules.BACKUP_WEEKLY_THURSDAY,
daily=backup_schedules.BACKUP_DAILY_H_1000_1200
)
cs.assert_called('POST', '/servers/1234/backup_schedule')
# and via instance
s.backup_schedule.update(enabled=False)
cs.assert_called('POST', '/servers/1234/backup_schedule')
def test_delete_backup_schedule(self):
s = cs.servers.get(1234)
# delete via manager
cs.backup_schedules.delete(s)
cs.assert_called('DELETE', '/servers/1234/backup_schedule')
cs.backup_schedules.delete(1234)
cs.assert_called('DELETE', '/servers/1234/backup_schedule')
# and via instance
s.backup_schedule.delete()
cs.assert_called('DELETE', '/servers/1234/backup_schedule')

View File

@ -0,0 +1,38 @@
from novaclient import exceptions
from novaclient.v1_0 import flavors
from tests.v1_0 import fakes
from tests import utils
cs = fakes.FakeClient()
class FlavorsTest(utils.TestCase):
def test_list_flavors(self):
fl = cs.flavors.list()
cs.assert_called('GET', '/flavors/detail')
[self.assertTrue(isinstance(f, flavors.Flavor)) for f in fl]
def test_list_flavors_undetailed(self):
fl = cs.flavors.list(detailed=False)
cs.assert_called('GET', '/flavors')
[self.assertTrue(isinstance(f, flavors.Flavor)) for f in fl]
def test_get_flavor_details(self):
f = cs.flavors.get(1)
cs.assert_called('GET', '/flavors/1')
self.assertTrue(isinstance(f, flavors.Flavor))
self.assertEqual(f.ram, 256)
self.assertEqual(f.disk, 10)
def test_find(self):
f = cs.flavors.find(ram=256)
cs.assert_called('GET', '/flavors/detail')
self.assertEqual(f.name, '256 MB Server')
f = cs.flavors.find(disk=20)
self.assertEqual(f.name, '512 MB Server')
self.assertRaises(exceptions.NotFound, cs.flavors.find, disk=12345)

45
tests/v1_0/test_images.py Normal file
View File

@ -0,0 +1,45 @@
from novaclient.v1_0 import images
from tests.v1_0 import fakes
from tests import utils
cs = fakes.FakeClient()
class ImagesTest(utils.TestCase):
def test_list_images(self):
il = cs.images.list()
cs.assert_called('GET', '/images/detail')
[self.assertTrue(isinstance(i, images.Image)) for i in il]
def test_list_images_undetailed(self):
il = cs.images.list(detailed=False)
cs.assert_called('GET', '/images')
[self.assertTrue(isinstance(i, images.Image)) for i in il]
def test_get_image_details(self):
i = cs.images.get(1)
cs.assert_called('GET', '/images/1')
self.assertTrue(isinstance(i, images.Image))
self.assertEqual(i.id, 1)
self.assertEqual(i.name, 'CentOS 5.2')
def test_create_image(self):
i = cs.images.create(server=1234, name="Just in case")
cs.assert_called('POST', '/images')
self.assertTrue(isinstance(i, images.Image))
def test_delete_image(self):
cs.images.delete(1)
cs.assert_called('DELETE', '/images/1')
def test_find(self):
i = cs.images.find(name="CentOS 5.2")
self.assertEqual(i.id, 1)
cs.assert_called('GET', '/images/detail')
iml = cs.images.findall(status='SAVING')
self.assertEqual(len(iml), 1)
self.assertEqual(iml[0].name, 'My Server Backup')

View File

@ -0,0 +1,48 @@
from novaclient.v1_0 import ipgroups
from tests.v1_0 import fakes
from tests import utils
cs = fakes.FakeClient()
class IPGroupTest(utils.TestCase):
def test_list_ipgroups(self):
ipl = cs.ipgroups.list()
cs.assert_called('GET', '/shared_ip_groups/detail')
[self.assertTrue(isinstance(ipg, ipgroups.IPGroup)) \
for ipg in ipl]
def test_list_ipgroups_undetailed(self):
ipl = cs.ipgroups.list(detailed=False)
cs.assert_called('GET', '/shared_ip_groups')
[self.assertTrue(isinstance(ipg, ipgroups.IPGroup)) \
for ipg in ipl]
def test_get_ipgroup(self):
ipg = cs.ipgroups.get(1)
cs.assert_called('GET', '/shared_ip_groups/1')
self.assertTrue(isinstance(ipg, ipgroups.IPGroup))
def test_create_ipgroup(self):
ipg = cs.ipgroups.create("My group", 1234)
cs.assert_called('POST', '/shared_ip_groups')
self.assertTrue(isinstance(ipg, ipgroups.IPGroup))
def test_delete_ipgroup(self):
ipg = cs.ipgroups.get(1)
ipg.delete()
cs.assert_called('DELETE', '/shared_ip_groups/1')
cs.ipgroups.delete(ipg)
cs.assert_called('DELETE', '/shared_ip_groups/1')
cs.ipgroups.delete(1)
cs.assert_called('DELETE', '/shared_ip_groups/1')
def test_find(self):
ipg = cs.ipgroups.find(name='group1')
cs.assert_called('GET', '/shared_ip_groups/detail')
self.assertEqual(ipg.name, 'group1')
ipgl = cs.ipgroups.findall(id=1)
self.assertEqual(ipgl, [ipgroups.IPGroup(None, {'id': 1})])

169
tests/v1_0/test_servers.py Normal file
View File

@ -0,0 +1,169 @@
import StringIO
from novaclient.v1_0 import servers
from tests.v1_0 import fakes
from tests import utils
cs = fakes.FakeClient()
class ServersTest(utils.TestCase):
def test_list_servers(self):
sl = cs.servers.list()
cs.assert_called('GET', '/servers/detail')
[self.assertTrue(isinstance(s, servers.Server)) for s in sl]
def test_list_servers_undetailed(self):
sl = cs.servers.list(detailed=False)
cs.assert_called('GET', '/servers')
[self.assertTrue(isinstance(s, servers.Server)) for s in sl]
def test_get_server_details(self):
s = cs.servers.get(1234)
cs.assert_called('GET', '/servers/1234')
self.assertTrue(isinstance(s, servers.Server))
self.assertEqual(s.id, 1234)
self.assertEqual(s.status, 'BUILD')
def test_create_server(self):
s = cs.servers.create(
name="My server",
image=1,
flavor=1,
meta={'foo': 'bar'},
ipgroup=1,
files={
'/etc/passwd': 'some data', # a file
'/tmp/foo.txt': StringIO.StringIO('data') # a stream
}
)
cs.assert_called('POST', '/servers')
self.assertTrue(isinstance(s, servers.Server))
def test_update_server(self):
s = cs.servers.get(1234)
# Update via instance
s.update(name='hi')
cs.assert_called('PUT', '/servers/1234')
s.update(name='hi', password='there')
cs.assert_called('PUT', '/servers/1234')
# Silly, but not an error
s.update()
# Update via manager
cs.servers.update(s, name='hi')
cs.assert_called('PUT', '/servers/1234')
cs.servers.update(1234, password='there')
cs.assert_called('PUT', '/servers/1234')
cs.servers.update(s, name='hi', password='there')
cs.assert_called('PUT', '/servers/1234')
def test_delete_server(self):
s = cs.servers.get(1234)
s.delete()
cs.assert_called('DELETE', '/servers/1234')
cs.servers.delete(1234)
cs.assert_called('DELETE', '/servers/1234')
cs.servers.delete(s)
cs.assert_called('DELETE', '/servers/1234')
def test_find(self):
s = cs.servers.find(name='sample-server')
cs.assert_called('GET', '/servers/detail')
self.assertEqual(s.name, 'sample-server')
# Find with multiple results arbitraility returns the first item
s = cs.servers.find(flavorId=1)
sl = cs.servers.findall(flavorId=1)
self.assertEqual(sl[0], s)
self.assertEqual([s.id for s in sl], [1234, 5678])
def test_share_ip(self):
s = cs.servers.get(1234)
# Share via instance
s.share_ip(ipgroup=1, address='1.2.3.4')
cs.assert_called('PUT', '/servers/1234/ips/public/1.2.3.4')
# Share via manager
cs.servers.share_ip(s, ipgroup=1, address='1.2.3.4', configure=False)
cs.assert_called('PUT', '/servers/1234/ips/public/1.2.3.4')
def test_unshare_ip(self):
s = cs.servers.get(1234)
# Unshare via instance
s.unshare_ip('1.2.3.4')
cs.assert_called('DELETE', '/servers/1234/ips/public/1.2.3.4')
# Unshare via manager
cs.servers.unshare_ip(s, '1.2.3.4')
cs.assert_called('DELETE', '/servers/1234/ips/public/1.2.3.4')
def test_reboot_server(self):
s = cs.servers.get(1234)
s.reboot()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.reboot(s, type='HARD')
cs.assert_called('POST', '/servers/1234/action')
def test_rebuild_server(self):
s = cs.servers.get(1234)
s.rebuild(image=1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.rebuild(s, image=1)
cs.assert_called('POST', '/servers/1234/action')
def test_resize_server(self):
s = cs.servers.get(1234)
s.resize(flavor=1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.resize(s, flavor=1)
cs.assert_called('POST', '/servers/1234/action')
def test_confirm_resized_server(self):
s = cs.servers.get(1234)
s.confirm_resize()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.confirm_resize(s)
cs.assert_called('POST', '/servers/1234/action')
def test_revert_resized_server(self):
s = cs.servers.get(1234)
s.revert_resize()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.revert_resize(s)
cs.assert_called('POST', '/servers/1234/action')
def test_backup_server(self):
s = cs.servers.get(1234)
s.backup("ImageName", "daily", 10)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.backup(s, "ImageName", "daily", 10)
cs.assert_called('POST', '/servers/1234/action')
def test_migrate_server(self):
s = cs.servers.get(1234)
s.migrate()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.migrate(s)
cs.assert_called('POST', '/servers/1234/action')
def test_add_fixed_ip(self):
s = cs.servers.get(1234)
s.add_fixed_ip(1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.add_fixed_ip(s, 1)
cs.assert_called('POST', '/servers/1234/action')
def test_remove_fixed_ip(self):
s = cs.servers.get(1234)
s.remove_fixed_ip('10.0.0.1')
cs.assert_called('POST', '/servers/1234/action')
cs.servers.remove_fixed_ip(s, '10.0.0.1')
cs.assert_called('POST', '/servers/1234/action')

316
tests/v1_0/test_shell.py Normal file
View File

@ -0,0 +1,316 @@
import os
import mock
from novaclient.shell import OpenStackComputeShell
from novaclient import exceptions
from tests.v1_0 import fakes
from tests import utils
class ShellTest(utils.TestCase):
def setUp(self):
"""Run before each test."""
self.old_environment = os.environ.copy()
os.environ = {
'NOVA_USERNAME': 'username',
'NOVA_API_KEY': 'password',
'NOVA_PROJECT_ID': 'project_id',
'NOVA_VERSION': '1.0',
}
self.shell = OpenStackComputeShell()
self.shell.get_api_class = lambda *_: fakes.FakeClient
def tearDown(self):
os.environ = self.old_environment
def run_command(self, cmd):
self.shell.main(cmd.split())
def assert_called(self, method, url, body=None):
return self.shell.cs.assert_called(method, url, body)
def assert_called_anytime(self, method, url, body=None):
return self.shell.cs.assert_called_anytime(method, url, body)
def test_backup_schedule(self):
self.run_command('backup-schedule 1234')
self.assert_called('GET', '/servers/1234/backup_schedule')
self.run_command('backup-schedule sample-server --weekly monday')
self.assert_called(
'POST', '/servers/1234/backup_schedule',
{'backupSchedule': {'enabled': True, 'daily': 'DISABLED',
'weekly': 'MONDAY'}}
)
self.run_command('backup-schedule sample-server '
'--weekly disabled --daily h_0000_0200')
self.assert_called(
'POST', '/servers/1234/backup_schedule',
{'backupSchedule': {'enabled': True, 'daily': 'H_0000_0200',
'weekly': 'DISABLED'}}
)
self.run_command('backup-schedule sample-server --disable')
self.assert_called(
'POST', '/servers/1234/backup_schedule',
{'backupSchedule': {'enabled': False, 'daily': 'DISABLED',
'weekly': 'DISABLED'}}
)
def test_backup_schedule_delete(self):
self.run_command('backup-schedule-delete 1234')
self.assert_called('DELETE', '/servers/1234/backup_schedule')
def test_boot(self):
self.run_command('boot --image 1 some-server')
self.assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1}}
)
self.run_command('boot --image 1 --meta foo=bar --meta spam=eggs some-server ')
self.assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1,
'metadata': {'foo': 'bar', 'spam': 'eggs'}}}
)
def test_boot_files(self):
testfile = os.path.join(os.path.dirname(__file__), 'testfile.txt')
expected_file_data = open(testfile).read().encode('base64')
self.run_command('boot some-server --image 1 --file /tmp/foo=%s --file /tmp/bar=%s' %
(testfile, testfile))
self.assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1,
'personality': [
{'path': '/tmp/bar', 'contents': expected_file_data},
{'path': '/tmp/foo', 'contents': expected_file_data}
]}
}
)
def test_boot_invalid_file(self):
invalid_file = os.path.join(os.path.dirname(__file__), 'asdfasdfasdfasdf')
self.assertRaises(exceptions.CommandError, self.run_command, 'boot some-server --image 1 '
'--file /foo=%s' % invalid_file)
def test_boot_key_auto(self):
mock_exists = mock.Mock(return_value=True)
mock_open = mock.Mock()
mock_open.return_value = mock.Mock()
mock_open.return_value.read = mock.Mock(return_value='SSHKEY')
@mock.patch('os.path.exists', mock_exists)
@mock.patch('__builtin__.open', mock_open)
def test_shell_call():
self.run_command('boot some-server --image 1 --key')
self.assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1,
'personality': [{
'path': '/root/.ssh/authorized_keys2',
'contents': ('SSHKEY').encode('base64')},
]}
}
)
test_shell_call()
def test_boot_key_auto_no_keys(self):
mock_exists = mock.Mock(return_value=False)
@mock.patch('os.path.exists', mock_exists)
def test_shell_call():
self.assertRaises(exceptions.CommandError, self.run_command,
'boot some-server --image 1 --key')
test_shell_call()
def test_boot_key_file(self):
testfile = os.path.join(os.path.dirname(__file__), 'testfile.txt')
expected_file_data = open(testfile).read().encode('base64')
self.run_command('boot some-server --image 1 --key %s' % testfile)
self.assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'min_count': 1, 'max_count': 1,
'personality': [
{'path': '/root/.ssh/authorized_keys2', 'contents':
expected_file_data},
]}
}
)
def test_boot_invalid_keyfile(self):
invalid_file = os.path.join(os.path.dirname(__file__), 'asdfasdfasdfasdf')
self.assertRaises(exceptions.CommandError, self.run_command, 'boot some-server '
'--image 1 --key %s' % invalid_file)
def test_boot_ipgroup(self):
self.run_command('boot --image 1 --ipgroup 1 some-server')
self.assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'sharedIpGroupId': 1, 'min_count': 1, 'max_count': 1}}
)
def test_boot_ipgroup_name(self):
self.run_command('boot --image 1 --ipgroup group1 some-server')
self.assert_called(
'POST', '/servers',
{'server': {'flavorId': 1, 'name': 'some-server', 'imageId': '1',
'sharedIpGroupId': 1, 'min_count': 1, 'max_count': 1}}
)
def test_flavor_list(self):
self.run_command('flavor-list')
self.assert_called_anytime('GET', '/flavors/detail')
def test_image_list(self):
self.run_command('image-list')
self.assert_called('GET', '/images/detail')
def test_snapshot_create(self):
self.run_command('image-create sample-server mysnapshot')
self.assert_called(
'POST', '/images',
{'image': {'name': 'mysnapshot', 'serverId': 1234}}
)
def test_image_delete(self):
self.run_command('image-delete 1')
self.assert_called('DELETE', '/images/1')
def test_ip_share(self):
self.run_command('ip-share sample-server 1 1.2.3.4')
self.assert_called(
'PUT', '/servers/1234/ips/public/1.2.3.4',
{'shareIp': {'sharedIpGroupId': 1, 'configureServer': True}}
)
def test_ip_unshare(self):
self.run_command('ip-unshare sample-server 1.2.3.4')
self.assert_called('DELETE', '/servers/1234/ips/public/1.2.3.4')
def test_ipgroup_list(self):
self.run_command('ipgroup-list')
assert ('GET', '/shared_ip_groups/detail', None) in \
self.shell.cs.client.callstack
self.assert_called('GET', '/servers/5678')
def test_ipgroup_show(self):
self.run_command('ipgroup-show 1')
self.assert_called('GET', '/shared_ip_groups/1')
self.run_command('ipgroup-show group2')
# does a search, not a direct GET
self.assert_called('GET', '/shared_ip_groups/detail')
def test_ipgroup_create(self):
self.run_command('ipgroup-create a-group')
self.assert_called(
'POST', '/shared_ip_groups',
{'sharedIpGroup': {'name': 'a-group'}}
)
self.run_command('ipgroup-create a-group sample-server')
self.assert_called(
'POST', '/shared_ip_groups',
{'sharedIpGroup': {'name': 'a-group', 'server': 1234}}
)
def test_ipgroup_delete(self):
self.run_command('ipgroup-delete group1')
self.assert_called('DELETE', '/shared_ip_groups/1')
def test_list(self):
self.run_command('list')
self.assert_called('GET', '/servers/detail')
def test_reboot(self):
self.run_command('reboot sample-server')
self.assert_called('POST', '/servers/1234/action', {'reboot': {'type': 'SOFT'}})
self.run_command('reboot sample-server --hard')
self.assert_called('POST', '/servers/1234/action', {'reboot': {'type': 'HARD'}})
def test_rebuild(self):
self.run_command('rebuild sample-server 1')
self.assert_called('POST', '/servers/1234/action', {'rebuild': {'imageId': 1}})
def test_rename(self):
self.run_command('rename sample-server newname')
self.assert_called('PUT', '/servers/1234', {'server': {'name': 'newname'}})
def test_resize(self):
self.run_command('resize sample-server 1')
self.assert_called('POST', '/servers/1234/action', {'resize': {'flavorId': 1}})
def test_resize_confirm(self):
self.run_command('resize-confirm sample-server')
self.assert_called('POST', '/servers/1234/action', {'confirmResize': None})
def test_resize_revert(self):
self.run_command('resize-revert sample-server')
self.assert_called('POST', '/servers/1234/action', {'revertResize': None})
def test_backup(self):
self.run_command('backup sample-server mybackup daily 1')
self.assert_called(
'POST', '/servers/1234/action',
{'createBackup': {'name': 'mybackup', 'backup_type': 'daily',
'rotation': 1}}
)
@mock.patch('getpass.getpass', mock.Mock(return_value='p'))
def test_root_password(self):
self.run_command('root-password sample-server')
self.assert_called('PUT', '/servers/1234', {'server': {'adminPass': 'p'}})
def test_show(self):
self.run_command('show 1234')
# XXX need a way to test multiple calls
# self.assert_called('GET', '/servers/1234')
self.assert_called('GET', '/images/2')
def test_delete(self):
self.run_command('delete 1234')
self.assert_called('DELETE', '/servers/1234')
self.run_command('delete sample-server')
self.assert_called('DELETE', '/servers/1234')
def test_zone(self):
self.run_command('zone 1')
self.assert_called('GET', '/zones/1')
self.run_command('zone 1 --api_url=http://zzz --zone_username=frank --password=xxx')
self.assert_called(
'PUT', '/zones/1',
{'zone': {'api_url': 'http://zzz', 'username': 'frank',
'password': 'xxx'}}
)
def test_zone_add(self):
self.run_command('zone-add http://zzz frank xxx 0.0 1.0')
self.assert_called(
'POST', '/zones',
{'zone': {'api_url': 'http://zzz', 'username': 'frank',
'password': 'xxx',
'weight_offset': '0.0', 'weight_scale': '1.0'}}
)
def test_zone_delete(self):
self.run_command('zone-delete 1')
self.assert_called('DELETE', '/zones/1')
def test_zone_list(self):
self.run_command('zone-list')
assert ('GET', '/zones/detail', None) in self.shell.cs.client.callstack

76
tests/v1_0/test_zones.py Normal file
View File

@ -0,0 +1,76 @@
import StringIO
from novaclient.v1_0 import zones
from tests.v1_0 import fakes
from tests import utils
os = fakes.FakeClient()
class ZonesTest(utils.TestCase):
def test_list_zones(self):
sl = os.zones.list()
os.assert_called('GET', '/zones/detail')
[self.assertTrue(isinstance(s, zones.Zone)) for s in sl]
def test_list_zones_undetailed(self):
sl = os.zones.list(detailed=False)
os.assert_called('GET', '/zones')
[self.assertTrue(isinstance(s, zones.Zone)) for s in sl]
def test_get_zone_details(self):
s = os.zones.get(1)
os.assert_called('GET', '/zones/1')
self.assertTrue(isinstance(s, zones.Zone))
self.assertEqual(s.id, 1)
self.assertEqual(s.api_url, 'http://foo.com')
def test_create_zone(self):
s = os.zones.create(api_url="http://foo.com", username='bob',
password='xxx')
os.assert_called('POST', '/zones')
self.assertTrue(isinstance(s, zones.Zone))
def test_update_zone(self):
s = os.zones.get(1)
# Update via instance
s.update(api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
s.update(api_url='http://blah.com', username='alice', password='xxx')
os.assert_called('PUT', '/zones/1')
# Silly, but not an error
s.update()
# Update via manager
os.zones.update(s, api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
os.zones.update(1, api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
os.zones.update(s, api_url='http://blah.com', username='fred',
password='zip')
os.assert_called('PUT', '/zones/1')
def test_delete_zone(self):
s = os.zones.get(1)
s.delete()
os.assert_called('DELETE', '/zones/1')
os.zones.delete(1)
os.assert_called('DELETE', '/zones/1')
os.zones.delete(s)
os.assert_called('DELETE', '/zones/1')
def test_find_zone(self):
s = os.zones.find(password='qwerty')
os.assert_called('GET', '/zones/detail')
self.assertEqual(s.username, 'bob')
# Find with multiple results returns the first item
s = os.zones.find(api_url='http://foo.com')
sl = os.zones.findall(api_url='http://foo.com')
self.assertEqual(sl[0], s)
self.assertEqual([s.id for s in sl], [1, 2])

1
tests/v1_0/testfile.txt Normal file
View File

@ -0,0 +1 @@
BLAH

29
tests/v1_0/utils.py Normal file
View File

@ -0,0 +1,29 @@
from nose.tools import ok_
def fail(msg):
raise AssertionError(msg)
def assert_in(thing, seq, msg=None):
msg = msg or "'%s' not found in %s" % (thing, seq)
ok_(thing in seq, msg)
def assert_not_in(thing, seq, msg=None):
msg = msg or "unexpected '%s' found in %s" % (thing, seq)
ok_(thing not in seq, msg)
def assert_has_keys(dict, required=[], optional=[]):
keys = dict.keys()
for k in required:
assert_in(k, keys, "required key %s missing from %s" % (k, dict))
allowed_keys = set(required) | set(optional)
extra_keys = set(keys).difference(set(required + optional))
if extra_keys:
fail("found unexpected keys: %s" % list(extra_keys))
def assert_isinstance(thing, kls):
ok_(isinstance(thing, kls), "%s is not an instance of %s" % (thing, kls))

0
tests/v1_1/__init__.py Normal file
View File

373
tests/v1_1/fakes.py Normal file
View File

@ -0,0 +1,373 @@
import httplib2
import urllib
import urlparse
from novaclient import client as base_client
from novaclient.v1_1 import client
from tests import fakes
class FakeClient(fakes.FakeClient, client.Client):
def __init__(self, *args, **kwargs):
client.Client.__init__(self, 'username', 'apikey',
'project_id', 'auth_url')
self.client = FakeHTTPClient(**kwargs)
class FakeHTTPClient(base_client.HTTPClient):
def __init__(self, **kwargs):
self.username = 'username'
self.apikey = 'apikey'
self.auth_url = 'auth_url'
self.callstack = []
def _cs_request(self, url, method, **kwargs):
# Check that certain things are called correctly
if method in ['GET', 'DELETE']:
assert 'body' not in kwargs
elif method in ['PUT', 'POST']:
assert 'body' in kwargs
# Call the method
munged_url = url.strip('/').replace('/', '_').replace('.', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s' % (method, url))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
status, body = getattr(self, callback)(**kwargs)
return httplib2.Response({"status": status}), body
def _munge_get_url(self, url):
return url
#
# Limits
#
def get_limits(self, **kw):
return (200, {"limits": {
"rate": [
{
"verb": "POST",
"URI": "*",
"regex": ".*",
"value": 10,
"remaining": 2,
"unit": "MINUTE",
"resetTime": 1244425439
},
{
"verb": "POST",
"URI": "*/servers",
"regex": "^/servers",
"value": 50,
"remaining": 49,
"unit": "DAY", "resetTime": 1244511839
},
{
"verb": "PUT",
"URI": "*",
"regex": ".*",
"value": 10,
"remaining": 2,
"unit": "MINUTE",
"resetTime": 1244425439
},
{
"verb": "GET",
"URI": "*changes-since*",
"regex": "changes-since",
"value": 3,
"remaining": 3,
"unit": "MINUTE",
"resetTime": 1244425439
},
{
"verb": "DELETE",
"URI": "*",
"regex": ".*",
"value": 100,
"remaining": 100,
"unit": "MINUTE",
"resetTime": 1244425439
}
],
"absolute": {
"maxTotalRAMSize": 51200,
"maxIPGroups": 50,
"maxIPGroupMembers": 25
}
}})
#
# Servers
#
def get_servers(self, **kw):
return (200, {"servers": [
{'id': 1234, 'name': 'sample-server'},
{'id': 5678, 'name': 'sample-server2'}
]})
def get_servers_detail(self, **kw):
return (200, {"servers": [
{
"id": 1234,
"name": "sample-server",
"image": {
"id": 2,
"name": "sample image",
},
"flavor": {
"id": 1,
"name": "256 MB Server",
},
"hostId": "e4d909c290d0fb1ca068ffaddf22cbd0",
"status": "BUILD",
"progress": 60,
"addresses": {
"public": [{
"version": 4,
"addr": "1.2.3.4",
},
{
"version": 4,
"addr": "5.6.7.8",
}],
"private": [{
"version": 4,
"addr": "10.11.12.13",
}],
},
"metadata": {
"Server Label": "Web Head 1",
"Image Version": "2.1"
}
},
{
"id": 5678,
"name": "sample-server2",
"image": {
"id": 2,
"name": "sample image",
},
"flavor": {
"id": 1,
"name": "256 MB Server",
},
"hostId": "9e107d9d372bb6826bd81d3542a419d6",
"status": "ACTIVE",
"addresses": {
"public": [{
"version": 4,
"addr": "4.5.6.7",
},
{
"version": 4,
"addr": "5.6.9.8",
}],
"private": [{
"version": 4,
"addr": "10.13.12.13",
}],
},
"metadata": {
"Server Label": "DB 1"
}
}
]})
def post_servers(self, body, **kw):
assert body.keys() == ['server']
fakes.assert_has_keys(body['server'],
required=['name', 'imageRef', 'flavorRef'],
optional=['metadata', 'personality'])
if 'personality' in body['server']:
for pfile in body['server']['personality']:
fakes.assert_has_keys(pfile, required=['path', 'contents'])
return (202, self.get_servers_1234()[1])
def get_servers_1234(self, **kw):
r = {'server': self.get_servers_detail()[1]['servers'][0]}
return (200, r)
def get_servers_5678(self, **kw):
r = {'server': self.get_servers_detail()[1]['servers'][1]}
return (200, r)
def put_servers_1234(self, body, **kw):
assert body.keys() == ['server']
fakes.assert_has_keys(body['server'], optional=['name', 'adminPass'])
return (204, None)
def delete_servers_1234(self, **kw):
return (202, None)
#
# Server Addresses
#
def get_servers_1234_ips(self, **kw):
return (200, {'addresses':
self.get_servers_1234()[1]['server']['addresses']})
def get_servers_1234_ips_public(self, **kw):
return (200, {'public':
self.get_servers_1234_ips()[1]['addresses']['public']})
def get_servers_1234_ips_private(self, **kw):
return (200, {'private':
self.get_servers_1234_ips()[1]['addresses']['private']})
def delete_servers_1234_ips_public_1_2_3_4(self, **kw):
return (202, None)
#
# Server actions
#
def post_servers_1234_action(self, body, **kw):
assert len(body.keys()) == 1
action = body.keys()[0]
if action == 'reboot':
assert body[action].keys() == ['type']
assert body[action]['type'] in ['HARD', 'SOFT']
elif action == 'rebuild':
assert body[action].keys() == ['imageRef']
elif action == 'resize':
assert body[action].keys() == ['flavorRef']
elif action == 'confirmResize':
assert body[action] is None
# This one method returns a different response code
return (204, None)
elif action == 'revertResize':
assert body[action] is None
elif action == 'migrate':
assert body[action] is None
elif action == 'addFixedIp':
assert body[action].keys() == ['networkId']
elif action == 'removeFixedIp':
assert body[action].keys() == ['address']
elif action == 'createImage':
assert set(body[action].keys()) == set(['name', 'metadata'])
elif action == 'changePassword':
assert body[action].keys() == ['adminPass']
else:
raise AssertionError("Unexpected server action: %s" % action)
return (202, None)
#
# Flavors
#
def get_flavors(self, **kw):
return (200, {'flavors': [
{'id': 1, 'name': '256 MB Server'},
{'id': 2, 'name': '512 MB Server'}
]})
def get_flavors_detail(self, **kw):
return (200, {'flavors': [
{'id': 1, 'name': '256 MB Server', 'ram': 256, 'disk': 10},
{'id': 2, 'name': '512 MB Server', 'ram': 512, 'disk': 20}
]})
def get_flavors_1(self, **kw):
return (200, {'flavor': self.get_flavors_detail()[1]['flavors'][0]})
def get_flavors_2(self, **kw):
return (200, {'flavor': self.get_flavors_detail()[1]['flavors'][1]})
#
# Images
#
def get_images(self, **kw):
return (200, {'images': [
{'id': 1, 'name': 'CentOS 5.2'},
{'id': 2, 'name': 'My Server Backup'}
]})
def get_images_detail(self, **kw):
return (200, {'images': [
{
'id': 1,
'name': 'CentOS 5.2',
"updated": "2010-10-10T12:00:00Z",
"created": "2010-08-10T12:00:00Z",
"status": "ACTIVE"
},
{
"id": 743,
"name": "My Server Backup",
"serverId": 12,
"updated": "2010-10-10T12:00:00Z",
"created": "2010-08-10T12:00:00Z",
"status": "SAVING",
"progress": 80
}
]})
def get_images_1(self, **kw):
return (200, {'image': self.get_images_detail()[1]['images'][0]})
def get_images_2(self, **kw):
return (200, {'image': self.get_images_detail()[1]['images'][1]})
def post_images(self, body, **kw):
assert body.keys() == ['image']
fakes.assert_has_keys(body['image'], required=['serverId', 'name'])
return (202, self.get_images_1()[1])
def delete_images_1(self, **kw):
return (204, None)
#
# Zones
#
def get_zones(self, **kw):
return (200, {'zones': [
{'id': 1, 'api_url': 'http://foo.com', 'username': 'bob'},
{'id': 2, 'api_url': 'http://foo.com', 'username': 'alice'},
]})
def get_zones_detail(self, **kw):
return (200, {'zones': [
{'id': 1, 'api_url': 'http://foo.com', 'username': 'bob',
'password': 'qwerty'},
{'id': 2, 'api_url': 'http://foo.com', 'username': 'alice',
'password': 'password'}
]})
def get_zones_1(self, **kw):
r = {'zone': self.get_zones_detail()[1]['zones'][0]}
return (200, r)
def get_zones_2(self, **kw):
r = {'zone': self.get_zones_detail()[1]['zones'][1]}
return (200, r)
def post_zones(self, body, **kw):
assert body.keys() == ['zone']
fakes.assert_has_keys(body['zone'],
required=['api_url', 'username', 'password'],
optional=['weight_offset', 'weight_scale'])
return (202, self.get_zones_1()[1])
def put_zones_1(self, body, **kw):
assert body.keys() == ['zone']
fakes.assert_has_keys(body['zone'], optional=['api_url', 'username',
'password',
'weight_offset',
'weight_scale'])
return (204, None)
def delete_zones_1(self, **kw):
return (202, None)

74
tests/v1_1/test_auth.py Normal file
View File

@ -0,0 +1,74 @@
import httplib2
import mock
from novaclient.v1_1 import client
from novaclient import exceptions
from tests import utils
class AuthenticationTests(utils.TestCase):
def test_authenticate_success(self):
cs = client.Client("username", "apikey", "project_id", "auth_url")
management_url = 'https://servers.api.rackspacecloud.com/v1.1/443470'
auth_response = httplib2.Response({
'status': 204,
'x-server-management-url': management_url,
'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1',
})
mock_request = mock.Mock(return_value=(auth_response, None))
@mock.patch.object(httplib2.Http, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers={
'X-Auth-User': 'username',
'X-Auth-Key': 'apikey',
'X-Auth-Project-Id': 'project_id',
'User-Agent': cs.client.USER_AGENT
}
mock_request.assert_called_with(cs.client.auth_url, 'GET',
headers=headers)
self.assertEqual(cs.client.management_url,
auth_response['x-server-management-url'])
self.assertEqual(cs.client.auth_token,
auth_response['x-auth-token'])
test_auth_call()
def test_authenticate_failure(self):
cs = client.Client("username", "apikey", "project_id", "auth_url")
auth_response = httplib2.Response({'status': 401})
mock_request = mock.Mock(return_value=(auth_response, None))
@mock.patch.object(httplib2.Http, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_auth_automatic(self):
cs = client.Client("username", "apikey", "project_id", "auth_url")
http_client = cs.client
http_client.management_url = ''
mock_request = mock.Mock(return_value=(None, None))
@mock.patch.object(http_client, 'request', mock_request)
@mock.patch.object(http_client, 'authenticate')
def test_auth_call(m):
http_client.get('/')
m.assert_called()
mock_request.assert_called()
test_auth_call()
def test_auth_manual(self):
cs = client.Client("username", "apikey", "project_id", "auth_url")
@mock.patch.object(cs.client, 'authenticate')
def test_auth_call(m):
cs.authenticate()
m.assert_called()
test_auth_call()

View File

@ -0,0 +1,38 @@
from novaclient import exceptions
from novaclient.v1_1 import flavors
from tests.v1_1 import fakes
from tests import utils
cs = fakes.FakeClient()
class FlavorsTest(utils.TestCase):
def test_list_flavors(self):
fl = cs.flavors.list()
cs.assert_called('GET', '/flavors/detail')
[self.assertTrue(isinstance(f, flavors.Flavor)) for f in fl]
def test_list_flavors_undetailed(self):
fl = cs.flavors.list(detailed=False)
cs.assert_called('GET', '/flavors')
[self.assertTrue(isinstance(f, flavors.Flavor)) for f in fl]
def test_get_flavor_details(self):
f = cs.flavors.get(1)
cs.assert_called('GET', '/flavors/1')
self.assertTrue(isinstance(f, flavors.Flavor))
self.assertEqual(f.ram, 256)
self.assertEqual(f.disk, 10)
def test_find(self):
f = cs.flavors.find(ram=256)
cs.assert_called('GET', '/flavors/detail')
self.assertEqual(f.name, '256 MB Server')
f = cs.flavors.find(disk=20)
self.assertEqual(f.name, '512 MB Server')
self.assertRaises(exceptions.NotFound, cs.flavors.find, disk=12345)

40
tests/v1_1/test_images.py Normal file
View File

@ -0,0 +1,40 @@
from novaclient.v1_1 import images
from tests.v1_1 import fakes
from tests import utils
cs = fakes.FakeClient()
class ImagesTest(utils.TestCase):
def test_list_images(self):
il = cs.images.list()
cs.assert_called('GET', '/images/detail')
[self.assertTrue(isinstance(i, images.Image)) for i in il]
def test_list_images_undetailed(self):
il = cs.images.list(detailed=False)
cs.assert_called('GET', '/images')
[self.assertTrue(isinstance(i, images.Image)) for i in il]
def test_get_image_details(self):
i = cs.images.get(1)
cs.assert_called('GET', '/images/1')
self.assertTrue(isinstance(i, images.Image))
self.assertEqual(i.id, 1)
self.assertEqual(i.name, 'CentOS 5.2')
def test_delete_image(self):
cs.images.delete(1)
cs.assert_called('DELETE', '/images/1')
def test_find(self):
i = cs.images.find(name="CentOS 5.2")
self.assertEqual(i.id, 1)
cs.assert_called('GET', '/images/detail')
iml = cs.images.findall(status='SAVING')
self.assertEqual(len(iml), 1)
self.assertEqual(iml[0].name, 'My Server Backup')

135
tests/v1_1/test_servers.py Normal file
View File

@ -0,0 +1,135 @@
import StringIO
from novaclient.v1_1 import servers
from tests.v1_1 import fakes
from tests import utils
cs = fakes.FakeClient()
class ServersTest(utils.TestCase):
def test_list_servers(self):
sl = cs.servers.list()
cs.assert_called('GET', '/servers/detail')
[self.assertTrue(isinstance(s, servers.Server)) for s in sl]
def test_list_servers_undetailed(self):
sl = cs.servers.list(detailed=False)
cs.assert_called('GET', '/servers')
[self.assertTrue(isinstance(s, servers.Server)) for s in sl]
def test_get_server_details(self):
s = cs.servers.get(1234)
cs.assert_called('GET', '/servers/1234')
self.assertTrue(isinstance(s, servers.Server))
self.assertEqual(s.id, 1234)
self.assertEqual(s.status, 'BUILD')
def test_create_server(self):
s = cs.servers.create(
name="My server",
image=1,
flavor=1,
meta={'foo': 'bar'},
files={
'/etc/passwd': 'some data', # a file
'/tmp/foo.txt': StringIO.StringIO('data') # a stream
}
)
cs.assert_called('POST', '/servers')
self.assertTrue(isinstance(s, servers.Server))
def test_update_server(self):
s = cs.servers.get(1234)
# Update via instance
s.update(name='hi')
cs.assert_called('PUT', '/servers/1234')
s.update(name='hi')
cs.assert_called('PUT', '/servers/1234')
# Silly, but not an error
s.update()
# Update via manager
cs.servers.update(s, name='hi')
cs.assert_called('PUT', '/servers/1234')
def test_delete_server(self):
s = cs.servers.get(1234)
s.delete()
cs.assert_called('DELETE', '/servers/1234')
cs.servers.delete(1234)
cs.assert_called('DELETE', '/servers/1234')
cs.servers.delete(s)
cs.assert_called('DELETE', '/servers/1234')
def test_find(self):
s = cs.servers.find(name='sample-server')
cs.assert_called('GET', '/servers/detail')
self.assertEqual(s.name, 'sample-server')
# Find with multiple results arbitraility returns the first item
s = cs.servers.find(flavor={"id": 1, "name": "256 MB Server"})
sl = cs.servers.findall(flavor={"id": 1, "name": "256 MB Server"})
self.assertEqual(sl[0], s)
self.assertEqual([s.id for s in sl], [1234, 5678])
def test_reboot_server(self):
s = cs.servers.get(1234)
s.reboot()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.reboot(s, type='HARD')
cs.assert_called('POST', '/servers/1234/action')
def test_rebuild_server(self):
s = cs.servers.get(1234)
s.rebuild(image=1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.rebuild(s, image=1)
cs.assert_called('POST', '/servers/1234/action')
def test_resize_server(self):
s = cs.servers.get(1234)
s.resize(flavor=1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.resize(s, flavor=1)
cs.assert_called('POST', '/servers/1234/action')
def test_confirm_resized_server(self):
s = cs.servers.get(1234)
s.confirm_resize()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.confirm_resize(s)
cs.assert_called('POST', '/servers/1234/action')
def test_revert_resized_server(self):
s = cs.servers.get(1234)
s.revert_resize()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.revert_resize(s)
cs.assert_called('POST', '/servers/1234/action')
def test_migrate_server(self):
s = cs.servers.get(1234)
s.migrate()
cs.assert_called('POST', '/servers/1234/action')
cs.servers.migrate(s)
cs.assert_called('POST', '/servers/1234/action')
def test_add_fixed_ip(self):
s = cs.servers.get(1234)
s.add_fixed_ip(1)
cs.assert_called('POST', '/servers/1234/action')
cs.servers.add_fixed_ip(s, 1)
cs.assert_called('POST', '/servers/1234/action')
def test_remove_fixed_ip(self):
s = cs.servers.get(1234)
s.remove_fixed_ip('10.0.0.1')
cs.assert_called('POST', '/servers/1234/action')
cs.servers.remove_fixed_ip(s, '10.0.0.1')
cs.assert_called('POST', '/servers/1234/action')

217
tests/v1_1/test_shell.py Normal file
View File

@ -0,0 +1,217 @@
import os
import mock
from novaclient.shell import OpenStackComputeShell
from novaclient import exceptions
from tests.v1_1 import fakes
from tests import utils
class ShellTest(utils.TestCase):
# Patch os.environ to avoid required auth info.
def setUp(self):
"""Run before each test."""
self.old_environment = os.environ.copy()
os.environ = {
'NOVA_USERNAME': 'username',
'NOVA_API_KEY': 'password',
'NOVA_PROJECT_ID': 'project_id',
'NOVA_VERSION': '1.1',
}
self.shell = OpenStackComputeShell()
self.shell.get_api_class = lambda *_: fakes.FakeClient
def tearDown(self):
os.environ = self.old_environment
def run_command(self, cmd):
self.shell.main(cmd.split())
def assert_called(self, method, url, body=None):
return self.shell.cs.assert_called(method, url, body)
def assert_called_anytime(self, method, url, body=None):
return self.shell.cs.assert_called_anytime(method, url, body)
def test_boot(self):
self.run_command('boot --image 1 some-server')
self.assert_called_anytime(
'POST', '/servers',
{'server': {
'flavorRef': 1,
'name': 'some-server',
'imageRef': '1',
'min_count': 1,
'max_count': 1,
}}
)
self.run_command('boot --image 1 --meta foo=bar --meta spam=eggs some-server ')
self.assert_called_anytime(
'POST', '/servers',
{'server': {
'flavorRef': 1,
'name': 'some-server',
'imageRef': '1',
'metadata': {'foo': 'bar', 'spam': 'eggs'},
'min_count': 1,
'max_count': 1,
}}
)
def test_boot_files(self):
testfile = os.path.join(os.path.dirname(__file__), 'testfile.txt')
expected_file_data = open(testfile).read().encode('base64')
cmd = 'boot some-server --image 1 --file /tmp/foo=%s --file /tmp/bar=%s'
self.run_command(cmd % (testfile, testfile))
self.assert_called_anytime(
'POST', '/servers',
{'server': {
'flavorRef': 1,
'name': 'some-server',
'imageRef': '1',
'min_count': 1,
'max_count': 1,
'personality': [
{'path': '/tmp/bar', 'contents': expected_file_data},
{'path': '/tmp/foo', 'contents': expected_file_data}
]}
}
)
def test_boot_invalid_file(self):
invalid_file = os.path.join(os.path.dirname(__file__), 'asdfasdfasdfasdf')
cmd = 'boot some-server --image 1 --file /foo=%s' % invalid_file
self.assertRaises(exceptions.CommandError, self.run_command, cmd)
def test_boot_key_auto(self):
mock_exists = mock.Mock(return_value=True)
mock_open = mock.Mock()
mock_open.return_value = mock.Mock()
mock_open.return_value.read = mock.Mock(return_value='SSHKEY')
@mock.patch('os.path.exists', mock_exists)
@mock.patch('__builtin__.open', mock_open)
def test_shell_call():
self.run_command('boot some-server --image 1 --key')
self.assert_called_anytime(
'POST', '/servers',
{'server': {
'flavorRef': 1,
'name': 'some-server',
'imageRef': '1',
'min_count': 1,
'max_count': 1,
'personality': [{
'path': '/root/.ssh/authorized_keys2',
'contents': ('SSHKEY').encode('base64')},
]}
}
)
test_shell_call()
def test_boot_key_auto_no_keys(self):
mock_exists = mock.Mock(return_value=False)
@mock.patch('os.path.exists', mock_exists)
def test_shell_call():
self.assertRaises(exceptions.CommandError, self.run_command,
'boot some-server --image 1 --key')
test_shell_call()
def test_boot_key_file(self):
testfile = os.path.join(os.path.dirname(__file__), 'testfile.txt')
expected_file_data = open(testfile).read().encode('base64')
self.run_command('boot some-server --image 1 --key %s' % testfile)
self.assert_called_anytime(
'POST', '/servers',
{'server': {
'flavorRef': 1,
'name': 'some-server',
'imageRef': '1',
'min_count': 1,
'max_count': 1,
'personality': [
{'path': '/root/.ssh/authorized_keys2',
'contents':expected_file_data},
]}
}
)
def test_boot_invalid_keyfile(self):
invalid_file = os.path.join(os.path.dirname(__file__), 'asdfasdfasdfasdf')
self.assertRaises(exceptions.CommandError, self.run_command, 'boot some-server '
'--image 1 --key %s' % invalid_file)
def test_flavor_list(self):
self.run_command('flavor-list')
self.assert_called_anytime('GET', '/flavors/detail')
def test_image_list(self):
self.run_command('image-list')
self.assert_called('GET', '/images/detail')
def test_create_image(self):
self.run_command('image-create sample-server mysnapshot')
self.assert_called(
'POST', '/servers/1234/action',
{'createImage': {'name': 'mysnapshot', 'metadata': {}}}
)
def test_image_delete(self):
self.run_command('image-delete 1')
self.assert_called('DELETE', '/images/1')
def test_list(self):
self.run_command('list')
self.assert_called('GET', '/servers/detail')
def test_reboot(self):
self.run_command('reboot sample-server')
self.assert_called('POST', '/servers/1234/action', {'reboot': {'type': 'SOFT'}})
self.run_command('reboot sample-server --hard')
self.assert_called('POST', '/servers/1234/action', {'reboot': {'type': 'HARD'}})
def test_rebuild(self):
self.run_command('rebuild sample-server 1')
self.assert_called('POST', '/servers/1234/action', {'rebuild': {'imageRef': 1}})
def test_rename(self):
self.run_command('rename sample-server newname')
self.assert_called('PUT', '/servers/1234', {'server': {'name': 'newname'}})
def test_resize(self):
self.run_command('resize sample-server 1')
self.assert_called('POST', '/servers/1234/action', {'resize': {'flavorRef': 1}})
def test_resize_confirm(self):
self.run_command('resize-confirm sample-server')
self.assert_called('POST', '/servers/1234/action', {'confirmResize': None})
def test_resize_revert(self):
self.run_command('resize-revert sample-server')
self.assert_called('POST', '/servers/1234/action', {'revertResize': None})
@mock.patch('getpass.getpass', mock.Mock(return_value='p'))
def test_root_password(self):
self.run_command('root-password sample-server')
self.assert_called('POST', '/servers/1234/action', {'changePassword': {'adminPass': 'p'}})
def test_show(self):
self.run_command('show 1234')
# XXX need a way to test multiple calls
# assert_called('GET', '/servers/1234')
self.assert_called('GET', '/images/2')
def test_delete(self):
self.run_command('delete 1234')
self.assert_called('DELETE', '/servers/1234')
self.run_command('delete sample-server')
self.assert_called('DELETE', '/servers/1234')

76
tests/v1_1/test_zones.py Normal file
View File

@ -0,0 +1,76 @@
import StringIO
from novaclient.v1_1 import zones
from tests.v1_1 import fakes
from tests import utils
os = fakes.FakeClient()
class ZonesTest(utils.TestCase):
def test_list_zones(self):
sl = os.zones.list()
os.assert_called('GET', '/zones/detail')
[self.assertTrue(isinstance(s, zones.Zone)) for s in sl]
def test_list_zones_undetailed(self):
sl = os.zones.list(detailed=False)
os.assert_called('GET', '/zones')
[self.assertTrue(isinstance(s, zones.Zone)) for s in sl]
def test_get_zone_details(self):
s = os.zones.get(1)
os.assert_called('GET', '/zones/1')
self.assertTrue(isinstance(s, zones.Zone))
self.assertEqual(s.id, 1)
self.assertEqual(s.api_url, 'http://foo.com')
def test_create_zone(self):
s = os.zones.create(api_url="http://foo.com", username='bob',
password='xxx')
os.assert_called('POST', '/zones')
self.assertTrue(isinstance(s, zones.Zone))
def test_update_zone(self):
s = os.zones.get(1)
# Update via instance
s.update(api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
s.update(api_url='http://blah.com', username='alice', password='xxx')
os.assert_called('PUT', '/zones/1')
# Silly, but not an error
s.update()
# Update via manager
os.zones.update(s, api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
os.zones.update(1, api_url='http://blah.com')
os.assert_called('PUT', '/zones/1')
os.zones.update(s, api_url='http://blah.com', username='fred',
password='zip')
os.assert_called('PUT', '/zones/1')
def test_delete_zone(self):
s = os.zones.get(1)
s.delete()
os.assert_called('DELETE', '/zones/1')
os.zones.delete(1)
os.assert_called('DELETE', '/zones/1')
os.zones.delete(s)
os.assert_called('DELETE', '/zones/1')
def test_find_zone(self):
s = os.zones.find(password='qwerty')
os.assert_called('GET', '/zones/detail')
self.assertEqual(s.username, 'bob')
# Find with multiple results returns the first item
s = os.zones.find(api_url='http://foo.com')
sl = os.zones.findall(api_url='http://foo.com')
self.assertEqual(sl[0], s)
self.assertEqual([s.id for s in sl], [1, 2])

1
tests/v1_1/testfile.txt Normal file
View File

@ -0,0 +1 @@
BLAH

29
tests/v1_1/utils.py Normal file
View File

@ -0,0 +1,29 @@
from nose.tools import ok_
def fail(msg):
raise AssertionError(msg)
def assert_in(thing, seq, msg=None):
msg = msg or "'%s' not found in %s" % (thing, seq)
ok_(thing in seq, msg)
def assert_not_in(thing, seq, msg=None):
msg = msg or "unexpected '%s' found in %s" % (thing, seq)
ok_(thing not in seq, msg)
def assert_has_keys(dict, required=[], optional=[]):
keys = dict.keys()
for k in required:
assert_in(k, keys, "required key %s missing from %s" % (k, dict))
allowed_keys = set(required) | set(optional)
extra_keys = set(keys).difference(set(required + optional))
if extra_keys:
fail("found unexpected keys: %s" % list(extra_keys))
def assert_isinstance(thing, kls):
ok_(isinstance(thing, kls), "%s is not an instance of %s" % (thing, kls))