add keystone support, new command interface, API v2.0

blueprint new-cli
Bug #1001053

Implement new commands interface, ready for v2.0. adopt cliff arch. new
client binary is quantumv2. After it is stable, we will remove quantum
binary. Httplibs2 is used.

usage: https://docs.google.com/document/d/1e_4UtnhFfgtnsB8EVB31BZKldaVzl_BlsGnGBrKmcDk/edit

Patch 2: add license header
Patch 3: add v1.0 support, fix show net details
Patch 4: quantumclient network api v2.0
Patch 5: subnet and port commands for api v2.0, add fields selector
Patch 6: add test cases
Patch 7: fix interactive mode, modify according to comments and https://review.openstack.org/#/c/8366/, add two tasks to BP: noauth and openstack common
Patch 8: fix log problem
Patch 9: modify according to the comments by dan on patch 5
Patch 10: just trigger jenkins
Patch 11: pep 1.3 fix
Patch 12: cliff and prettytable to more than 0.6.0
Patch 13: change setup.py to include more packages
Patch 14: pep check on jenkins
Patch 15: add license text to empty __init__.py files
Patch 16: fix v1.1 test cases after server changes

Change-Id: Ibbbdd834371c6a023b31e4797718fc0fe9786d89
This commit is contained in:
Yong Sheng Gong 2012-05-18 09:02:29 +08:00
parent f7086ed40a
commit dd803f8e26
32 changed files with 3549 additions and 113 deletions

View File

@ -30,11 +30,46 @@ gettext.install('quantumclient', unicode=1)
from quantumclient.common import exceptions
from quantumclient.common.serializer import Serializer
from quantumclient.common import utils
net_filters_v11_opt = []
net_filters_v11_opt.append({'--name':
{'help': _('name filter'), }, })
net_filters_v11_opt.append({'--op-status':
{'help': _('op-status filter'), }, })
net_filters_v11_opt.append({'--port-op-status':
{'help': _('port-op-status filter'), }, })
net_filters_v11_opt.append({'--port-state':
{'help': _('port-state filter'), }, })
net_filters_v11_opt.append({'--has-attachment':
{'help': _('has-attachment filter'), }, })
net_filters_v11_opt.append({'--attachment':
{'help': _('attachment filter'), }, })
net_filters_v11_opt.append({'--port':
{'help': _('port filter'), }, })
port_filters_v11_opt = []
port_filters_v11_opt.append({'--name':
{'help': _('name filter'), }, })
port_filters_v11_opt.append({'--op-status':
{'help': _('op-status filter'), }, })
port_filters_v11_opt.append({'--has-attachment':
{'help': _('has-attachment filter'), }, })
port_filters_v11_opt.append({'--attachement':
{'help': _('attachment filter'), }, })
net_filters_v11 = []
for arg in net_filters_v11_opt:
for key in arg.iterkeys():
net_filters_v11.append(key.split('--', 2)[1])
port_filters_v11 = []
for arg in port_filters_v11_opt:
for key in arg.iterkeys():
port_filters_v11.append(key.split('--', 2)[1])
LOG = logging.getLogger('quantumclient')
AUTH_TOKEN_HEADER = "X-Auth-Token"
@ -55,12 +90,11 @@ def exception_handler_v10(status_code, error_content):
430: 'portNotFound',
431: 'requestedStateInvalid',
432: 'portInUse',
440: 'alreadyAttached',
}
440: 'alreadyAttached', }
quantum_errors = {
400: exceptions.BadInputError,
401: exceptions.NotAuthorized,
401: exceptions.Unauthorized,
404: exceptions.NotFound,
420: exceptions.NetworkNotFoundClient,
421: exceptions.NetworkInUseClient,
@ -68,8 +102,7 @@ def exception_handler_v10(status_code, error_content):
431: exceptions.StateInvalidClient,
432: exceptions.PortInUseClient,
440: exceptions.AlreadyAttachedClient,
501: NotImplementedError,
}
501: NotImplementedError, }
# Find real error type
error_type = None
@ -105,7 +138,7 @@ def exception_handler_v11(status_code, error_content):
'RequestedStateInvalid': exceptions.StateInvalidClient,
'PortInUse': exceptions.PortInUseClient,
'AlreadyAttached': exceptions.AlreadyAttachedClient,
}
'QuantumServiceFault': exceptions.QuantumClientException, }
error_dict = None
if isinstance(error_content, dict):
@ -156,6 +189,33 @@ class ApiCall(object):
return with_params
class APIFilterCall(object):
def __init__(self, filters):
self.filters = filters
def __call__(self, f):
def wrapped_f(*args, **kwargs):
"""
Temporarily sets the format and tenant for this request
"""
instance = args[0]
(format, tenant) = (instance.format, instance.tenant)
if 'format' in kwargs:
instance.format = kwargs['format']
if 'format' not in self.filters:
del kwargs['format']
if 'tenant' in kwargs:
instance.tenant = kwargs['tenant']
if 'tenant' not in self.filters:
del kwargs['tenant']
ret = f(*args, **kwargs)
(instance.format, instance.tenant) = (format, tenant)
return ret
return wrapped_f
class Client(object):
"""A base client class - derived from Glance.BaseClient"""
@ -166,13 +226,10 @@ class Client(object):
"attributes": {
"network": ["id", "name"],
"port": ["id", "state"],
"attachment": ["id"]},
"attachment": ["id"], },
"plurals": {
"networks": "network",
"ports": "port",
},
},
}
"ports": "port", }, }, }
# Action query strings
networks_path = "/networks"
@ -298,19 +355,18 @@ class Client(object):
headers[AUTH_TOKEN_HEADER] = self.auth_token
# Open connection and send request, handling SSL certs
certs = {'key_file': self.key_file, 'cert_file': self.cert_file}
certs = dict((x, certs[x]) for x in certs if certs[x] != None)
certs = dict((x, certs[x]) for x in certs if certs[x] is not None)
if self.use_ssl and len(certs):
conn = connection_type(self.host, self.port, **certs)
else:
conn = connection_type(self.host, self.port)
# besides HTTP(s)Connection, we still have testConnection
if (LOG.isEnabledFor(logging.DEBUG) and
isinstance(conn, httplib.HTTPConnection)):
conn.set_debuglevel(1)
res = self._send_request(conn, method, action, body, headers)
status_code = self.get_status_code(res)
data = res.read()
utils.http_log(LOG, [method, action],
{'body': body,
'headers': headers,
}, res, data)
if self.logger:
self.logger.debug("Quantum Client Reply (code = %s) :\n %s" %
(str(status_code), data))
@ -531,7 +587,7 @@ class ClientV11(Client):
features specific to API v1.1 such as filters
"""
@ApiCall
@APIFilterCall(net_filters_v11)
def list_networks(self, **filters):
"""
Fetches a list of all networks for a tenant
@ -539,14 +595,14 @@ class ClientV11(Client):
# Pass filters in "params" argument to do_request
return self.get(self.networks_path, params=filters)
@ApiCall
@APIFilterCall(net_filters_v11)
def list_networks_details(self, **filters):
"""
Fetches a detailed list of all networks for a tenant
"""
return self.get(self.networks_path + self.detail_path, params=filters)
@ApiCall
@APIFilterCall(port_filters_v11)
def list_ports(self, network, **filters):
"""
Fetches a list of ports on a given network
@ -554,7 +610,7 @@ class ClientV11(Client):
# Pass filters in "params" argument to do_request
return self.get(self.ports_path % (network), params=filters)
@ApiCall
@APIFilterCall(port_filters_v11)
def list_ports_details(self, network, **filters):
"""
Fetches a detailed list of ports on a given network

View File

@ -29,7 +29,8 @@ from quantumclient import Client
from quantumclient import ClientV11
from quantumclient.common import exceptions
from quantumclient.common import utils
from quantumclient import net_filters_v11
from quantumclient import port_filters_v11
# Configure logger for client - cli logger is a child of it
# NOTE(salvatore-orlando): logger name does not map to package
@ -43,107 +44,80 @@ FORMAT = 'json'
commands_v10 = {
"list_nets": {
"func": cli_lib.list_nets,
"args": ["tenant-id"],
},
"args": ["tenant-id"], },
"list_nets_detail": {
"func": cli_lib.list_nets_detail,
"args": ["tenant-id"],
},
"args": ["tenant-id"], },
"create_net": {
"func": cli_lib.create_net,
"args": ["tenant-id", "net-name"],
},
"args": ["tenant-id", "net-name"], },
"delete_net": {
"func": cli_lib.delete_net,
"args": ["tenant-id", "net-id"],
},
"args": ["tenant-id", "net-id"], },
"show_net": {
"func": cli_lib.show_net,
"args": ["tenant-id", "net-id"],
},
"args": ["tenant-id", "net-id"], },
"show_net_detail": {
"func": cli_lib.show_net_detail,
"args": ["tenant-id", "net-id"],
},
"args": ["tenant-id", "net-id"], },
"update_net": {
"func": cli_lib.update_net,
"args": ["tenant-id", "net-id", "new-name"],
},
"args": ["tenant-id", "net-id", "new-name"], },
"list_ports": {
"func": cli_lib.list_ports,
"args": ["tenant-id", "net-id"],
},
"args": ["tenant-id", "net-id"], },
"list_ports_detail": {
"func": cli_lib.list_ports_detail,
"args": ["tenant-id", "net-id"],
},
"args": ["tenant-id", "net-id"], },
"create_port": {
"func": cli_lib.create_port,
"args": ["tenant-id", "net-id"],
},
"args": ["tenant-id", "net-id"], },
"delete_port": {
"func": cli_lib.delete_port,
"args": ["tenant-id", "net-id", "port-id"],
},
"args": ["tenant-id", "net-id", "port-id"], },
"update_port": {
"func": cli_lib.update_port,
"args": ["tenant-id", "net-id", "port-id", "params"],
},
"args": ["tenant-id", "net-id", "port-id", "params"], },
"show_port": {
"func": cli_lib.show_port,
"args": ["tenant-id", "net-id", "port-id"],
},
"args": ["tenant-id", "net-id", "port-id"], },
"show_port_detail": {
"func": cli_lib.show_port_detail,
"args": ["tenant-id", "net-id", "port-id"],
},
"args": ["tenant-id", "net-id", "port-id"], },
"plug_iface": {
"func": cli_lib.plug_iface,
"args": ["tenant-id", "net-id", "port-id", "iface-id"],
},
"args": ["tenant-id", "net-id", "port-id", "iface-id"], },
"unplug_iface": {
"func": cli_lib.unplug_iface,
"args": ["tenant-id", "net-id", "port-id"],
},
"args": ["tenant-id", "net-id", "port-id"], },
"show_iface": {
"func": cli_lib.show_iface,
"args": ["tenant-id", "net-id", "port-id"],
},
}
"args": ["tenant-id", "net-id", "port-id"], }, }
commands_v11 = commands_v10.copy()
commands_v11.update({
"list_nets": {
"func": cli_lib.list_nets_v11,
"args": ["tenant-id"],
"filters": ["name", "op-status", "port-op-status", "port-state",
"has-attachment", "attachment", "port"],
},
"filters": net_filters_v11, },
"list_nets_detail": {
"func": cli_lib.list_nets_detail_v11,
"args": ["tenant-id"],
"filters": ["name", "op-status", "port-op-status", "port-state",
"has-attachment", "attachment", "port"],
},
"filters": net_filters_v11, },
"list_ports": {
"func": cli_lib.list_ports_v11,
"args": ["tenant-id", "net-id"],
"filters": ["name", "op-status", "has-attachment", "attachment"],
},
"filters": port_filters_v11, },
"list_ports_detail": {
"func": cli_lib.list_ports_detail_v11,
"args": ["tenant-id", "net-id"],
"filters": ["name", "op-status", "has-attachment", "attachment"],
},
})
"filters": port_filters_v11, }, })
commands = {
'1.0': commands_v10,
'1.1': commands_v11,
}
'1.1': commands_v11, }
clients = {
'1.0': Client,
'1.1': ClientV11,
}
'1.1': ClientV11, }
def help(version):

View File

@ -201,8 +201,7 @@ interface: %(iface.id)s
plugged in Logical Port ID: %(port_id)s
on Virtual Network: %(network_id)s
for Tenant: %(tenant_id)s
""".strip(),
)
""".strip(), )
_templates_v11 = _templates_v10.copy()
_templates_v11.update(dict(
@ -236,13 +235,11 @@ operational status: %(port.op-status)s
interface: %(port.attachment.id)s
on Virtual Network: %(network_id)s
for Tenant: %(tenant_id)s
""".strip(),
))
""".strip(), ))
_templates = {
'1.0': _templates_v10,
'1.1': _templates_v11
}
'1.1': _templates_v11, }
def __init__(self, cmd, data, version):
super(CmdOutputTemplate, self).__init__(

200
quantumclient/client.py Normal file
View File

@ -0,0 +1,200 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
try:
import json
except ImportError:
import simplejson as json
import logging
import urlparse
# Python 2.5 compat fix
if not hasattr(urlparse, 'parse_qsl'):
import cgi
urlparse.parse_qsl = cgi.parse_qsl
import httplib2
from quantumclient.common import exceptions
from quantumclient.common import utils
_logger = logging.getLogger(__name__)
class ServiceCatalog(object):
"""Helper methods for dealing with a Keystone Service Catalog."""
def __init__(self, resource_dict):
self.catalog = resource_dict
def get_token(self):
"""Fetch token details fron service catalog"""
token = {'id': self.catalog['access']['token']['id'],
'expires': self.catalog['access']['token']['expires'], }
try:
token['user_id'] = self.catalog['access']['user']['id']
token['tenant_id'] = (
self.catalog['access']['token']['tenant']['id'])
except:
# just leave the tenant and user out if it doesn't exist
pass
return token
def url_for(self, attr=None, filter_value=None,
service_type='network', endpoint_type='adminURL'):
"""Fetch the admin URL from the Quantum service for
a particular endpoint attribute. If none given, return
the first. See tests for sample service catalog."""
catalog = self.catalog['access'].get('serviceCatalog', [])
matching_endpoints = []
for service in catalog:
if service['type'] != service_type:
continue
endpoints = service['endpoints']
for endpoint in endpoints:
if not filter_value or endpoint.get(attr) == filter_value:
matching_endpoints.append(endpoint)
if not matching_endpoints:
raise exceptions.EndpointNotFound()
elif len(matching_endpoints) > 1:
raise exceptions.AmbiguousEndpoints(message=matching_endpoints)
else:
return matching_endpoints[0][endpoint_type]
class HTTPClient(httplib2.Http):
"""Handles the REST calls and responses, include authn"""
USER_AGENT = 'python-quantumclient'
def __init__(self, username=None, tenant_name=None,
password=None, auth_url=None,
token=None, region_name=None, timeout=None,
endpoint_url=None, insecure=False, **kwargs):
super(HTTPClient, self).__init__(timeout=timeout)
self.username = username
self.tenant_name = tenant_name
self.password = password
self.auth_url = auth_url.rstrip('/') if auth_url else None
self.region_name = region_name
self.auth_token = token
self.content_type = 'application/json'
self.endpoint_url = endpoint_url
# httplib2 overrides
self.force_exception_to_status_code = True
self.disable_ssl_certificate_validation = insecure
def _cs_request(self, *args, **kwargs):
kargs = {}
kargs.setdefault('headers', kwargs.get('headers', {}))
kargs['headers']['User-Agent'] = self.USER_AGENT
if 'content_type' in kwargs:
kargs['headers']['Content-Type'] = kwargs['content_type']
kargs['headers']['Accept'] = kwargs['content_type']
else:
kargs['headers']['Content-Type'] = self.content_type
kargs['headers']['Accept'] = self.content_type
if 'body' in kwargs:
kargs['body'] = kwargs['body']
resp, body = self.request(*args, **kargs)
utils.http_log(_logger, args, kargs, resp, body)
status_code = self.get_status_code(resp)
if status_code == 401:
raise exceptions.Unauthorized(message=body)
elif status_code == 403:
raise exceptions.Forbidden(message=body)
return resp, body
def do_request(self, url, method, **kwargs):
if not self.endpoint_url or not self.auth_token:
self.authenticate()
# Perform the request once. If we get a 401 back then it
# might be because the auth token expired, so try to
# re-authenticate and try again. If it still fails, bail.
try:
kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token
resp, body = self._cs_request(self.endpoint_url + url, method,
**kwargs)
return resp, body
except exceptions.Unauthorized as ex:
if not self.endpoint_url or not self.auth_token:
self.authenticate()
resp, body = self._cs_request(
self.management_url + url, method, **kwargs)
return resp, body
else:
raise ex
def _extract_service_catalog(self, body):
""" Set the client's service catalog from the response data. """
self.service_catalog = ServiceCatalog(body)
try:
sc = self.service_catalog.get_token()
self.auth_token = sc['id']
self.auth_tenant_id = sc.get('tenant_id')
self.auth_user_id = sc.get('user_id')
except KeyError:
raise exceptions.Unauthorized()
self.endpoint_url = self.service_catalog.url_for(
attr='region', filter_value=self.region_name,
endpoint_type='adminURL')
def authenticate(self):
body = {'auth': {'passwordCredentials':
{'username': self.username,
'password': self.password, },
'tenantName': self.tenant_name, }, }
token_url = self.auth_url + "/tokens"
# Make sure we follow redirects when trying to reach Keystone
tmp_follow_all_redirects = self.follow_all_redirects
self.follow_all_redirects = True
try:
resp, body = self._cs_request(token_url, "POST",
body=json.dumps(body),
content_type="application/json")
finally:
self.follow_all_redirects = tmp_follow_all_redirects
status_code = self.get_status_code(resp)
if status_code != 200:
raise exceptions.Unauthorized(message=body)
if body:
try:
body = json.loads(body)
except ValueError:
pass
else:
body = None
self._extract_service_catalog(body)
def get_status_code(self, response):
"""
Returns the integer status code from the response, which
can be either a Webob.Response (used in testing) or httplib.Response
"""
if hasattr(response, 'status_int'):
return response.status_int
else:
return response.status

View File

@ -0,0 +1,79 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
"""Manage access to the clients, including authenticating when needed.
"""
import logging
from quantumclient.common import exceptions as exc
from quantumclient.quantum import client as quantum_client
from quantumclient.client import HTTPClient
LOG = logging.getLogger(__name__)
class ClientCache(object):
"""Descriptor class for caching created client handles.
"""
def __init__(self, factory):
self.factory = factory
self._handle = None
def __get__(self, instance, owner):
# Tell the ClientManager to login to keystone
if self._handle is None:
self._handle = self.factory(instance)
return self._handle
class ClientManager(object):
"""Manages access to API clients, including authentication.
"""
quantum = ClientCache(quantum_client.make_client)
def __init__(self, token=None, url=None,
auth_url=None,
tenant_name=None, tenant_id=None,
username=None, password=None,
region_name=None,
api_version=None,
):
self._token = token
self._url = url
self._auth_url = auth_url
self._tenant_name = tenant_name
self._tenant_id = tenant_id
self._username = username
self._password = password
self._region_name = region_name
self._api_version = api_version
self._service_catalog = None
return
def initialize(self):
if not self._url:
httpclient = HTTPClient(username=self._username,
tenant_name=self._tenant_name,
password=self._password,
region_name=self._region_name,
auth_url=self._auth_url)
httpclient.authenticate()
# Populate other password flow attributes
self._token = httpclient.auth_token
self._url = httpclient.endpoint_url

View File

@ -0,0 +1,35 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
"""
OpenStack base command
"""
from cliff.command import Command
class OpenStackCommand(Command):
"""Base class for OpenStack commands
"""
api = None
def run(self, parsed_args):
if not self.api:
return
else:
return super(OpenStackCommand, self).run(parsed_args)

View File

@ -87,10 +87,33 @@ class AlreadyAttachedClient(QuantumClientException):
pass
class NotAuthorized(QuantumClientException):
class Unauthorized(QuantumClientException):
"""
HTTP 401 - Unauthorized: bad credentials.
"""
pass
class Forbidden(QuantumClientException):
"""
HTTP 403 - Forbidden: your credentials don't give you access to this
resource.
"""
pass
class EndpointNotFound(QuantumClientException):
"""Could not find Service or Region in Service Catalog."""
pass
class AmbiguousEndpoints(QuantumClientException):
"""Found more than one matching endpoint in Service Catalog."""
def __str__(self):
return "AmbiguousEndpoints: %s" % repr(self.message)
class QuantumCLIError(QuantumClientException):
""" Exception raised when command line parsing fails """
pass
@ -120,3 +143,13 @@ class Invalid(Error):
class InvalidContentType(Invalid):
message = _("Invalid content type %(content_type)s.")
class UnsupportedVersion(Exception):
"""Indicates that the user is trying to use an unsupported
version of the API"""
pass
class CommandError(Exception):
pass

View File

@ -22,7 +22,11 @@
import datetime
import json
import logging
import os
import sys
from quantumclient.common import exceptions
def env(*vars, **kwargs):
@ -38,12 +42,12 @@ def env(*vars, **kwargs):
def to_primitive(value):
if type(value) is type([]) or type(value) is type((None,)):
if isinstance(value, list) or isinstance(value, tuple):
o = []
for v in value:
o.append(to_primitive(v))
return o
elif type(value) is type({}):
elif isinstance(value, dict):
o = {}
for k, v in value.iteritems():
o[k] = to_primitive(v)
@ -68,3 +72,88 @@ def dumps(value):
def loads(s):
return json.loads(s)
def import_class(import_str):
"""Returns a class from a string including module and class
:param import_str: a string representation of the class name
:rtype: the requested class
"""
mod_str, _sep, class_str = import_str.rpartition('.')
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
def get_client_class(api_name, version, version_map):
"""Returns the client class for the requested API version
:param api_name: the name of the API, e.g. 'compute', 'image', etc
:param version: the requested API version
:param version_map: a dict of client classes keyed by version
:rtype: a client class for the requested API version
"""
try:
client_path = version_map[str(version)]
except (KeyError, ValueError):
msg = "Invalid %s client version '%s'. must be one of: %s" % (
(api_name, version, ', '.join(version_map.keys())))
raise exceptions.UnsupportedVersion(msg)
return import_class(client_path)
def get_item_properties(item, fields, mixed_case_fields=[], formatters={}):
"""Return a tuple containing the item properties.
:param item: a single item resource (e.g. Server, Tenant, etc)
:param fields: tuple of strings with the desired field names
:param mixed_case_fields: tuple of field names to preserve case
:param formatters: dictionary mapping field names to callables
to format the values
"""
row = []
for field in fields:
if field in formatters:
row.append(formatters[field](item))
else:
if field in mixed_case_fields:
field_name = field.replace(' ', '_')
else:
field_name = field.lower().replace(' ', '_')
if not hasattr(item, field_name) and isinstance(item, dict):
data = item[field_name]
else:
data = getattr(item, field_name, '')
row.append(data)
return tuple(row)
def __str2bool(strbool):
if strbool is None:
return None
else:
return strbool.lower() == 'true'
def http_log(_logger, args, kwargs, resp, body):
if not _logger.isEnabledFor(logging.DEBUG):
return
string_parts = ['curl -i']
for element in args:
if element in ('GET', 'POST'):
string_parts.append(' -X %s' % element)
else:
string_parts.append(' %s' % element)
for element in kwargs['headers']:
header = ' -H "%s: %s"' % (element, kwargs['headers'][element])
string_parts.append(header)
_logger.debug("REQ: %s\n" % "".join(string_parts))
if 'body' in kwargs and kwargs['body']:
_logger.debug("REQ BODY: %s\n" % (kwargs['body']))
_logger.debug("RESP:%s\n", resp)
_logger.debug("RESP BODY:%s\n", body)

View File

@ -0,0 +1,14 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,68 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
import urlparse
from quantumclient.common import utils
LOG = logging.getLogger(__name__)
API_NAME = 'network'
API_VERSIONS = {
'1.0': 'quantumclient.Client',
'1.1': 'quantumclient.ClientV11',
'2.0': 'quantumclient.v2_0.client.Client',
}
def make_client(instance):
"""Returns an identity service client.
"""
quantum_client = utils.get_client_class(
API_NAME,
instance._api_version[API_NAME],
API_VERSIONS,
)
instance.initialize()
url = instance._url
url = url.rstrip("/")
client_full_name = (quantum_client.__module__ + "." +
quantum_client.__name__)
LOG.debug("we are using client: %s", client_full_name)
v1x = (client_full_name == API_VERSIONS['1.1'] or
client_full_name == API_VERSIONS['1.0'])
if v1x:
magic_tuple = urlparse.urlsplit(url)
scheme, netloc, path, query, frag = magic_tuple
host = magic_tuple.hostname
port = magic_tuple.port
use_ssl = scheme.lower().startswith('https')
client = quantum_client(host=host, port=port, use_ssl=use_ssl)
client.auth_token = instance._token
client.logger = LOG
return client
else:
client = quantum_client(username=instance._username,
tenant_name=instance._tenant_name,
password=instance._password,
region_name=instance._region_name,
auth_url=instance._auth_url,
endpoint_url=url,
token=instance._token)
return client

View File

@ -0,0 +1,62 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
from quantumclient.common import command
from quantumclient.common import utils
class QuantumCommand(command.OpenStackCommand):
api = 'network'
log = logging.getLogger(__name__ + '.QuantumCommand')
def get_parser(self, prog_name):
parser = super(QuantumCommand, self).get_parser(prog_name)
parser.add_argument(
'--request-format',
help=_('the xml or json request format'),
default='json',
choices=['json', 'xml', ], )
parser.add_argument(
'tenant_id', metavar='tenant-id',
help=_('the owner tenant ID'), )
return parser
class QuantumPortCommand(QuantumCommand):
api = 'network'
log = logging.getLogger(__name__ + '.QuantumPortCommand')
def get_parser(self, prog_name):
parser = super(QuantumPortCommand, self).get_parser(prog_name)
parser.add_argument(
'net_id', metavar='net-id',
help=_('the owner network ID'), )
return parser
class QuantumInterfaceCommand(QuantumPortCommand):
api = 'network'
log = logging.getLogger(__name__ + '.QuantumInterfaceCommand')
def get_parser(self, prog_name):
parser = super(QuantumInterfaceCommand, self).get_parser(prog_name)
parser.add_argument(
'port_id', metavar='port-id',
help=_('the owner Port ID'), )
return parser

View File

@ -0,0 +1,97 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
from cliff import show
from quantumclient.quantum.v1_1 import QuantumInterfaceCommand
class PlugInterface(QuantumInterfaceCommand):
"""Plug interface to a given port"""
api = 'network'
log = logging.getLogger(__name__ + '.PlugInterface')
def get_parser(self, prog_name):
parser = super(PlugInterface, self).get_parser(prog_name)
parser.add_argument(
'iface_id', metavar='iface-id',
help='_(ID of the interface to plug)', )
return parser
def run(self, parsed_args):
self.log.debug('run(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
data = {'attachment': {'id': '%s' % parsed_args.iface_id, }, }
quantum_client.attach_resource(parsed_args.net_id,
parsed_args.port_id,
data)
print >>self.app.stdout, (_('Plugged interface %(interfaceid)s'
' into Logical Port %(portid)s')
% {'interfaceid': parsed_args.iface_id,
'portid': parsed_args.port_id, })
return
class UnPlugInterface(QuantumInterfaceCommand):
"""Unplug interface from a given port"""
api = 'network'
log = logging.getLogger(__name__ + '.UnPlugInterface')
def run(self, parsed_args):
self.log.debug('run(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
quantum_client.detach_resource(parsed_args.net_id, parsed_args.port_id)
print >>self.app.stdout, (
_('Unplugged interface on Logical Port %(portid)s')
% {'portid': parsed_args.port_id, })
return
class ShowInterface(QuantumInterfaceCommand, show.ShowOne):
"""Show interface on a given port"""
api = 'network'
log = logging.getLogger(__name__ + '.ShowInterface')
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
iface = quantum_client.show_port_attachment(
parsed_args.net_id,
parsed_args.port_id)['attachment']
if iface:
if 'id' not in iface:
iface['id'] = '<none>'
else:
iface = {'': ''}
return zip(*sorted(iface.iteritems()))

View File

@ -0,0 +1,268 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
import itertools
from cliff import lister
from cliff import show
from quantumclient.common import exceptions
from quantumclient.common import utils
from quantumclient import net_filters_v11_opt
from quantumclient.quantum.v1_1 import QuantumCommand
class ListNetwork(QuantumCommand, lister.Lister):
"""List networks that belong to a given tenant"""
api = 'network'
log = logging.getLogger(__name__ + '.ListNetwork')
def get_parser(self, prog_name):
parser = super(ListNetwork, self).get_parser(prog_name)
parser.add_argument(
'--show-details',
help='show detailed info',
action='store_true',
default=False, )
for net_filter in net_filters_v11_opt:
option_key = net_filter.keys()[0]
option_defs = net_filter.get(option_key)
parser.add_argument(option_key, **option_defs)
return parser
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
search_opts = {
'tenant': parsed_args.tenant_id, }
for net_filter in net_filters_v11_opt:
option_key = net_filter.keys()[0]
arg = option_key[2:]
arg = arg.replace('-', '_')
arg_value = getattr(parsed_args, arg, None)
if arg_value is not None:
search_opts.update({option_key[2:]: arg_value, })
self.log.debug('search options: %s', search_opts)
quantum_client.format = parsed_args.request_format
columns = ('ID', )
data = None
if parsed_args.show_details:
data = quantum_client.list_networks_details(**search_opts)
# dict: {u'networks': [{u'op-status': u'UP',
# u'id': u'7a068b68-c736-42ab-9e43-c9d83c57627e',
# u'name': u'private'}]}
columns = ('ID', 'op-status', 'name', )
else:
data = quantum_client.list_networks(**search_opts)
# {u'networks': [{u'id':
# u'7a068b68-c736-42ab-9e43-c9d83c57627e'}]}
networks = []
if 'networks' in data:
networks = data['networks']
return (columns,
(utils.get_item_properties(
s, columns, formatters={}, ) for s in networks), )
def _format_attachment(port):
# attachment {u'id': u'gw-7a068b68-c7'}
try:
return ('attachment' in port and port['attachment'] and
'id' in port['attachment'] and
port['attachment']['id'] or '')
except Exception:
return ''
class ShowNetwork(QuantumCommand, show.ShowOne):
"""Show information of a given network"""
api = 'network'
log = logging.getLogger(__name__ + '.ShowNetwork')
def get_parser(self, prog_name):
parser = super(ShowNetwork, self).get_parser(prog_name)
parser.add_argument(
'net_id', metavar='net-id',
help='ID of network to display')
parser.add_argument(
'--show-details',
help='show detailed info of networks',
action='store_true',
default=False, )
return parser
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
data = None
if parsed_args.show_details:
data = quantum_client.show_network_details(parsed_args.net_id)
else:
data = quantum_client.show_network(parsed_args.net_id)
# {u'network': {u'op-status': u'UP', 'xmlns':
# u'http://openstack.org/quantum/api/v1.1', u'id':
# u'7a068b68-c736-42ab-9e43-c9d83c57627e', u'name': u'private'}}
network = {}
ports = None
network = 'network' in data and data['network'] or None
if network:
ports = network.pop('ports', None)
column_names, data = zip(*sorted(network.iteritems()))
if not parsed_args.columns:
columns_to_include = column_names
else:
columns_to_include = [c for c in column_names
if c in parsed_args.columns]
# Set up argument to compress()
selector = [(c in columns_to_include)
for c in column_names]
data = list(itertools.compress(data, selector))
formatter = self.formatters[parsed_args.formatter]
formatter.emit_one(columns_to_include, data,
self.app.stdout, parsed_args)
if ports:
print >>self.app.stdout, _('Network Ports:')
columns = ('op-status', 'state', 'id', 'attachment', )
column_names, data = (columns, (utils.get_item_properties(
s, columns, formatters={'attachment': _format_attachment}, )
for s in ports), )
if not parsed_args.columns:
columns_to_include = column_names
data_gen = data
else:
columns_to_include = [c for c in column_names
if c in parsed_args.columns]
if not columns_to_include:
raise ValueError(
'No recognized column names in %s' %
str(parsed_args.columns))
# Set up argument to compress()
selector = [(c in columns_to_include)
for c in column_names]
# Generator expression to only return the parts of a row
# of data that the user has expressed interest in
# seeing. We have to convert the compress() output to a
# list so the table formatter can ask for its length.
data_gen = (list(itertools.compress(row, selector))
for row in data)
formatter = self.formatters[parsed_args.formatter]
formatter.emit_list(columns_to_include,
data_gen, self.app.stdout, parsed_args)
return ('', [])
class CreateNetwork(QuantumCommand, show.ShowOne):
"""Create a network for a given tenant"""
api = 'network'
log = logging.getLogger(__name__ + '.CreateNetwork')
def get_parser(self, prog_name):
parser = super(CreateNetwork, self).get_parser(prog_name)
parser.add_argument(
'net_name', metavar='net-name',
help='Name of network to create')
return parser
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
body = {'network': {'name': parsed_args.net_name, }, }
network = quantum_client.create_network(body)
# {u'network': {u'id': u'e9424a76-6db4-4c93-97b6-ec311cd51f19'}}
info = 'network' in network and network['network'] or None
if info:
print >>self.app.stdout, _('Created a new Virtual Network:')
else:
info = {'': ''}
return zip(*sorted(info.iteritems()))
class DeleteNetwork(QuantumCommand):
"""Delete a given network"""
api = 'network'
log = logging.getLogger(__name__ + '.DeleteNetwork')
def get_parser(self, prog_name):
parser = super(DeleteNetwork, self).get_parser(prog_name)
parser.add_argument(
'net_id', metavar='net-id',
help='ID of network to delete')
return parser
def run(self, parsed_args):
self.log.debug('run(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
quantum_client.delete_network(parsed_args.net_id)
print >>self.app.stdout, (_('Deleted Network: %(networkid)s')
% {'networkid': parsed_args.net_id})
return
class UpdateNetwork(QuantumCommand):
"""Update network's information"""
api = 'network'
log = logging.getLogger(__name__ + '.UpdateNetwork')
def get_parser(self, prog_name):
parser = super(UpdateNetwork, self).get_parser(prog_name)
parser.add_argument(
'net_id', metavar='net-id',
help='ID of network to update')
parser.add_argument(
'newvalues', metavar='field=newvalue[,field2=newvalue2]',
help='new values for the network')
return parser
def run(self, parsed_args):
self.log.debug('run(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
field_values = parsed_args.newvalues
data = {'network': {}}
for kv in field_values.split(","):
try:
k, v = kv.split("=")
data['network'][k] = v
except ValueError:
raise exceptions.CommandError(
"malformed new values (field=newvalue): %s" % kv)
data['network']['id'] = parsed_args.net_id
quantum_client.update_network(parsed_args.net_id, data)
print >>self.app.stdout, (
_('Updated Network: %(networkid)s') %
{'networkid': parsed_args.net_id})
return

View File

@ -0,0 +1,222 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
from cliff import lister
from cliff import show
from quantumclient.common import exceptions
from quantumclient.common import utils
from quantumclient import port_filters_v11_opt
from quantumclient.quantum.v1_1 import QuantumPortCommand
class ListPort(QuantumPortCommand, lister.Lister):
"""List ports that belong to a given tenant's network"""
api = 'network'
log = logging.getLogger(__name__ + '.ListPort')
def get_parser(self, prog_name):
parser = super(ListPort, self).get_parser(prog_name)
parser.add_argument(
'--show-details',
help='show detailed info of networks',
action='store_true',
default=False, )
for item in port_filters_v11_opt:
option_key = item.keys()[0]
option_defs = item.get(option_key)
parser.add_argument(option_key, **option_defs)
return parser
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
search_opts = {
'tenant': parsed_args.tenant_id, }
for item in port_filters_v11_opt:
option_key = item.keys()[0]
arg = option_key[2:]
arg = arg.replace('-', '_')
arg_value = getattr(parsed_args, arg, None)
if arg_value is not None:
search_opts.update({option_key[2:]: arg_value, })
self.log.debug('search options: %s', search_opts)
columns = ('ID', )
data = None
if parsed_args.show_details:
data = quantum_client.list_ports_details(
parsed_args.net_id, **search_opts)
# dict:dict: {u'ports': [{
# u'op-status': u'DOWN',
# u'state': u'ACTIVE',
# u'id': u'479ba2b7-042f-44b9-aefb-b1550e114454'}, ]}
columns = ('ID', 'op-status', 'state')
else:
data = quantum_client.list_ports(parsed_args.net_id, **search_opts)
# {u'ports': [{u'id': u'7a068b68-c736-42ab-9e43-c9d83c57627e'}]}
ports = []
if 'ports' in data:
ports = data['ports']
return (columns,
(utils.get_item_properties(
s, columns, formatters={}, ) for s in ports), )
class ShowPort(QuantumPortCommand, show.ShowOne):
"""Show information of a given port"""
api = 'network'
log = logging.getLogger(__name__ + '.ShowPort')
def get_parser(self, prog_name):
parser = super(ShowPort, self).get_parser(prog_name)
parser.add_argument(
'port_id', metavar='port-id',
help='ID of the port to show', )
parser.add_argument(
'--show-details',
help='show detailed info',
action='store_true',
default=False, )
return parser
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
data = None
if parsed_args.show_details:
data = quantum_client.show_port_details(
parsed_args.net_id, parsed_args.port_id)
# {u'port': {u'op-status': u'DOWN', u'state': u'ACTIVE',
# u'id': u'479ba2b7-042f-44b9-aefb-
# b1550e114454', u'attachment': {u'id': u'gw-7a068b68-c7'}}}
else:
data = quantum_client.show_port(
parsed_args.net_id, parsed_args.port_id)
# {u'port': {u'op-status': u'DOWN', u'state': u'ACTIVE',
# u'id': u'479ba2b7-042f-44b9-aefb-b1550e114454'}}
port = 'port' in data and data['port'] or None
if port:
attachment = 'attachment' in port and port['attachment'] or None
if attachment:
interface = attachment['id']
port.update({'attachment': interface})
return zip(*sorted(port.iteritems()))
return ('', [])
class CreatePort(QuantumPortCommand, show.ShowOne):
"""Create port for a given network"""
api = 'network'
log = logging.getLogger(__name__ + '.CreatePort')
def get_parser(self, prog_name):
parser = super(CreatePort, self).get_parser(prog_name)
return parser
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
data = quantum_client.create_port(parsed_args.net_id)
# {u'network': {u'id': u'e9424a76-6db4-4c93-97b6-ec311cd51f19'}}
info = 'port' in data and data['port'] or None
if info:
print >>self.app.stdout, _('Created a new Logical Port:')
else:
info = {'': ''}
return zip(*sorted(info.iteritems()))
class DeletePort(QuantumPortCommand):
"""Delete a given port"""
api = 'network'
log = logging.getLogger(__name__ + '.DeletePort')
def get_parser(self, prog_name):
parser = super(DeletePort, self).get_parser(prog_name)
parser.add_argument(
'port_id', metavar='port-id',
help='ID of the port to delete', )
return parser
def run(self, parsed_args):
self.log.debug('run(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
quantum_client.delete_port(parsed_args.net_id, parsed_args.port_id)
print >>self.app.stdout, (_('Deleted Logical Port: %(portid)s') %
{'portid': parsed_args.port_id})
return
class UpdatePort(QuantumPortCommand):
"""Update information of a given port"""
api = 'network'
log = logging.getLogger(__name__ + '.UpdatePort')
def get_parser(self, prog_name):
parser = super(UpdatePort, self).get_parser(prog_name)
parser.add_argument(
'port_id', metavar='port-id',
help='ID of the port to update', )
parser.add_argument(
'newvalues', metavar='field=newvalue[,field2=newvalue2]',
help='new values for the Port')
return parser
def run(self, parsed_args):
self.log.debug('run(%s)' % parsed_args)
quantum_client = self.app.client_manager.quantum
quantum_client.tenant = parsed_args.tenant_id
quantum_client.format = parsed_args.request_format
field_values = parsed_args.newvalues
data = {'port': {}}
for kv in field_values.split(","):
try:
k, v = kv.split("=")
data['port'][k] = v
except ValueError:
raise exceptions.CommandError(
"malformed new values (field=newvalue): %s" % kv)
data['network_id'] = parsed_args.net_id
data['port']['id'] = parsed_args.port_id
quantum_client.update_port(
parsed_args.net_id, parsed_args.port_id, data)
print >>self.app.stdout, (_('Updated Logical Port: %(portid)s') %
{'portid': parsed_args.port_id})
return

View File

@ -0,0 +1,339 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import argparse
import logging
from cliff import lister
from cliff import show
from quantumclient.common import command
from quantumclient.common import exceptions
from quantumclient.common import utils
def add_show_list_common_argument(parser):
parser.add_argument(
'-D', '--show-details',
help='show detailed info',
action='store_true',
default=False, )
parser.add_argument(
'-F', '--fields',
help='specify the field(s) to be returned by server,'
' can be repeated',
action='append',
default=[], )
def add_extra_argument(parser, name, _help):
parser.add_argument(
name,
nargs=argparse.REMAINDER,
help=_help + ': --key1 [type=int|bool|...] value '
'[--key2 [type=int|bool|...] value ...]')
def parse_args_to_dict(values_specs):
'''It is used to analyze the extra command options to command.
Besides known options and arguments, our commands also support user to
put more options to the end of command line. For example,
list_nets -- --tag x y --key1 value1, where '-- --tag x y --key1 value1'
is extra options to our list_nets. This feature can support V2.0 API's
fields selection and filters. For example, to list networks which has name
'test4', we can have list_nets -- --name=test4.
value spec is: --key type=int|bool|... value. Type is one of Python
built-in types. By default, type is string. The key without value is
a bool option. Key with two values will be a list option.
'''
# -- is a pseudo argument
if values_specs and values_specs[0] == '--':
del values_specs[0]
_options = {}
current_arg = None
_values_specs = []
_value_number = 0
current_item = None
for _item in values_specs:
if _item.startswith('--'):
if current_arg is not None:
if _value_number > 1:
current_arg.update({'nargs': '+'})
elif _value_number == 0:
current_arg.update({'action': 'store_true'})
_temp = _item
if "=" in _item:
_item = _item.split('=')[0]
if _item in _options:
raise exceptions.CommandError(
"duplicated options %s" % ' '.join(values_specs))
else:
_options.update({_item: {}})
current_arg = _options[_item]
_item = _temp
elif _item.startswith('type='):
if current_arg is not None:
_type_str = _item.split('=', 2)[1]
current_arg.update({'type': eval(_type_str)})
if _type_str == 'bool':
current_arg.update({'type': utils.__str2bool})
continue
else:
raise exceptions.CommandError(
"invalid values_specs %s" % ' '.join(values_specs))
if not _item.startswith('--'):
if not current_item or '=' in current_item:
raise exceptions.CommandError(
"Invalid values_specs %s" % ' '.join(values_specs))
_value_number += 1
elif _item.startswith('--'):
current_item = _item
if '=' in current_item:
_value_number = 1
else:
_value_number = 0
_values_specs.append(_item)
if current_arg is not None:
if _value_number > 1:
current_arg.update({'nargs': '+'})
elif _value_number == 0:
current_arg.update({'action': 'store_true'})
_parser = argparse.ArgumentParser(add_help=False)
for opt, optspec in _options.iteritems():
_parser.add_argument(opt, **optspec)
_args = _parser.parse_args(_values_specs)
result_dict = {}
for opt in _options.iterkeys():
_opt = opt.split('--', 2)[1]
_value = getattr(_args, _opt.replace('-', '_'))
if _value is not None:
result_dict.update({_opt: _value})
return result_dict
class QuantumCommand(command.OpenStackCommand):
api = 'network'
log = logging.getLogger(__name__ + '.QuantumCommand')
def get_client(self):
return self.app.client_manager.quantum
def get_parser(self, prog_name):
parser = super(QuantumCommand, self).get_parser(prog_name)
parser.add_argument(
'--request-format',
help=_('the xml or json request format'),
default='json',
choices=['json', 'xml', ], )
return parser
class CreateCommand(QuantumCommand, show.ShowOne):
"""Create a resource for a given tenant
"""
api = 'network'
resource = None
log = None
def get_parser(self, prog_name):
parser = super(CreateCommand, self).get_parser(prog_name)
parser.add_argument(
'--tenant-id', metavar='tenant-id',
help=_('the owner tenant ID'), )
self.add_known_arguments(parser)
add_extra_argument(parser, 'value_specs',
'new values for the %s' % self.resource)
return parser
def add_known_arguments(self, parser):
pass
def args2body(self, parsed_args):
return {}
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.get_client()
quantum_client.format = parsed_args.request_format
body = self.args2body(parsed_args)
_extra_values = parse_args_to_dict(parsed_args.value_specs)
body[self.resource].update(_extra_values)
obj_creator = getattr(quantum_client,
"create_%s" % self.resource)
data = obj_creator(body)
# {u'network': {u'id': u'e9424a76-6db4-4c93-97b6-ec311cd51f19'}}
info = self.resource in data and data[self.resource] or None
if info:
print >>self.app.stdout, _('Created a new %s:' % self.resource)
else:
info = {'': ''}
return zip(*sorted(info.iteritems()))
class UpdateCommand(QuantumCommand):
"""Update resource's information
"""
api = 'network'
resource = None
log = None
def get_parser(self, prog_name):
parser = super(UpdateCommand, self).get_parser(prog_name)
parser.add_argument(
'id', metavar='%s-id' % self.resource,
help='ID of %s to update' % self.resource)
add_extra_argument(parser, 'value_specs',
'new values for the %s' % self.resource)
return parser
def run(self, parsed_args):
self.log.debug('run(%s)' % parsed_args)
quantum_client = self.get_client()
quantum_client.format = parsed_args.request_format
value_specs = parsed_args.value_specs
if not value_specs:
raise exceptions.CommandError(
"Must specify new values to update %s" % self.resource)
data = {self.resource: parse_args_to_dict(value_specs)}
obj_updator = getattr(quantum_client,
"update_%s" % self.resource)
obj_updator(parsed_args.id, data)
print >>self.app.stdout, (
_('Updated %(resource)s: %(id)s') %
{'id': parsed_args.id, 'resource': self.resource})
return
class DeleteCommand(QuantumCommand):
"""Delete a given resource
"""
api = 'network'
resource = None
log = None
def get_parser(self, prog_name):
parser = super(DeleteCommand, self).get_parser(prog_name)
parser.add_argument(
'id', metavar='%s-id' % self.resource,
help='ID of %s to delete' % self.resource)
return parser
def run(self, parsed_args):
self.log.debug('run(%s)' % parsed_args)
quantum_client = self.get_client()
quantum_client.format = parsed_args.request_format
obj_deleter = getattr(quantum_client,
"delete_%s" % self.resource)
obj_deleter(parsed_args.id)
print >>self.app.stdout, (_('Deleted %(resource)s: %(id)s')
% {'id': parsed_args.id,
'resource': self.resource})
return
class ListCommand(QuantumCommand, lister.Lister):
"""List resourcs that belong to a given tenant
"""
api = 'network'
resource = None
log = None
_formatters = None
def get_parser(self, prog_name):
parser = super(ListCommand, self).get_parser(prog_name)
add_show_list_common_argument(parser)
add_extra_argument(parser, 'filter_specs', 'filters options')
return parser
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.get_client()
search_opts = parse_args_to_dict(parsed_args.filter_specs)
self.log.debug('search options: %s', search_opts)
quantum_client.format = parsed_args.request_format
fields = parsed_args.fields
extra_fields = search_opts.get('fields', [])
if extra_fields:
if isinstance(extra_fields, list):
fields.extend(extra_fields)
else:
fields.append(extra_fields)
if fields:
search_opts.update({'fields': fields})
if parsed_args.show_details:
search_opts.update({'verbose': 'True'})
obj_lister = getattr(quantum_client,
"list_%ss" % self.resource)
data = obj_lister(**search_opts)
info = []
collection = self.resource + "s"
if collection in data:
info = data[collection]
_columns = len(info) > 0 and sorted(info[0].keys()) or []
return (_columns, (utils.get_item_properties(
s, _columns, formatters=self._formatters, )
for s in info), )
class ShowCommand(QuantumCommand, show.ShowOne):
"""Show information of a given resource
"""
api = 'network'
resource = None
log = None
def get_parser(self, prog_name):
parser = super(ShowCommand, self).get_parser(prog_name)
add_show_list_common_argument(parser)
parser.add_argument(
'id', metavar='%s-id' % self.resource,
help='ID of %s to look up' % self.resource)
return parser
def get_data(self, parsed_args):
self.log.debug('get_data(%s)' % parsed_args)
quantum_client = self.get_client()
quantum_client.format = parsed_args.request_format
params = {}
if parsed_args.show_details:
params = {'verbose': 'True'}
if parsed_args.fields:
params = {'fields': parsed_args.fields}
obj_showor = getattr(quantum_client,
"show_%s" % self.resource)
data = obj_showor(parsed_args.id, **params)
if self.resource in data:
return zip(*sorted(data[self.resource].iteritems()))
else:
return None

View File

@ -0,0 +1,99 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
from quantumclient.quantum.v2_0 import CreateCommand
from quantumclient.quantum.v2_0 import DeleteCommand
from quantumclient.quantum.v2_0 import ListCommand
from quantumclient.quantum.v2_0 import UpdateCommand
from quantumclient.quantum.v2_0 import ShowCommand
def _format_subnets(network):
try:
return '\n'.join(network['subnets'])
except Exception:
return ''
class ListNetwork(ListCommand):
"""List networks that belong to a given tenant
Sample: list_nets -D -- --name=test4 --tag a b
"""
resource = 'network'
log = logging.getLogger(__name__ + '.ListNetwork')
_formatters = {'subnets': _format_subnets, }
class ShowNetwork(ShowCommand):
"""Show information of a given network
Sample: show_net -D <network_id>
"""
resource = 'network'
log = logging.getLogger(__name__ + '.ShowNetwork')
class CreateNetwork(CreateCommand):
"""Create a network for a given tenant
Sample create_net --tenant-id xxx --admin-state-down <net_name> --tag x y
"""
resource = 'network'
log = logging.getLogger(__name__ + '.CreateNetwork')
def add_known_arguments(self, parser):
parser.add_argument(
'--admin-state-down',
default=True, action='store_false',
help='Set Admin State Up to false')
parser.add_argument(
'name', metavar='name',
help='Name of network to create')
def args2body(self, parsed_args):
body = {'network': {
'name': parsed_args.name,
'admin_state_up': parsed_args.admin_state_down, }, }
if parsed_args.tenant_id:
body['network'].update({'tenant_id': parsed_args.tenant_id})
return body
class DeleteNetwork(DeleteCommand):
"""Delete a given network
Sample: delete_net <network_id>
"""
log = logging.getLogger(__name__ + '.DeleteNetwork')
resource = 'network'
class UpdateNetwork(UpdateCommand):
"""Update network's information
Sample: update_net <network_id> --name=test --admin_state_up type=bool True
"""
log = logging.getLogger(__name__ + '.UpdateNetwork')
resource = 'network'

View File

@ -0,0 +1,110 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
from quantumclient.quantum.v2_0 import CreateCommand
from quantumclient.quantum.v2_0 import DeleteCommand
from quantumclient.quantum.v2_0 import ListCommand
from quantumclient.quantum.v2_0 import ShowCommand
from quantumclient.quantum.v2_0 import UpdateCommand
def _format_fixed_ips(port):
try:
return '\n'.join(port['fixed_ips'])
except Exception:
return ''
class ListPort(ListCommand):
"""List networks that belong to a given tenant
Sample: list_ports -D -- --name=test4 --tag a b
"""
resource = 'port'
log = logging.getLogger(__name__ + '.ListPort')
_formatters = {'fixed_ips': _format_fixed_ips, }
class ShowPort(ShowCommand):
"""Show information of a given port
Sample: show_port -D <port_id>
"""
resource = 'port'
log = logging.getLogger(__name__ + '.ShowPort')
class CreatePort(CreateCommand):
"""Create a port for a given tenant
Sample create_port --tenant-id xxx --admin-state-down \
--mac_address mac --device_id deviceid <network_id>
"""
resource = 'port'
log = logging.getLogger(__name__ + '.CreatePort')
def add_known_arguments(self, parser):
parser.add_argument(
'--admin-state-down',
default=True, action='store_false',
help='set admin state up to false')
parser.add_argument(
'--mac-address',
help='mac address of port')
parser.add_argument(
'--device-id',
help='device id of this port')
parser.add_argument(
'network_id',
help='Network id of this port belongs to')
def args2body(self, parsed_args):
body = {'port': {'admin_state_up': parsed_args.admin_state_down,
'network_id': parsed_args.network_id, }, }
if parsed_args.mac_address:
body['port'].update({'mac_address': parsed_args.mac_address})
if parsed_args.device_id:
body['port'].update({'device_id': parsed_args.device_id})
if parsed_args.tenant_id:
body['port'].update({'tenant_id': parsed_args.tenant_id})
return body
class DeletePort(DeleteCommand):
"""Delete a given port
Sample: delete_port <port_id>
"""
resource = 'port'
log = logging.getLogger(__name__ + '.DeletePort')
class UpdatePort(UpdateCommand):
"""Update port's information
Sample: update_port <port_id> --name=test --admin_state_up type=bool True
"""
resource = 'port'
log = logging.getLogger(__name__ + '.UpdatePort')

View File

@ -0,0 +1,101 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
from quantumclient.quantum.v2_0 import CreateCommand
from quantumclient.quantum.v2_0 import DeleteCommand
from quantumclient.quantum.v2_0 import ListCommand
from quantumclient.quantum.v2_0 import ShowCommand
from quantumclient.quantum.v2_0 import UpdateCommand
class ListSubnet(ListCommand):
"""List networks that belong to a given tenant
Sample: list_subnets -D -- --name=test4 --tag a b
"""
resource = 'subnet'
log = logging.getLogger(__name__ + '.ListSubnet')
_formatters = {}
class ShowSubnet(ShowCommand):
"""Show information of a given subnet
Sample: show_subnet -D <subnet_id>
"""
resource = 'subnet'
log = logging.getLogger(__name__ + '.ShowSubnet')
class CreateSubnet(CreateCommand):
"""Create a subnet for a given tenant
Sample create_subnet --tenant-id xxx --ip-version 4\
<network_id> <cidr> --tag x y --otherfield value
"""
resource = 'subnet'
log = logging.getLogger(__name__ + '.CreateSubnet')
def add_known_arguments(self, parser):
parser.add_argument('--ip-version', type=int,
default=4, choices=[4, 6],
help='IP version with default 4')
parser.add_argument(
'--gateway', metavar='gateway',
help='gateway ip of this subnet')
parser.add_argument(
'network_id',
help='Network id of this subnet belongs to')
parser.add_argument(
'cidr', metavar='cidr',
help='cidr of subnet to create')
def args2body(self, parsed_args):
body = {'subnet': {'cidr': parsed_args.cidr,
'network_id': parsed_args.network_id,
'ip_version': parsed_args.ip_version, }, }
if parsed_args.gateway:
body['subnet'].update({'gateway_ip': parsed_args.gateway})
if parsed_args.tenant_id:
body['subnet'].update({'tenant_id': parsed_args.tenant_id})
return body
class DeleteSubnet(DeleteCommand):
"""Delete a given subnet
Sample: delete_subnet <subnet-id>
"""
resource = 'subnet'
log = logging.getLogger(__name__ + '.DeleteSubnet')
class UpdateSubnet(UpdateCommand):
"""Update subnet's information
Sample:
update_subnet <subnet-id> --name=test --admin_state_up type=bool True
"""
resource = 'subnet'
log = logging.getLogger(__name__ + '.UpdateSubnet')

383
quantumclient/shell.py Normal file
View File

@ -0,0 +1,383 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
"""
Command-line interface to the Quantum APIs
"""
import argparse
import gettext
import itertools
import logging
import os
import sys
from cliff.app import App
from cliff.commandmanager import CommandManager
from quantumclient.common import clientmanager
from quantumclient.common import exceptions as exc
from quantumclient.common import utils
gettext.install('quantum', unicode=1)
VERSION = '2.0'
def env(*vars, **kwargs):
"""Search for the first defined of possibly many env vars
Returns the first environment variable defined in vars, or
returns the default defined in kwargs.
"""
for v in vars:
value = os.environ.get(v, None)
if value:
return value
return kwargs.get('default', '')
COMMAND_V1 = {
'list_nets': utils.import_class(
'quantumclient.quantum.v1_1.network.ListNetwork'),
'show_net': utils.import_class(
'quantumclient.quantum.v1_1.network.ShowNetwork'),
'create_net': utils.import_class(
'quantumclient.quantum.v1_1.network.CreateNetwork'),
'delete_net': utils.import_class(
'quantumclient.quantum.v1_1.network.DeleteNetwork'),
'update_net': utils.import_class(
'quantumclient.quantum.v1_1.network.UpdateNetwork'),
'list_ports': utils.import_class(
'quantumclient.quantum.v1_1.port.ListPort'),
'show_port': utils.import_class(
'quantumclient.quantum.v1_1.port.ShowPort'),
'create_port': utils.import_class(
'quantumclient.quantum.v1_1.port.CreatePort'),
'delete_port': utils.import_class(
'quantumclient.quantum.v1_1.port.DeletePort'),
'update_port': utils.import_class(
'quantumclient.quantum.v1_1.port.UpdatePort'),
'plug_iface': utils.import_class(
'quantumclient.quantum.v1_1.interface.PlugInterface'),
'unplug_iface': utils.import_class(
'quantumclient.quantum.v1_1.interface.UnPlugInterface'),
'show_iface': utils.import_class(
'quantumclient.quantum.v1_1.interface.ShowInterface'), }
COMMAND_V2 = {
'list_nets': utils.import_class(
'quantumclient.quantum.v2_0.network.ListNetwork'),
'show_net': utils.import_class(
'quantumclient.quantum.v2_0.network.ShowNetwork'),
'create_net': utils.import_class(
'quantumclient.quantum.v2_0.network.CreateNetwork'),
'delete_net': utils.import_class(
'quantumclient.quantum.v2_0.network.DeleteNetwork'),
'update_net': utils.import_class(
'quantumclient.quantum.v2_0.network.UpdateNetwork'),
'list_subnets': utils.import_class(
'quantumclient.quantum.v2_0.subnet.ListSubnet'),
'show_subnet': utils.import_class(
'quantumclient.quantum.v2_0.subnet.ShowSubnet'),
'create_subnet': utils.import_class(
'quantumclient.quantum.v2_0.subnet.CreateSubnet'),
'delete_subnet': utils.import_class(
'quantumclient.quantum.v2_0.subnet.DeleteSubnet'),
'update_subnet': utils.import_class(
'quantumclient.quantum.v2_0.subnet.UpdateSubnet'),
'list_ports': utils.import_class(
'quantumclient.quantum.v2_0.port.ListPort'),
'show_port': utils.import_class(
'quantumclient.quantum.v2_0.port.ShowPort'),
'create_port': utils.import_class(
'quantumclient.quantum.v2_0.port.CreatePort'),
'delete_port': utils.import_class(
'quantumclient.quantum.v2_0.port.DeletePort'),
'update_port': utils.import_class(
'quantumclient.quantum.v2_0.port.UpdatePort'), }
COMMANDS = {'1.0': COMMAND_V1,
'1.1': COMMAND_V1,
'2.0': COMMAND_V2, }
class HelpAction(argparse.Action):
"""Provide a custom action so the -h and --help options
to the main app will print a list of the commands.
The commands are determined by checking the CommandManager
instance, passed in as the "default" value for the action.
"""
def __call__(self, parser, namespace, values, option_string=None):
app = self.default
parser.print_help(app.stdout)
app.stdout.write('\nCommands for API v%s:\n' % app.api_version)
command_manager = app.command_manager
for name, ep in sorted(command_manager):
factory = ep.load()
cmd = factory(self, None)
one_liner = cmd.get_description().split('\n')[0]
app.stdout.write(' %-13s %s\n' % (name, one_liner))
sys.exit(0)
class QuantumShell(App):
CONSOLE_MESSAGE_FORMAT = '%(levelname)s: %(name)s %(message)s'
log = logging.getLogger(__name__)
def __init__(self, apiversion):
super(QuantumShell, self).__init__(
description=__doc__.strip(),
version=VERSION,
command_manager=CommandManager('quantum.cli'), )
for k, v in COMMANDS[apiversion].items():
self.command_manager.add_command(k, v)
# This is instantiated in initialize_app() only when using
# password flow auth
self.auth_client = None
self.api_version = apiversion
def build_option_parser(self, description, version):
"""Return an argparse option parser for this application.
Subclasses may override this method to extend
the parser with more global options.
:param description: full description of the application
:paramtype description: str
:param version: version number for the application
:paramtype version: str
"""
parser = argparse.ArgumentParser(
description=description,
add_help=False, )
parser.add_argument(
'--version',
action='version',
version='%(prog)s {0}'.format(version), )
parser.add_argument(
'-v', '--verbose',
action='count',
dest='verbose_level',
default=self.DEFAULT_VERBOSE_LEVEL,
help='Increase verbosity of output. Can be repeated.', )
parser.add_argument(
'-q', '--quiet',
action='store_const',
dest='verbose_level',
const=0,
help='suppress output except warnings and errors', )
parser.add_argument(
'-H', '--Help',
action=HelpAction,
nargs=0,
default=self, # tricky
help="show this help message and exit", )
parser.add_argument(
'--debug',
default=False,
action='store_true',
help='show tracebacks on errors', )
# Global arguments
parser.add_argument(
'--os-auth-url', metavar='<auth-url>',
default=env('OS_AUTH_URL'),
help='Authentication URL (Env: OS_AUTH_URL)')
parser.add_argument(
'--os-tenant-name', metavar='<auth-tenant-name>',
default=env('OS_TENANT_NAME'),
help='Authentication tenant name (Env: OS_TENANT_NAME)')
parser.add_argument(
'--os-username', metavar='<auth-username>',
default=utils.env('OS_USERNAME'),
help='Authentication username (Env: OS_USERNAME)')
parser.add_argument(
'--os-password', metavar='<auth-password>',
default=utils.env('OS_PASSWORD'),
help='Authentication password (Env: OS_PASSWORD)')
parser.add_argument(
'--os-region-name', metavar='<auth-region-name>',
default=env('OS_REGION_NAME'),
help='Authentication region name (Env: OS_REGION_NAME)')
parser.add_argument(
'--os-quantum-api-version',
metavar='<quantum-api-version>',
default=env('OS_QUANTUM_API_VERSION', default='2.0'),
help='QAUNTUM API version, default=2.0 '
'(Env: OS_QUANTUM_API_VERSION)')
parser.add_argument(
'--os-token', metavar='<token>',
default=env('OS_TOKEN'),
help='Defaults to env[OS_TOKEN]')
parser.add_argument(
'--os-url', metavar='<url>',
default=env('OS_URL'),
help='Defaults to env[OS_URL]')
return parser
def run(self, argv):
"""Equivalent to the main program for the application.
:param argv: input arguments and options
:paramtype argv: list of str
"""
try:
self.options, remainder = self.parser.parse_known_args(argv)
self.configure_logging()
self.interactive_mode = not remainder
self.initialize_app(remainder)
except Exception as err:
if self.options.debug:
self.log.exception(err)
raise
else:
self.log.error(err)
return 1
result = 1
if self.interactive_mode:
_argv = [sys.argv[0]]
sys.argv = _argv
result = self.interact()
else:
result = self.run_subcommand(remainder)
return result
def authenticate_user(self):
"""Make sure the user has provided all of the authentication
info we need.
"""
if self.options.os_token or self.options.os_url:
# Token flow auth takes priority
if not self.options.os_token:
raise exc.CommandError(
"You must provide a token via"
" either --os-token or env[OS_TOKEN]")
if not self.options.os_url:
raise exc.CommandError(
"You must provide a service URL via"
" either --os-url or env[OS_URL]")
else:
# Validate password flow auth
if not self.options.os_username:
raise exc.CommandError(
"You must provide a username via"
" either --os-username or env[OS_USERNAME]")
if not self.options.os_password:
raise exc.CommandError(
"You must provide a password via"
" either --os-password or env[OS_PASSWORD]")
if not (self.options.os_tenant_name):
raise exc.CommandError(
"You must provide a tenant_name via"
" either --os-tenant-name or via env[OS_TENANT_NAME]")
if not self.options.os_auth_url:
raise exc.CommandError(
"You must provide an auth url via"
" either --os-auth-url or via env[OS_AUTH_URL]")
self.client_manager = clientmanager.ClientManager(
token=self.options.os_token,
url=self.options.os_url,
auth_url=self.options.os_auth_url,
tenant_name=self.options.os_tenant_name,
username=self.options.os_username,
password=self.options.os_password,
region_name=self.options.os_region_name,
api_version=self.api_version, )
return
def initialize_app(self, argv):
"""Global app init bits:
* set up API versions
* validate authentication info
"""
super(QuantumShell, self).initialize_app(argv)
self.api_version = {
'network': self.options.os_quantum_api_version, }
# If the user is not asking for help, make sure they
# have given us auth.
cmd_name = None
if argv:
cmd_info = self.command_manager.find_command(argv)
cmd_factory, cmd_name, sub_argv = cmd_info
if self.interactive_mode or cmd_name != 'help':
self.authenticate_user()
def clean_up(self, cmd, result, err):
self.log.debug('clean_up %s', cmd.__class__.__name__)
if err:
self.log.debug('got an error: %s', err)
def itertools_compressdef(data, selectors):
# patch 2.6 compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F
return (d for d, s in itertools.izip(data, selectors) if s)
def main(argv=sys.argv[1:]):
apiVersion = None
versionFlag = False
for argitem in argv:
if argitem.startswith('--os-quantum-api-version='):
apiVersion = argitem.split('=', 2)[1]
break
elif '--os-quantum-api-version' == argitem:
versionFlag = True
elif versionFlag:
apiVersion = argitem
break
if apiVersion and apiVersion not in COMMANDS.keys():
print ("Invalid API version or API version '%s' is not supported" %
apiVersion)
sys.exit(1)
if not apiVersion:
apiVersion = env('OS_QUANTUM_API_VERSION', default='2.0')
try:
if not getattr(itertools, 'compress', None):
setattr(itertools, 'compress', itertools_compressdef)
return QuantumShell(apiVersion).run(argv)
except exc.QuantumClientException:
return 1
except Exception as e:
print e
return 1
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))

View File

@ -17,6 +17,7 @@
from quantum import api as server
from quantum.openstack.common import cfg
from quantum.tests.unit import testlib_api
@ -40,9 +41,8 @@ class FakeHTTPConnection:
def __init__(self, _1, _2):
# Ignore host and port parameters
self._req = None
plugin = 'quantum.plugins.sample.SamplePlugin.FakePlugin'
options = dict(plugin_provider=plugin)
self._api = server.APIRouterV11(options)
cfg.CONF.core_plugin = 'quantum.plugins.sample.SamplePlugin.FakePlugin'
self._api = server.APIRouterV11()
def request(self, method, action, body, headers):
# TODO: remove version prefix from action!

View File

@ -0,0 +1,59 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import unittest
from quantumclient.common import exceptions
from quantumclient.quantum import v2_0 as quantumV20
class CLITestArgs(unittest.TestCase):
def test_empty(self):
_mydict = quantumV20.parse_args_to_dict([])
self.assertEqual({}, _mydict)
def test_default_bool(self):
_specs = ['--my_bool', '--arg1', 'value1']
_mydict = quantumV20.parse_args_to_dict(_specs)
self.assertTrue(_mydict['my_bool'])
def test_bool_true(self):
_specs = ['--my-bool', 'type=bool', 'true', '--arg1', 'value1']
_mydict = quantumV20.parse_args_to_dict(_specs)
self.assertTrue(_mydict['my-bool'])
def test_bool_false(self):
_specs = ['--my_bool', 'type=bool', 'false', '--arg1', 'value1']
_mydict = quantumV20.parse_args_to_dict(_specs)
self.assertFalse(_mydict['my_bool'])
def test_nargs(self):
_specs = ['--tag', 'x', 'y', '--arg1', 'value1']
_mydict = quantumV20.parse_args_to_dict(_specs)
self.assertTrue('x' in _mydict['tag'])
self.assertTrue('y' in _mydict['tag'])
def test_badarg(self):
_specs = ['--tag=t', 'x', 'y', '--arg1', 'value1']
self.assertRaises(exceptions.CommandError,
quantumV20.parse_args_to_dict, _specs)
def test_arg(self):
_specs = ['--tag=t', '--arg1', 'value1']
self.assertEqual('value1',
quantumV20.parse_args_to_dict(_specs)['arg1'])

View File

@ -26,12 +26,12 @@ import sys
import unittest
from quantum import api as server
from quantum.db import api as db
from quantumclient import ClientV11
from quantumclient import cli_lib as cli
from quantumclient import Client
from quantum.db import api as db
from quantum.openstack.common import cfg
from quantumclient.tests.unit import stubs as client_stubs
LOG = logging.getLogger('quantumclient.tests.test_cli')
API_VERSION = "1.1"
FORMAT = 'json'
@ -41,21 +41,19 @@ class CLITest(unittest.TestCase):
def setUp(self):
"""Prepare the test environment"""
options = {}
options['plugin_provider'] = (
'quantum.plugins.sample.SamplePlugin.FakePlugin')
#TODO: make the version of the API router configurable
self.api = server.APIRouterV11(options)
cfg.CONF.core_plugin = 'quantum.plugins.sample.SamplePlugin.FakePlugin'
self.api = server.APIRouterV11()
self.tenant_id = "test_tenant"
self.network_name_1 = "test_network_1"
self.network_name_2 = "test_network_2"
self.version = API_VERSION
# Prepare client and plugin manager
self.client = Client(tenant=self.tenant_id,
format=FORMAT,
testingStub=client_stubs.FakeHTTPConnection,
version=self.version)
self.client = ClientV11(tenant=self.tenant_id,
format=FORMAT,
testingStub=client_stubs.FakeHTTPConnection,
version=self.version)
# Redirect stdout
self.fake_stdout = client_stubs.FakeStdout()
sys.stdout = self.fake_stdout
@ -69,7 +67,7 @@ class CLITest(unittest.TestCase):
# Verification - get raw result from db
nw_list = db.network_list(self.tenant_id)
networks = [{'id': nw.uuid, 'name': nw.name}
for nw in nw_list]
for nw in nw_list]
# Fill CLI template
output = cli.prepare_output('list_nets',
self.tenant_id,
@ -92,6 +90,23 @@ class CLITest(unittest.TestCase):
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_list_networks_details_name_filter(self, name):
# Verification - get raw result from db
nw_list = db.network_list(self.tenant_id)
nw_filtered = []
for nw in nw_list:
if nw.name == name:
nw_filtered.append(nw)
networks = [dict(id=nw.uuid, name=nw.name) for nw in nw_filtered]
# Fill CLI template
output = cli.prepare_output('list_nets_detail',
self.tenant_id,
dict(networks=networks),
self.version)
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
def _verify_create_network(self):
# Verification - get raw result from db
nw_list = db.network_list(self.tenant_id)
@ -160,14 +175,10 @@ class CLITest(unittest.TestCase):
port_list = db.port_list(nw.uuid)
if not port_list:
network['ports'] = [
{
'id': '<none>',
'state': '<none>',
'attachment': {
'id': '<none>',
},
},
]
{'id': '<none>',
'state': '<none>',
'attachment': {
'id': '<none>', }, }, ]
else:
network['ports'] = []
for port in port_list:
@ -175,9 +186,7 @@ class CLITest(unittest.TestCase):
'id': port.uuid,
'state': port.state,
'attachment': {
'id': port.interface_id or '<none>',
},
})
'id': port.interface_id or '<none>', }, })
# Fill CLI template
output = cli.prepare_output('show_net_detail',
@ -416,6 +425,25 @@ class CLITest(unittest.TestCase):
LOG.debug(self.fake_stdout.content)
self._verify_list_networks_details()
def test_list_networks_details_v11_name_filter(self):
try:
# Pre-populate data for testing using db api
db.network_create(self.tenant_id, self.network_name_1)
db.network_create(self.tenant_id, self.network_name_2)
#TODO: test filters
cli.list_nets_detail_v11(self.client,
self.tenant_id,
self.version,
{'name': self.network_name_1, })
except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_list_networks_details_v11 failed due to " +
"an exception")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
self._verify_list_networks_details_name_filter(self.network_name_1)
def test_create_network(self):
try:
cli.create_net(self.client,

View File

@ -0,0 +1,290 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import sys
import unittest
import mox
from mox import ContainsKeyValue
from mox import Comparator
from quantumclient.v2_0.client import Client
API_VERSION = "2.0"
FORMAT = 'json'
TOKEN = 'testtoken'
ENDURL = 'localurl'
class FakeStdout:
def __init__(self):
self.content = []
def write(self, text):
self.content.append(text)
def make_string(self):
result = ''
for line in self.content:
result = result + line
return result
class MyResp(object):
def __init__(self, status):
self.status = status
class MyApp(object):
def __init__(self, _stdout):
self.stdout = _stdout
class MyComparator(Comparator):
def __init__(self, lhs, client):
self.lhs = lhs
self.client = client
def _com_dict(self, lhs, rhs):
if len(lhs) != len(rhs):
return False
for key, value in lhs.iteritems():
if key not in rhs:
return False
rhs_value = rhs[key]
if not self._com(value, rhs_value):
return False
return True
def _com_list(self, lhs, rhs):
if len(lhs) != len(rhs):
return False
for lhs_value in lhs:
if lhs_value not in rhs:
return False
return True
def _com(self, lhs, rhs):
if lhs is None:
return rhs is None
if isinstance(lhs, dict):
if not isinstance(rhs, dict):
return False
return self._com_dict(lhs, rhs)
if isinstance(lhs, list):
if not isinstance(rhs, list):
return False
return self._com_list(lhs, rhs)
if isinstance(lhs, tuple):
if not isinstance(rhs, tuple):
return False
return self._com_list(lhs, rhs)
return lhs == rhs
def equals(self, rhs):
if self.client:
rhs = self.client.deserialize(rhs, 200)
return self._com(self.lhs, rhs)
def __repr__(self):
return str(self.lhs)
class CLITestV20Base(unittest.TestCase):
def _url(self, path, query=None):
_url_str = self.endurl + "/v" + API_VERSION + path + "." + FORMAT
return query and _url_str + "?" + query or _url_str
def setUp(self):
"""Prepare the test environment"""
self.mox = mox.Mox()
self.endurl = ENDURL
self.client = Client(token=TOKEN, endpoint_url=self.endurl)
self.fake_stdout = FakeStdout()
sys.stdout = self.fake_stdout
def tearDown(self):
"""Clear the test environment"""
sys.stdout = sys.__stdout__
def _test_create_resource(self, resource, cmd,
name, myid, args,
position_names, position_values, tenant_id=None,
tags=None, admin_state_up=True):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
if resource == 'subnet':
body = {resource: {}, }
else:
body = {resource: {'admin_state_up': admin_state_up, }, }
if tenant_id:
body[resource].update({'tenant_id': tenant_id})
if tags:
body[resource].update({'tags': tags})
for i in xrange(len(position_names)):
body[resource].update({position_names[i]: position_values[i]})
ress = {resource:
{'id': myid,
'name': name, }, }
resstr = self.client.serialize(ress)
# url method body
path = getattr(self.client, resource + "s_path")
self.client.httpclient.request(
self._url(path), 'POST',
body=MyComparator(body, self.client),
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200),
resstr))
self.mox.ReplayAll()
cmd_parser = cmd.get_parser('create_' + resource)
parsed_args = cmd_parser.parse_args(args)
cmd.run(parsed_args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertTrue(myid, _str)
self.assertTrue(name, _str)
def _test_list_resources(self, resources, cmd, detail=False, tags=[],
fields_1=[], fields_2=[]):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
reses = {resources: [{'id': 'myid1', },
{'id': 'myid2', }, ], }
resstr = self.client.serialize(reses)
# url method body
query = ""
args = detail and ['-D', ] or []
if fields_1:
for field in fields_1:
args.append('--fields')
args.append(field)
if tags:
args.append('--')
args.append("--tag")
for tag in tags:
if query:
query += "&tag=" + tag
else:
query = "tag=" + tag
args.append(tag)
if (not tags) and fields_2:
args.append('--')
if fields_2:
args.append("--fields")
for field in fields_2:
args.append(field)
if detail:
query = query and query + '&verbose=True' or 'verbose=True'
fields_1.extend(fields_2)
for field in fields_1:
if query:
query += "&fields=" + field
else:
query = "fields=" + field
path = getattr(self.client, resources + "_path")
self.client.httpclient.request(
self._url(path, query), 'GET',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200), resstr))
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("list_" + resources)
parsed_args = cmd_parser.parse_args(args)
cmd.run(parsed_args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertTrue('myid1' in _str)
def _test_update_resource(self, resource, cmd, myid, args, extrafields):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
body = {resource: extrafields}
path = getattr(self.client, resource + "_path")
self.client.httpclient.request(
self._url(path % myid), 'PUT',
body=MyComparator(body, self.client),
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(204), None))
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("update_" + resource)
parsed_args = cmd_parser.parse_args(args)
cmd.run(parsed_args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertTrue(myid in _str)
def _test_show_resource(self, resource, cmd, myid, args, fields=[]):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
query = None
for field in fields:
if query:
query += "&fields=" + field
else:
query = "fields=" + field
resnetworks = {resource:
{'id': myid,
'name': 'myname', }, }
resstr = self.client.serialize(resnetworks)
path = getattr(self.client, resource + "_path")
self.client.httpclient.request(
self._url(path % myid, query), 'GET',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(200), resstr))
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("show_" + resource)
parsed_args = cmd_parser.parse_args(args)
cmd.run(parsed_args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertTrue(myid in _str)
self.assertTrue('myname' in _str)
def _test_delete_resource(self, resource, cmd, myid, args):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
path = getattr(self.client, resource + "_path")
self.client.httpclient.request(
self._url(path % myid), 'DELETE',
body=None,
headers=ContainsKeyValue('X-Auth-Token',
TOKEN)).AndReturn((MyResp(204), None))
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("delete_" + resource)
parsed_args = cmd_parser.parse_args(args)
cmd.run(parsed_args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertTrue(myid in _str)

View File

@ -0,0 +1,140 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import sys
from quantumclient.common import exceptions
from quantumclient.tests.unit.test_cli20 import CLITestV20Base
from quantumclient.tests.unit.test_cli20 import MyApp
from quantumclient.quantum.v2_0.network import CreateNetwork
from quantumclient.quantum.v2_0.network import ListNetwork
from quantumclient.quantum.v2_0.network import UpdateNetwork
from quantumclient.quantum.v2_0.network import ShowNetwork
from quantumclient.quantum.v2_0.network import DeleteNetwork
class CLITestV20Network(CLITestV20Base):
def test_create_network(self):
""" create_net myname"""
resource = 'network'
cmd = CreateNetwork(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
args = [name, ]
position_names = ['name', ]
position_values = [name, ]
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def test_create_network_tenant(self):
"""create_net --tenant-id tenantid myname"""
resource = 'network'
cmd = CreateNetwork(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
args = ['--tenant-id', 'tenantid', name]
position_names = ['name', ]
position_values = [name, ]
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values,
tenant_id='tenantid')
def test_create_network_tags(self):
""" create_net myname --tags a b"""
resource = 'network'
cmd = CreateNetwork(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
args = [name, '--tags', 'a', 'b']
position_names = ['name', ]
position_values = [name, ]
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values,
tags=['a', 'b'])
def test_create_network_state(self):
""" create_net --admin-state-down myname"""
resource = 'network'
cmd = CreateNetwork(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
args = ['--admin-state-down', name, ]
position_names = ['name', ]
position_values = [name, ]
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values,
admin_state_up=False)
def test_list_nets_detail(self):
"""list_nets -D"""
resources = "networks"
cmd = ListNetwork(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, True)
def test_list_nets_tags(self):
"""list_nets -- --tags a b"""
resources = "networks"
cmd = ListNetwork(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, tags=['a', 'b'])
def test_list_nets_detail_tags(self):
"""list_nets -D -- --tags a b"""
resources = "networks"
cmd = ListNetwork(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, detail=True, tags=['a', 'b'])
def test_list_nets_fields(self):
"""list_nets --fields a --fields b -- --fields c d"""
resources = "networks"
cmd = ListNetwork(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd,
fields_1=['a', 'b'], fields_2=['c', 'd'])
def test_update_network_exception(self):
""" update_net myid"""
resource = 'network'
cmd = UpdateNetwork(MyApp(sys.stdout), None)
self.assertRaises(exceptions.CommandError, self._test_update_resource,
resource, cmd, 'myid', ['myid'], {})
def test_update_network(self):
""" update_net myid --name myname --tags a b"""
resource = 'network'
cmd = UpdateNetwork(MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'myname',
'--tags', 'a', 'b'],
{'name': 'myname', 'tags': ['a', 'b'], }
)
def test_show_network(self):
""" show_net --fields id --fields name myid """
resource = 'network'
cmd = ShowNetwork(MyApp(sys.stdout), None)
myid = 'myid'
args = ['--fields', 'id', '--fields', 'name', myid]
self._test_show_resource(resource, cmd, myid, args, ['id', 'name'])
def test_delete_network(self):
"""
delete_net myid
"""
resource = 'network'
cmd = DeleteNetwork(MyApp(sys.stdout), None)
myid = 'myid'
args = [myid]
self._test_delete_resource(resource, cmd, myid, args)

View File

@ -0,0 +1,139 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import sys
from quantumclient.tests.unit.test_cli20 import CLITestV20Base
from quantumclient.tests.unit.test_cli20 import MyApp
from quantumclient.quantum.v2_0.port import CreatePort
from quantumclient.quantum.v2_0.port import ListPort
from quantumclient.quantum.v2_0.port import UpdatePort
from quantumclient.quantum.v2_0.port import ShowPort
from quantumclient.quantum.v2_0.port import DeletePort
class CLITestV20Port(CLITestV20Base):
def test_create_port(self):
""" create_port netid"""
resource = 'port'
cmd = CreatePort(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
netid = 'netid'
args = [netid]
position_names = ['network_id']
position_values = []
position_values.extend([netid])
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def test_create_port_full(self):
""" create_port --mac-address mac --device-id deviceid netid"""
resource = 'port'
cmd = CreatePort(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
netid = 'netid'
args = ['--mac-address', 'mac', '--device-id', 'deviceid', netid]
position_names = ['network_id', 'mac_address', 'device_id']
position_values = [netid, 'mac', 'deviceid']
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def test_create_port_tenant(self):
"""create_port --tenant-id tenantid netid"""
resource = 'port'
cmd = CreatePort(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
netid = 'netid'
args = ['--tenant-id', 'tenantid', netid, ]
position_names = ['network_id']
position_values = []
position_values.extend([netid])
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values,
tenant_id='tenantid')
def test_create_port_tags(self):
""" create_port netid mac_address device_id --tags a b"""
resource = 'port'
cmd = CreatePort(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
netid = 'netid'
args = [netid, '--tags', 'a', 'b']
position_names = ['network_id']
position_values = []
position_values.extend([netid])
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values,
tags=['a', 'b'])
def test_list_ports_detail(self):
"""list_ports -D"""
resources = "ports"
cmd = ListPort(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, True)
def test_list_ports_tags(self):
"""list_ports -- --tags a b"""
resources = "ports"
cmd = ListPort(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, tags=['a', 'b'])
def test_list_ports_detail_tags(self):
"""list_ports -D -- --tags a b"""
resources = "ports"
cmd = ListPort(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, detail=True, tags=['a', 'b'])
def test_list_ports_fields(self):
"""list_ports --fields a --fields b -- --fields c d"""
resources = "ports"
cmd = ListPort(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd,
fields_1=['a', 'b'], fields_2=['c', 'd'])
def test_update_port(self):
""" update_port myid --name myname --tags a b"""
resource = 'port'
cmd = UpdatePort(MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'myname',
'--tags', 'a', 'b'],
{'name': 'myname', 'tags': ['a', 'b'], }
)
def test_show_port(self):
""" show_port --fields id --fields name myid """
resource = 'port'
cmd = ShowPort(MyApp(sys.stdout), None)
myid = 'myid'
args = ['--fields', 'id', '--fields', 'name', myid]
self._test_show_resource(resource, cmd, myid, args, ['id', 'name'])
def test_delete_port(self):
"""
delete_port myid
"""
resource = 'port'
cmd = DeletePort(MyApp(sys.stdout), None)
myid = 'myid'
args = [myid]
self._test_delete_resource(resource, cmd, myid, args)

View File

@ -0,0 +1,130 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import sys
from quantumclient.tests.unit.test_cli20 import CLITestV20Base
from quantumclient.tests.unit.test_cli20 import MyApp
from quantumclient.quantum.v2_0.subnet import CreateSubnet
from quantumclient.quantum.v2_0.subnet import ListSubnet
from quantumclient.quantum.v2_0.subnet import UpdateSubnet
from quantumclient.quantum.v2_0.subnet import ShowSubnet
from quantumclient.quantum.v2_0.subnet import DeleteSubnet
class CLITestV20Subnet(CLITestV20Base):
def test_create_subnet(self):
""" create_subnet --gateway gateway netid cidr"""
resource = 'subnet'
cmd = CreateSubnet(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
netid = 'netid'
cidr = 'cidrvalue'
gateway = 'gatewayvalue'
args = ['--gateway', gateway, netid, cidr]
position_names = ['ip_version', 'network_id', 'cidr', 'gateway_ip']
position_values = [4, ]
position_values.extend([netid, cidr, gateway])
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values)
def test_create_subnet_tenant(self):
"""create_subnet --tenant-id tenantid netid cidr"""
resource = 'subnet'
cmd = CreateSubnet(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
netid = 'netid'
cidr = 'prefixvalue'
args = ['--tenant-id', 'tenantid', netid, cidr]
position_names = ['ip_version', 'network_id', 'cidr']
position_values = [4, ]
position_values.extend([netid, cidr])
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values,
tenant_id='tenantid')
def test_create_subnet_tags(self):
""" create_subnet netid cidr --tags a b"""
resource = 'subnet'
cmd = CreateSubnet(MyApp(sys.stdout), None)
name = 'myname'
myid = 'myid'
netid = 'netid'
cidr = 'prefixvalue'
args = [netid, cidr, '--tags', 'a', 'b']
position_names = ['ip_version', 'network_id', 'cidr']
position_values = [4, ]
position_values.extend([netid, cidr])
_str = self._test_create_resource(resource, cmd, name, myid, args,
position_names, position_values,
tags=['a', 'b'])
def test_list_subnets_detail(self):
"""list_subnets -D"""
resources = "subnets"
cmd = ListSubnet(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, True)
def test_list_subnets_tags(self):
"""list_subnets -- --tags a b"""
resources = "subnets"
cmd = ListSubnet(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, tags=['a', 'b'])
def test_list_subnets_detail_tags(self):
"""list_subnets -D -- --tags a b"""
resources = "subnets"
cmd = ListSubnet(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, detail=True, tags=['a', 'b'])
def test_list_subnets_fields(self):
"""list_subnets --fields a --fields b -- --fields c d"""
resources = "subnets"
cmd = ListSubnet(MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd,
fields_1=['a', 'b'], fields_2=['c', 'd'])
def test_update_subnet(self):
""" update_subnet myid --name myname --tags a b"""
resource = 'subnet'
cmd = UpdateSubnet(MyApp(sys.stdout), None)
self._test_update_resource(resource, cmd, 'myid',
['myid', '--name', 'myname',
'--tags', 'a', 'b'],
{'name': 'myname', 'tags': ['a', 'b'], }
)
def test_show_subnet(self):
""" show_subnet --fields id --fields name myid """
resource = 'subnet'
cmd = ShowSubnet(MyApp(sys.stdout), None)
myid = 'myid'
args = ['--fields', 'id', '--fields', 'name', myid]
self._test_show_resource(resource, cmd, myid, args, ['id', 'name'])
def test_delete_subnet(self):
"""
delete_subnet myid
"""
resource = 'subnet'
cmd = DeleteSubnet(MyApp(sys.stdout), None)
myid = 'myid'
args = [myid]
self._test_delete_resource(resource, cmd, myid, args)

View File

@ -349,7 +349,7 @@ class APITest(unittest.TestCase):
"PUT",
"networks/001/ports/001/attachment",
data=["001", "001",
{'resource': {'id': '1234'}}],
{'resource': {'id': '1234'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_attach_resource - tenant:%s "

View File

@ -0,0 +1,14 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,398 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import httplib
import logging
import time
import urllib
from quantumclient.client import HTTPClient
from quantumclient.common import exceptions
from quantumclient.common.serializer import Serializer
_logger = logging.getLogger(__name__)
def exception_handler_v20(status_code, error_content):
""" Exception handler for API v2.0 client
This routine generates the appropriate
Quantum exception according to the contents of the
response body
:param status_code: HTTP error status code
:param error_content: deserialized body of error response
"""
quantum_errors = {
'NetworkNotFound': exceptions.NetworkNotFoundClient,
'NetworkInUse': exceptions.NetworkInUseClient,
'PortNotFound': exceptions.PortNotFoundClient,
'RequestedStateInvalid': exceptions.StateInvalidClient,
'PortInUse': exceptions.PortInUseClient,
'AlreadyAttached': exceptions.AlreadyAttachedClient, }
error_dict = None
if isinstance(error_content, dict):
error_dict = error_content.get('QuantumError')
# Find real error type
bad_quantum_error_flag = False
if error_dict:
# If QuantumError key is found, it will definitely contain
# a 'message' and 'type' keys?
try:
error_type = error_dict['type']
error_message = (error_dict['message'] + "\n" +
error_dict['detail'])
except Exception:
bad_quantum_error_flag = True
if not bad_quantum_error_flag:
ex = None
try:
# raise the appropriate error!
ex = quantum_errors[error_type](message=error_message)
ex.args = ([dict(status_code=status_code,
message=error_message)], )
except Exception:
pass
if ex:
raise ex
else:
raise exceptions.QuantumClientException(message=error_dict)
else:
message = None
if isinstance(error_content, dict):
message = error_content.get('message', None)
if message:
raise exceptions.QuantumClientException(message=message)
# If we end up here the exception was not a quantum error
msg = "%s-%s" % (status_code, error_content)
raise exceptions.QuantumClientException(message=msg)
class APIParamsCall(object):
"""A Decorator to add support for format and tenant overriding
and filters
"""
def __init__(self, function):
self.function = function
def __get__(self, instance, owner):
def with_params(*args, **kwargs):
_format = instance.format
if 'format' in kwargs:
instance.format = kwargs['format']
ret = self.function(instance, *args, **kwargs)
instance.forma = _format
return ret
return with_params
class Client(object):
"""Client for the OpenStack Quantum v2.0 API.
:param string username: Username for authentication. (optional)
:param string password: Password for authentication. (optional)
:param string token: Token for authentication. (optional)
:param string tenant_name: Tenant name. (optional)
:param string auth_url: Keystone service endpoint for authorization.
:param string region_name: Name of a region to select when choosing an
endpoint from the service catalog.
:param string endpoint_url: A user-supplied endpoint URL for the quantum
service. Lazy-authentication is possible for API
service calls if endpoint is set at
instantiation.(optional)
:param integer timeout: Allows customization of the timeout for client
http requests. (optional)
:param insecure: ssl certificate validation. (optional)
Example::
>>> from quantumclient.v2_0 import client
>>> quantum = client.Client(username=USER,
password=PASS,
tenant_name=TENANT_NAME,
auth_url=KEYSTONE_URL)
>>> nets = quantum.list_nets()
...
"""
#Metadata for deserializing xml
_serialization_metadata = {
"application/xml": {
"attributes": {
"network": ["id", "name"],
"port": ["id", "mac_address"],
"subnet": ["id", "prefix"]},
"plurals": {
"networks": "network",
"ports": "port",
"subnets": "subnet", }, }, }
networks_path = "/networks"
network_path = "/networks/%s"
ports_path = "/ports"
port_path = "/ports/%s"
subnets_path = "/subnets"
subnet_path = "/subnets/%s"
@APIParamsCall
def list_ports(self, **_params):
"""
Fetches a list of all networks for a tenant
"""
# Pass filters in "params" argument to do_request
return self.get(self.ports_path, params=_params)
@APIParamsCall
def show_port(self, port, **_params):
"""
Fetches information of a certain network
"""
return self.get(self.port_path % (port), params=_params)
@APIParamsCall
def create_port(self, body=None):
"""
Creates a new port
"""
return self.post(self.ports_path, body=body)
@APIParamsCall
def update_port(self, port, body=None):
"""
Updates a port
"""
return self.put(self.port_path % (port), body=body)
@APIParamsCall
def delete_port(self, port):
"""
Deletes the specified port
"""
return self.delete(self.port_path % (port))
@APIParamsCall
def list_networks(self, **_params):
"""
Fetches a list of all networks for a tenant
"""
# Pass filters in "params" argument to do_request
return self.get(self.networks_path, params=_params)
@APIParamsCall
def show_network(self, network, **_params):
"""
Fetches information of a certain network
"""
return self.get(self.network_path % (network), params=_params)
@APIParamsCall
def create_network(self, body=None):
"""
Creates a new network
"""
return self.post(self.networks_path, body=body)
@APIParamsCall
def update_network(self, network, body=None):
"""
Updates a network
"""
return self.put(self.network_path % (network), body=body)
@APIParamsCall
def delete_network(self, network):
"""
Deletes the specified network
"""
return self.delete(self.network_path % (network))
@APIParamsCall
def list_subnets(self, **_params):
"""
Fetches a list of all networks for a tenant
"""
return self.get(self.subnets_path, params=_params)
@APIParamsCall
def show_subnet(self, subnet, **_params):
"""
Fetches information of a certain subnet
"""
return self.get(self.subnet_path % (subnet), params=_params)
@APIParamsCall
def create_subnet(self, body=None):
"""
Creates a new subnet
"""
return self.post(self.subnets_path, body=body)
@APIParamsCall
def update_subnet(self, subnet, body=None):
"""
Updates a subnet
"""
return self.put(self.subnet_path % (subnet), body=body)
@APIParamsCall
def delete_subnet(self, subnet):
"""
Deletes the specified subnet
"""
return self.delete(self.subnet_path % (subnet))
def __init__(self, **kwargs):
""" Initialize a new client for the Quantum v2.0 API. """
super(Client, self).__init__()
self.httpclient = HTTPClient(**kwargs)
self.version = '2.0'
self.format = 'json'
self.action_prefix = "/v%s" % (self.version)
self.retries = 0
self.retry_interval = 1
def _handle_fault_response(self, status_code, response_body):
# Create exception with HTTP status code and message
error_message = response_body
_logger.debug("Error message: %s", error_message)
# Add deserialized error message to exception arguments
try:
des_error_body = Serializer().deserialize(error_message,
self.content_type())
except:
# If unable to deserialized body it is probably not a
# Quantum error
des_error_body = {'message': error_message}
# Raise the appropriate exception
exception_handler_v20(status_code, des_error_body)
def do_request(self, method, action, body=None, headers=None, params=None):
# Add format and tenant_id
action += ".%s" % self.format
action = self.action_prefix + action
if type(params) is dict:
action += '?' + urllib.urlencode(params, doseq=1)
if body:
body = self.serialize(body)
self.httpclient.content_type = self.content_type()
resp, replybody = self.httpclient.do_request(action, method, body=body)
status_code = self.get_status_code(resp)
if status_code in (httplib.OK,
httplib.CREATED,
httplib.ACCEPTED,
httplib.NO_CONTENT):
return self.deserialize(replybody, status_code)
else:
self._handle_fault_response(status_code, replybody)
def get_status_code(self, response):
"""
Returns the integer status code from the response, which
can be either a Webob.Response (used in testing) or httplib.Response
"""
if hasattr(response, 'status_int'):
return response.status_int
else:
return response.status
def serialize(self, data):
"""
Serializes a dictionary with a single key (which can contain any
structure) into either xml or json
"""
if data is None:
return None
elif type(data) is dict:
return Serializer().serialize(data, self.content_type())
else:
raise Exception("unable to serialize object of type = '%s'" %
type(data))
def deserialize(self, data, status_code):
"""
Deserializes a an xml or json string into a dictionary
"""
if status_code == 204:
return data
return Serializer(self._serialization_metadata).deserialize(
data, self.content_type())
def content_type(self, format=None):
"""
Returns the mime-type for either 'xml' or 'json'. Defaults to the
currently set format
"""
if not format:
format = self.format
return "application/%s" % (format)
def retry_request(self, method, action, body=None,
headers=None, params=None):
"""
Call do_request with the default retry configuration. Only
idempotent requests should retry failed connection attempts.
:raises: ConnectionFailed if the maximum # of retries is exceeded
"""
max_attempts = self.retries + 1
for i in xrange(max_attempts):
try:
return self.do_request(method, action, body=body,
headers=headers, params=params)
except exceptions.ConnectionFailed:
# Exception has already been logged by do_request()
if i < self.retries:
_logger.debug(_('Retrying connection to quantum service'))
time.sleep(self.retry_interval)
raise exceptions.ConnectionFailed(reason=_("Maximum attempts reached"))
def delete(self, action, body=None, headers=None, params=None):
return self.retry_request("DELETE", action, body=body,
headers=headers, params=params)
def get(self, action, body=None, headers=None, params=None):
return self.retry_request("GET", action, body=body,
headers=headers, params=params)
def post(self, action, body=None, headers=None, params=None):
# Do not retry POST requests to avoid the orphan objects problem.
return self.do_request("POST", action, body=body,
headers=headers, params=params)
def put(self, action, body=None, headers=None, params=None):
return self.retry_request("PUT", action, body=body,
headers=headers, params=params)
#if __name__ == '__main__':
#
# client20 = Client(username='admin',
# password='password',
# auth_url='http://localhost:5000/v2.0',
# tenant_name='admin')
# client20 = Client(token='ec796583fcad4aa690b723bc0b25270e',
# endpoint_url='http://localhost:9696')
#
# client20.tenant = 'default'
# client20.format = 'json'
# nets = client20.list_networks()
# print nets

View File

@ -61,12 +61,13 @@ setuptools.setup(
tests_require=tests_require,
cmdclass=setup.get_cmdclass(),
include_package_data=False,
packages=setuptools.find_packages(exclude=['tests', 'tests.*']),
packages=setuptools.find_packages('.'),
package_data=PackageData,
eager_resources=EagerResources,
entry_points={
'console_scripts': [
'quantum = quantumclient.cli:main'
'quantum = quantumclient.cli:main',
'quantumv2 = quantumclient.shell:main',
]
},
)

7
tools/pip-requires Normal file
View File

@ -0,0 +1,7 @@
cliff>=0.6.0
argparse
httplib2
prettytable>=0.6.0
simplejson

View File

@ -1,11 +1,15 @@
distribute>=0.6.24
cliff>=0.6.0
argparse
httplib2
prettytable>=0.6.0
simplejson
mox
nose
nose-exclude
nosexcover
openstack.nose_plugin
pep8==0.6.1
pep8
sphinx>=1.1.2
https://github.com/openstack/quantum/zipball/master#egg=quantum