GBP Client repo init

Adding essential code artifacts for bootstrapping GBP client
repo.

Change-Id: I3e09ed06a0cf719ccfeacb240829900da17d9f65
This commit is contained in:
Sumit Naiksatam 2014-09-28 21:20:48 -07:00
parent 4e0fcf16ab
commit 2b08b68478
24 changed files with 1780 additions and 0 deletions

7
.coveragerc Normal file
View File

@ -0,0 +1,7 @@
[run]
branch = True
source = gbpclient
omit = gbpclient/openstack/*,gbpclient/tests/*
[report]
ignore-errors = True

21
.gitignore vendored Normal file
View File

@ -0,0 +1,21 @@
*.pyc
*.DS_Store
*.egg
*.sw?
AUTHORS
ChangeLog
build/*
build-stamp
cover/*
doc/build/
doc/source/api/
python_group_based_policy_client.egg-info/*
gbp/vcsversion.py
gbpclient/versioninfo
run_tests.err.log
run_tests.log
.autogenerated
.coverage
.testrepository/
.tox/
.venv/

39
.pylintrc Normal file
View File

@ -0,0 +1,39 @@
# The format of this file isn't really documented; just use --generate-rcfile
[MASTER]
# Add <file or directory> to the black list. It should be a base name, not a
# path. You may set this option multiple times.
ignore=test
[Messages Control]
# NOTE(justinsb): We might want to have a 2nd strict pylintrc in future
# C0111: Don't require docstrings on every method
# W0511: TODOs in code comments are fine.
# W0142: *args and **kwargs are fine.
# W0622: Redefining id is fine.
disable=C0111,W0511,W0142,W0622
[Basic]
# Variable names can be 1 to 31 characters long, with lowercase and underscores
variable-rgx=[a-z_][a-z0-9_]{0,30}$
# Argument names can be 2 to 31 characters long, with lowercase and underscores
argument-rgx=[a-z_][a-z0-9_]{1,30}$
# Method names should be at least 3 characters long
# and be lowecased with underscores
method-rgx=([a-z_][a-z0-9_]{2,50}|setUp|tearDown)$
# Don't require docstrings on tests.
no-docstring-rgx=((__.*__)|([tT]est.*)|setUp|tearDown)$
[Design]
max-public-methods=100
min-public-methods=0
max-args=6
[Variables]
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
# _ is used by our localization
additional-builtins=_

4
.testr.conf Normal file
View File

@ -0,0 +1,4 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

16
CONTRIBUTING.rst Normal file
View File

@ -0,0 +1,16 @@
If you would like to contribute to the development of OpenStack,
you must follow the steps documented at:
https://wiki.openstack.org/wiki/How_To_Contribute#If_you.27re_a_developer
Once those steps have been completed, changes to OpenStack
should be submitted for review via the Gerrit tool, following
the workflow documented at:
https://wiki.openstack.org/GerritWorkflow
Pull requests submitted through GitHub will be ignored.
Bugs should be filed on Launchpad, not GitHub:
https://bugs.launchpad.net/python-group-based-policy-client

6
MANIFEST.in Normal file
View File

@ -0,0 +1,6 @@
include tox.ini
include LICENSE README.rst HACKING.rst
include AUTHORS
include ChangeLog
include tools/*
recursive-include tests *

1
README.rst Normal file
View File

@ -0,0 +1 @@
This is the client API library for Group Based Policy.

52
doc/source/conf.py Normal file
View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
#
project = 'python-group-based-policy-client'
# -- General configuration ---------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'oslosphinx']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# General information about the project.
copyright = u'OpenStack Foundation'
# If true, '()' will be appended to :func: etc. cross-reference text.
add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
add_module_names = True
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# -- Options for HTML output ---------------------------------------------
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
# html_theme = 'nature'
# Output file base name for HTML help builder.
htmlhelp_basename = '%sdoc' % project
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author,
# documentclass [howto/manual]).
latex_documents = [
('index',
'%s.tex' % project,
u'%s Documentation' % project,
u'OpenStack Foundation', 'manual'),
]

41
doc/source/index.rst Normal file
View File

@ -0,0 +1,41 @@
Python bindings to the Group Based Policy API
=============================================
In order to use the python group-based-policy- client directly, you must first obtain an auth token and identify which endpoint you wish to speak to. Once you have done so, you can use the API like so::
>>> import logging
>>> from gbpclient.gbp import client
>>> logging.basicConfig(level=logging.DEBUG)
>>> gbp = client.Client('2.0', endpoint_url=OS_URL, token=OS_TOKEN)
>>> gbp.format = 'json'
>>> ptg = {'name': 'my_ptg'}
>>> gbp.create_policy_target_group({'policy_target_group':ptg})
>>> policy_target_groups = gbp.list_policy_target_groups(name='my_ptg')
>>> print policy_target_groups
>>> ptg_id = policy_target_groups['policy_target_groups'][0]['id']
>>> gbp.delete_policy_target_group(ptg_id)
Command-line Tool
=================
In order to use the CLI, you must provide your OpenStack username, password, tenant, and auth endpoint. Use the corresponding configuration options (``--os-username``, ``--os-password``, ``--os-tenant-name``, and ``--os-auth-url``) or set them in environment variables::
export OS_USERNAME=user
export OS_PASSWORD=pass
export OS_TENANT_NAME=tenant
export OS_AUTH_URL=http://auth.example.com:5000/v2.0
The command line tool will attempt to reauthenticate using your provided credentials for every request. You can override this behavior by manually supplying an auth token using ``--os-url`` and ``--os-auth-token``. You can alternatively set these environment variables::
export OS_URL=http://neutron.example.org:9696/
export OS_TOKEN=3bcc3d3a03f44e3d8377f9247b0ad155
If neutron server does not require authentication, besides these two arguments or environment variables (We can use any value as token.), we need manually supply ``--os-auth-strategy`` or set the environment variable::
export OS_AUTH_STRATEGY=noauth
Once you've configured your authentication parameters, you can run ``gbp -h`` to see a complete listing of available commands.
Release Notes
=============

51
gbp_test.sh Executable file
View File

@ -0,0 +1,51 @@
#!/bin/bash
set -x
function die() {
local exitcode=$?
set +o xtrace
echo $@
cleanup
exit $exitcode
}
ptg_name=myptg1
## TODO Sumit: Test for other resources as well after renaming
function cleanup() {
echo Removing test ptg...
gbp endpointgroup-delete ptg_name
}
noauth_tenant_id=me
if [ "$1" == "noauth" ]; then
NOAUTH="--tenant_id $noauth_tenant_id"
else
NOAUTH=
fi
echo "NOTE: User should be admin in order to perform all operations."
sleep 3
FORMAT=" --request-format xml"
# test the CRUD of network
ptg=$ptg_name
gbp endpointgroup-create $FORMAT $NOAUTH $ptg || die "fail to create ptg $ptg"
temp=`gbp endpointgroup-list $FORMAT -- --name $ptg --fields id | wc -l`
echo $temp
if [ $temp -ne 5 ]; then
die "PTGs with name $ptg is not unique or found"
fi
ptg_id=`gbp gbp-list -- --name $ptg --fields id | tail -n 2 | head -n 1 | cut -d' ' -f 2`
echo "ID of PTG with name $ptg is $ptg_id"
gbp endpointgroup-show $FORMAT $ptg || die "fail to show PTG $ptg"
gbp endpointgroup-show $FORMAT $ptg_id || die "fail to show PTG $ptg_id"
gbp endpointgroup-update $FORMAT $ptg --description "desc" || die "fail to update PTG $ptg"
gbp endpointgroup-update $FORMAT $ptg_id --description "new" || die "fail to update PTG $ptg_id"
gbp endpointgroup-list $FORMAT -c id -- --id fakeid || die "fail to list PTGs with column selection on empty list"
cleanup
echo "Success! :)"

0
gbpclient/__init__.py Normal file
View File

View File

View File

View File

@ -0,0 +1,567 @@
# Copyright 2012 NEC Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import json
import uuid
import fixtures
import httpretty
from mox3 import mox
import requests
import six
import testtools
from keystoneclient.auth.identity import v2 as ks_v2_auth
from keystoneclient.auth.identity import v3 as ks_v3_auth
from keystoneclient import exceptions as ks_exceptions
from keystoneclient.fixture import v2 as ks_v2_fixture
from keystoneclient.fixture import v3 as ks_v3_fixture
from keystoneclient import session
from neutronclient import client
from neutronclient.common import exceptions
from neutronclient.common import utils
from neutronclient.openstack.common import jsonutils
USERNAME = 'testuser'
USER_ID = 'testuser_id'
TENANT_NAME = 'testtenant'
TENANT_ID = 'testtenant_id'
PASSWORD = 'password'
ENDPOINT_URL = 'localurl'
PUBLIC_ENDPOINT_URL = 'public_%s' % ENDPOINT_URL
ADMIN_ENDPOINT_URL = 'admin_%s' % ENDPOINT_URL
INTERNAL_ENDPOINT_URL = 'internal_%s' % ENDPOINT_URL
ENDPOINT_OVERRIDE = 'otherurl'
TOKEN = 'tokentoken'
TOKENID = uuid.uuid4().hex
REGION = 'RegionOne'
NOAUTH = 'noauth'
KS_TOKEN_RESULT = {
'access': {
'token': {'id': TOKEN,
'expires': '2012-08-11T07:49:01Z',
'tenant': {'id': str(uuid.uuid1())}},
'user': {'id': str(uuid.uuid1())},
'serviceCatalog': [
{'endpoints_links': [],
'endpoints': [{'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL,
'region': REGION}],
'type': 'network',
'name': 'Neutron Service'}
]
}
}
ENDPOINTS_RESULT = {
'endpoints': [{
'type': 'network',
'name': 'Neutron Service',
'region': REGION,
'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL
}]
}
BASE_HOST = 'http://keystone.example.com'
BASE_URL = "%s:5000/" % BASE_HOST
UPDATED = '2013-03-06T00:00:00Z'
# FIXME (bklei): A future release of keystoneclient will support
# a discovery fixture which can replace these constants and clean
# this up.
V2_URL = "%sv2.0" % BASE_URL
V2_DESCRIBED_BY_HTML = {'href': 'http://docs.openstack.org/api/'
'openstack-identity-service/2.0/content/',
'rel': 'describedby',
'type': 'text/html'}
V2_DESCRIBED_BY_PDF = {'href': 'http://docs.openstack.org/api/openstack-ident'
'ity-service/2.0/identity-dev-guide-2.0.pdf',
'rel': 'describedby',
'type': 'application/pdf'}
V2_VERSION = {'id': 'v2.0',
'links': [{'href': V2_URL, 'rel': 'self'},
V2_DESCRIBED_BY_HTML, V2_DESCRIBED_BY_PDF],
'status': 'stable',
'updated': UPDATED}
V3_URL = "%sv3" % BASE_URL
V3_MEDIA_TYPES = [{'base': 'application/json',
'type': 'application/vnd.openstack.identity-v3+json'},
{'base': 'application/xml',
'type': 'application/vnd.openstack.identity-v3+xml'}]
V3_VERSION = {'id': 'v3.0',
'links': [{'href': V3_URL, 'rel': 'self'}],
'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED}
def _create_version_entry(version):
return jsonutils.dumps({'version': version})
def _create_version_list(versions):
return jsonutils.dumps({'versions': {'values': versions}})
V3_VERSION_LIST = _create_version_list([V3_VERSION, V2_VERSION])
V3_VERSION_ENTRY = _create_version_entry(V3_VERSION)
V2_VERSION_ENTRY = _create_version_entry(V2_VERSION)
def get_response(status_code, headers=None):
response = mox.Mox().CreateMock(requests.Response)
response.headers = headers or {}
response.status_code = status_code
return response
def setup_keystone_v2():
v2_token = ks_v2_fixture.Token(token_id=TOKENID)
service = v2_token.add_service('network')
service.add_endpoint(PUBLIC_ENDPOINT_URL, region=REGION)
httpretty.register_uri(httpretty.POST,
'%s/tokens' % (V2_URL),
body=json.dumps(v2_token))
auth_session = session.Session()
auth_plugin = ks_v2_auth.Password(V2_URL, 'xx', 'xx')
return auth_session, auth_plugin
def setup_keystone_v3():
httpretty.register_uri(httpretty.GET,
V3_URL,
body=V3_VERSION_ENTRY)
v3_token = ks_v3_fixture.Token()
service = v3_token.add_service('network')
service.add_standard_endpoints(public=PUBLIC_ENDPOINT_URL,
admin=ADMIN_ENDPOINT_URL,
internal=INTERNAL_ENDPOINT_URL,
region=REGION)
httpretty.register_uri(httpretty.POST,
'%s/auth/tokens' % (V3_URL),
body=json.dumps(v3_token),
adding_headers={'X-Subject-Token': TOKENID})
auth_session = session.Session()
auth_plugin = ks_v3_auth.Password(V3_URL,
username='xx',
user_id='xx',
user_domain_name='xx',
user_domain_id='xx')
return auth_session, auth_plugin
AUTH_URL = V2_URL
class CLITestAuthNoAuth(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthNoAuth, self).setUp()
self.mox = mox.Mox()
self.client = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
endpoint_url=ENDPOINT_URL,
auth_strategy=NOAUTH,
region_name=REGION)
self.addCleanup(self.mox.VerifyAll)
self.addCleanup(self.mox.UnsetStubs)
def test_get_noauth(self):
self.mox.StubOutWithMock(self.client, "request")
res200 = get_response(200)
self.client.request(
mox.StrContains(ENDPOINT_URL + '/resource'), 'GET',
headers=mox.IsA(dict),
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')
self.assertEqual(self.client.endpoint_url, ENDPOINT_URL)
class CLITestAuthKeystone(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystone, self).setUp()
self.mox = mox.Mox()
for var in ('http_proxy', 'HTTP_PROXY'):
self.useFixture(fixtures.EnvironmentVariableFixture(var))
self.client = client.construct_http_client(
username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
self.addCleanup(self.mox.VerifyAll)
self.addCleanup(self.mox.UnsetStubs)
def test_reused_token_get_auth_info(self):
"""Test that Client.get_auth_info() works even if client was
instantiated with predefined token.
"""
client_ = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
token=TOKEN,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
expected = {'auth_token': TOKEN,
'auth_tenant_id': None,
'auth_user_id': None,
'endpoint_url': self.client.endpoint_url}
self.assertEqual(client_.get_auth_info(), expected)
@httpretty.activate
def test_get_token(self):
auth_session, auth_plugin = setup_keystone_v2()
self.client = client.construct_http_client(
username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION,
session=auth_session,
auth=auth_plugin)
self.mox.StubOutWithMock(self.client, "request")
res200 = get_response(200)
self.client.request(
'/resource', 'GET',
authenticated=True
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')
def test_refresh_token(self):
self.mox.StubOutWithMock(self.client, "request")
self.client.auth_token = TOKEN
self.client.endpoint_url = ENDPOINT_URL
res200 = get_response(200)
res401 = get_response(401)
# If a token is expired, neutron server retruns 401
self.client.request(
mox.StrContains(ENDPOINT_URL + '/resource'), 'GET',
headers=mox.ContainsKeyValue('X-Auth-Token', TOKEN)
).AndReturn((res401, ''))
self.client.request(
AUTH_URL + '/tokens', 'POST',
body=mox.IsA(str), headers=mox.IsA(dict)
).AndReturn((res200, json.dumps(KS_TOKEN_RESULT)))
self.client.request(
mox.StrContains(ENDPOINT_URL + '/resource'), 'GET',
headers=mox.ContainsKeyValue('X-Auth-Token', TOKEN)
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')
def test_refresh_token_no_auth_url(self):
self.mox.StubOutWithMock(self.client, "request")
self.client.auth_url = None
self.client.auth_token = TOKEN
self.client.endpoint_url = ENDPOINT_URL
res401 = get_response(401)
# If a token is expired, neutron server returns 401
self.client.request(
mox.StrContains(ENDPOINT_URL + '/resource'), 'GET',
headers=mox.ContainsKeyValue('X-Auth-Token', TOKEN)
).AndReturn((res401, ''))
self.mox.ReplayAll()
self.assertRaises(exceptions.NoAuthURLProvided,
self.client.do_request,
'/resource',
'GET')
def test_get_endpoint_url_with_invalid_auth_url(self):
# Handle the case when auth_url is not provided
self.client.auth_url = None
self.assertRaises(exceptions.NoAuthURLProvided,
self.client._get_endpoint_url)
def test_get_endpoint_url(self):
self.mox.StubOutWithMock(self.client, "request")
self.client.auth_token = TOKEN
res200 = get_response(200)
self.client.request(
mox.StrContains(AUTH_URL + '/tokens/%s/endpoints' % TOKEN), 'GET',
headers=mox.IsA(dict)
).AndReturn((res200, json.dumps(ENDPOINTS_RESULT)))
self.client.request(
mox.StrContains(ENDPOINT_URL + '/resource'), 'GET',
headers=mox.ContainsKeyValue('X-Auth-Token', TOKEN)
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')
def test_use_given_endpoint_url(self):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION,
endpoint_url=ENDPOINT_OVERRIDE)
self.assertEqual(self.client.endpoint_url, ENDPOINT_OVERRIDE)
self.mox.StubOutWithMock(self.client, "request")
self.client.auth_token = TOKEN
res200 = get_response(200)
self.client.request(
mox.StrContains(ENDPOINT_OVERRIDE + '/resource'), 'GET',
headers=mox.ContainsKeyValue('X-Auth-Token', TOKEN)
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')
self.assertEqual(self.client.endpoint_url, ENDPOINT_OVERRIDE)
def test_get_endpoint_url_other(self):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='otherURL')
self.mox.StubOutWithMock(self.client, "request")
self.client.auth_token = TOKEN
res200 = get_response(200)
self.client.request(
mox.StrContains(AUTH_URL + '/tokens/%s/endpoints' % TOKEN), 'GET',
headers=mox.IsA(dict)
).AndReturn((res200, json.dumps(ENDPOINTS_RESULT)))
self.mox.ReplayAll()
self.assertRaises(exceptions.EndpointTypeNotFound,
self.client.do_request,
'/resource',
'GET')
def test_get_endpoint_url_failed(self):
self.mox.StubOutWithMock(self.client, "request")
self.client.auth_token = TOKEN
res200 = get_response(200)
res401 = get_response(401)
self.client.request(
mox.StrContains(AUTH_URL + '/tokens/%s/endpoints' % TOKEN), 'GET',
headers=mox.IsA(dict)
).AndReturn((res401, ''))
self.client.request(
AUTH_URL + '/tokens', 'POST',
body=mox.IsA(str), headers=mox.IsA(dict)
).AndReturn((res200, json.dumps(KS_TOKEN_RESULT)))
self.client.request(
mox.StrContains(ENDPOINT_URL + '/resource'), 'GET',
headers=mox.ContainsKeyValue('X-Auth-Token', TOKEN)
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')
@httpretty.activate
def test_endpoint_type(self):
auth_session, auth_plugin = setup_keystone_v3()
# Test default behavior is to choose public.
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION,
session=auth_session, auth=auth_plugin)
self.client.authenticate()
self.assertEqual(self.client.endpoint_url, PUBLIC_ENDPOINT_URL)
# Test admin url
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='adminURL',
session=auth_session, auth=auth_plugin)
self.client.authenticate()
self.assertEqual(self.client.endpoint_url, ADMIN_ENDPOINT_URL)
# Test public url
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='publicURL',
session=auth_session, auth=auth_plugin)
self.client.authenticate()
self.assertEqual(self.client.endpoint_url, PUBLIC_ENDPOINT_URL)
# Test internal url
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='internalURL',
session=auth_session, auth=auth_plugin)
self.client.authenticate()
self.assertEqual(self.client.endpoint_url, INTERNAL_ENDPOINT_URL)
# Test url that isn't found in the service catalog
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='privateURL',
session=auth_session, auth=auth_plugin)
self.assertRaises(
ks_exceptions.EndpointNotFound,
self.client.authenticate)
def test_strip_credentials_from_log(self):
def verify_no_credentials(kwargs):
return ('REDACTED' in kwargs['body']) and (
self.client.password not in kwargs['body'])
def verify_credentials(body):
return 'REDACTED' not in body and self.client.password in body
self.mox.StubOutWithMock(self.client, "request")
self.mox.StubOutWithMock(utils, "http_log_req")
res200 = get_response(200)
utils.http_log_req(mox.IgnoreArg(), mox.IgnoreArg(), mox.Func(
verify_no_credentials))
self.client.request(
mox.IsA(six.string_types), mox.IsA(six.string_types),
body=mox.Func(verify_credentials),
headers=mox.IgnoreArg()
).AndReturn((res200, json.dumps(KS_TOKEN_RESULT)))
utils.http_log_req(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
self.client.request(
mox.IsA(six.string_types), mox.IsA(six.string_types),
headers=mox.IsA(dict)
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')
class CLITestAuthKeystoneWithId(CLITestAuthKeystone):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystoneWithId, self).setUp()
self.client = client.HTTPClient(user_id=USER_ID,
tenant_id=TENANT_ID,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
class CLITestAuthKeystoneWithIdandName(CLITestAuthKeystone):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystoneWithIdandName, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
user_id=USER_ID,
tenant_id=TENANT_ID,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
class TestKeystoneClientVersions(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(TestKeystoneClientVersions, self).setUp()
self.mox = mox.Mox()
self.addCleanup(self.mox.VerifyAll)
self.addCleanup(self.mox.UnsetStubs)
@httpretty.activate
def test_v2_auth(self):
auth_session, auth_plugin = setup_keystone_v2()
res200 = get_response(200)
self.client = client.construct_http_client(
username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION,
session=auth_session,
auth=auth_plugin)
self.mox.StubOutWithMock(self.client, "request")
self.client.request(
'/resource', 'GET',
authenticated=True
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')
@httpretty.activate
def test_v3_auth(self):
auth_session, auth_plugin = setup_keystone_v3()
res200 = get_response(200)
self.client = client.construct_http_client(
user_id=USER_ID,
tenant_id=TENANT_ID,
password=PASSWORD,
auth_url=V3_URL,
region_name=REGION,
session=auth_session,
auth=auth_plugin)
self.mox.StubOutWithMock(self.client, "request")
self.client.request(
'/resource', 'GET',
authenticated=True
).AndReturn((res200, ''))
self.mox.ReplayAll()
self.client.do_request('/resource', 'GET')

View File

@ -0,0 +1,766 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import contextlib
import itertools
import sys
import urllib
import fixtures
from mox3 import mox
from oslotest import base
import requests
import six
import six.moves.urllib.parse as urlparse
from neutronclient.common import constants
from neutronclient.common import exceptions
from neutronclient.neutron import v2_0 as neutronV2_0
from neutronclient import shell
from neutronclient.v2_0 import client
API_VERSION = "2.0"
FORMAT = 'json'
TOKEN = 'testtoken'
ENDURL = 'localurl'
@contextlib.contextmanager
def capture_std_streams():
fake_stdout, fake_stderr = six.StringIO(), six.StringIO()
stdout, stderr = sys.stdout, sys.stderr
try:
sys.stdout, sys.stderr = fake_stdout, fake_stderr
yield fake_stdout, fake_stderr
finally:
sys.stdout, sys.stderr = stdout, stderr
class FakeStdout:
def __init__(self):
self.content = []
def write(self, text):
self.content.append(text)
def make_string(self):
result = ''
for line in self.content:
result = result + line
return result
class MyResp(object):
def __init__(self, status_code, headers=None, reason=None):
self.status_code = status_code
self.headers = headers or {}
self.reason = reason
class MyApp(object):
def __init__(self, _stdout):
self.stdout = _stdout
def end_url(path, query=None, format=FORMAT):
_url_str = ENDURL + "/v" + API_VERSION + path + "." + format
return query and _url_str + "?" + query or _url_str
class MyUrlComparator(mox.Comparator):
def __init__(self, lhs, client):
self.lhs = lhs
self.client = client
def equals(self, rhs):
lhsp = urlparse.urlparse(self.lhs)
rhsp = urlparse.urlparse(rhs)
return (lhsp.scheme == rhsp.scheme and
lhsp.netloc == rhsp.netloc and
lhsp.path == rhsp.path and
urlparse.parse_qs(lhsp.query) == urlparse.parse_qs(rhsp.query))
def __str__(self):
if self.client and self.client.format != FORMAT:
lhs_parts = self.lhs.split("?", 1)
if len(lhs_parts) == 2:
lhs = ("%s.%s?%s" % (lhs_parts[0][:-4],
self.client.format,
lhs_parts[1]))
else:
lhs = ("%s.%s" % (lhs_parts[0][:-4],
self.client.format))
return lhs
return self.lhs
def __repr__(self):
return str(self)
class MyComparator(mox.Comparator):
def __init__(self, lhs, client):
self.lhs = lhs
self.client = client
def _com_dict(self, lhs, rhs):
if len(lhs) != len(rhs):
return False
for key, value in six.iteritems(lhs):
if key not in rhs:
return False
rhs_value = rhs[key]
if not self._com(value, rhs_value):
return False
return True
def _com_list(self, lhs, rhs):
if len(lhs) != len(rhs):
return False
for lhs_value in lhs:
if lhs_value not in rhs:
return False
return True
def _com(self, lhs, rhs):
if lhs is None:
return rhs is None
if isinstance(lhs, dict):
if not isinstance(rhs, dict):
return False
return self._com_dict(lhs, rhs)
if isinstance(lhs, list):
if not isinstance(rhs, list):
return False
return self._com_list(lhs, rhs)
if isinstance(lhs, tuple):
if not isinstance(rhs, tuple):
return False
return self._com_list(lhs, rhs)
return lhs == rhs
def equals(self, rhs):
if self.client:
rhs = self.client.deserialize(rhs, 200)
return self._com(self.lhs, rhs)
def __repr__(self):
if self.client:
return self.client.serialize(self.lhs)
return str(self.lhs)
class CLITestV20Base(base.BaseTestCase):
format = 'json'
test_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
id_field = 'id'
def _find_resourceid(self, client, resource, name_or_id,
cmd_resource=None, parent_id=None):
return name_or_id
def _get_attr_metadata(self):
return self.metadata
client.Client.EXTED_PLURALS.update(constants.PLURALS)
client.Client.EXTED_PLURALS.update({'tags': 'tag'})
return {'plurals': client.Client.EXTED_PLURALS,
'xmlns': constants.XML_NS_V20,
constants.EXT_NS: {'prefix': 'http://xxxx.yy.com'}}
def setUp(self, plurals=None):
"""Prepare the test environment."""
super(CLITestV20Base, self).setUp()
client.Client.EXTED_PLURALS.update(constants.PLURALS)
if plurals is not None:
client.Client.EXTED_PLURALS.update(plurals)
self.metadata = {'plurals': client.Client.EXTED_PLURALS,
'xmlns': constants.XML_NS_V20,
constants.EXT_NS: {'prefix':
'http://xxxx.yy.com'}}
self.mox = mox.Mox()
self.endurl = ENDURL
self.fake_stdout = FakeStdout()
self.useFixture(fixtures.MonkeyPatch('sys.stdout', self.fake_stdout))
self.useFixture(fixtures.MonkeyPatch(
'neutronclient.neutron.v2_0.find_resourceid_by_name_or_id',
self._find_resourceid))
self.useFixture(fixtures.MonkeyPatch(
'neutronclient.neutron.v2_0.find_resourceid_by_id',
self._find_resourceid))
self.useFixture(fixtures.MonkeyPatch(
'neutronclient.v2_0.client.Client.get_attr_metadata',
self._get_attr_metadata))
self.client = client.Client(token=TOKEN, endpoint_url=self.endurl)
def _test_create_resource(self, resource, cmd, name, myid, args,
position_names, position_values,
tenant_id=None, tags=None, admin_state_up=True,
extra_body=None, cmd_resource=None,
parent_id=None, **kwargs):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
non_admin_status_resources = ['subnet', 'floatingip', 'security_group',
'security_group_rule', 'qos_queue',
'network_gateway', 'gateway_device',
'credential', 'network_profile',
'policy_profile', 'ikepolicy',
'ipsecpolicy', 'metering_label',
'metering_label_rule', 'net_partition']
if not cmd_resource:
cmd_resource = resource
if (resource in non_admin_status_resources):
body = {resource: {}, }
else:
body = {resource: {'admin_state_up': admin_state_up, }, }
if tenant_id:
body[resource].update({'tenant_id': tenant_id})
if tags:
body[resource].update({'tags': tags})
if extra_body:
body[resource].update(extra_body)
body[resource].update(kwargs)
for i in range(len(position_names)):
body[resource].update({position_names[i]: position_values[i]})
ress = {resource:
{self.id_field: myid}, }
if name:
ress[resource].update({'name': name})
self.client.format = self.format
resstr = self.client.serialize(ress)
# url method body
resource_plural = neutronV2_0._get_resource_plural(cmd_resource,
self.client)
path = getattr(self.client, resource_plural + "_path")
if parent_id:
path = path % parent_id
# Work around for LP #1217791. XML deserializer called from
# MyComparator does not decodes XML string correctly.
if self.format == 'json':
mox_body = MyComparator(body, self.client)
else:
mox_body = self.client.serialize(body)
self.client.httpclient.request(
end_url(path, format=self.format), 'POST',
body=mox_body,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser('create_' + resource)
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertIn(myid, _str)
if name:
self.assertIn(name, _str)
def _test_list_columns(self, cmd, resources,
resources_out, args=('-f', 'json'),
cmd_resources=None, parent_id=None):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
self.client.format = self.format
if not cmd_resources:
cmd_resources = resources
resstr = self.client.serialize(resources_out)
path = getattr(self.client, cmd_resources + "_path")
if parent_id:
path = path % parent_id
self.client.httpclient.request(
end_url(path, format=self.format), 'GET',
body=None,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr))
args = tuple(args) + ('--request-format', self.format)
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("list_" + cmd_resources)
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
def _test_list_resources(self, resources, cmd, detail=False, tags=(),
fields_1=(), fields_2=(), page_size=None,
sort_key=(), sort_dir=(), response_contents=None,
base_args=None, path=None, cmd_resources=None,
parent_id=None):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
if not cmd_resources:
cmd_resources = resources
if response_contents is None:
contents = [{self.id_field: 'myid1', },
{self.id_field: 'myid2', }, ]
else:
contents = response_contents
reses = {resources: contents}
self.client.format = self.format
resstr = self.client.serialize(reses)
# url method body
query = ""
args = base_args if base_args is not None else []
if detail:
args.append('-D')
args.extend(['--request-format', self.format])
if fields_1:
for field in fields_1:
args.append('--fields')
args.append(field)
if tags:
args.append('--')
args.append("--tag")
for tag in tags:
args.append(tag)
if isinstance(tag, unicode):
tag = urllib.quote(tag.encode('utf-8'))
if query:
query += "&tag=" + tag
else:
query = "tag=" + tag
if (not tags) and fields_2:
args.append('--')
if fields_2:
args.append("--fields")
for field in fields_2:
args.append(field)
if detail:
query = query and query + '&verbose=True' or 'verbose=True'
for field in itertools.chain(fields_1, fields_2):
if query:
query += "&fields=" + field
else:
query = "fields=" + field
if page_size:
args.append("--page-size")
args.append(str(page_size))
if query:
query += "&limit=%s" % page_size
else:
query = "limit=%s" % page_size
if sort_key:
for key in sort_key:
args.append('--sort-key')
args.append(key)
if query:
query += '&'
query += 'sort_key=%s' % key
if sort_dir:
len_diff = len(sort_key) - len(sort_dir)
if len_diff > 0:
sort_dir = tuple(sort_dir) + ('asc',) * len_diff
elif len_diff < 0:
sort_dir = sort_dir[:len(sort_key)]
for dir in sort_dir:
args.append('--sort-dir')
args.append(dir)
if query:
query += '&'
query += 'sort_dir=%s' % dir
if path is None:
path = getattr(self.client, cmd_resources + "_path")
if parent_id:
path = path % parent_id
self.client.httpclient.request(
MyUrlComparator(end_url(path, query, format=self.format),
self.client),
'GET',
body=None,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr))
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("list_" + cmd_resources)
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
if response_contents is None:
self.assertIn('myid1', _str)
return _str
def _test_list_resources_with_pagination(self, resources, cmd,
cmd_resources=None,
parent_id=None):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
if not cmd_resources:
cmd_resources = resources
path = getattr(self.client, cmd_resources + "_path")
if parent_id:
path = path % parent_id
fake_query = "marker=myid2&limit=2"
reses1 = {resources: [{'id': 'myid1', },
{'id': 'myid2', }],
'%s_links' % resources: [{'href': end_url(path, fake_query),
'rel': 'next'}]}
reses2 = {resources: [{'id': 'myid3', },
{'id': 'myid4', }]}
self.client.format = self.format
resstr1 = self.client.serialize(reses1)
resstr2 = self.client.serialize(reses2)
self.client.httpclient.request(
end_url(path, "", format=self.format), 'GET',
body=None,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr1))
self.client.httpclient.request(
MyUrlComparator(end_url(path, fake_query, format=self.format),
self.client), 'GET',
body=None,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr2))
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("list_" + cmd_resources)
args = ['--request-format', self.format]
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
def _test_update_resource(self, resource, cmd, myid, args, extrafields,
cmd_resource=None, parent_id=None):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
if not cmd_resource:
cmd_resource = resource
body = {resource: extrafields}
path = getattr(self.client, cmd_resource + "_path")
if parent_id:
path = path % (parent_id, myid)
else:
path = path % myid
self.client.format = self.format
# Work around for LP #1217791. XML deserializer called from
# MyComparator does not decodes XML string correctly.
if self.format == 'json':
mox_body = MyComparator(body, self.client)
else:
mox_body = self.client.serialize(body)
self.client.httpclient.request(
MyUrlComparator(end_url(path, format=self.format),
self.client),
'PUT',
body=mox_body,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(204), None))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("update_" + cmd_resource)
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertIn(myid, _str)
def _test_show_resource(self, resource, cmd, myid, args, fields=(),
cmd_resource=None, parent_id=None):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
if not cmd_resource:
cmd_resource = resource
query = "&".join(["fields=%s" % field for field in fields])
expected_res = {resource:
{self.id_field: myid,
'name': 'myname', }, }
self.client.format = self.format
resstr = self.client.serialize(expected_res)
path = getattr(self.client, cmd_resource + "_path")
if parent_id:
path = path % (parent_id, myid)
else:
path = path % myid
self.client.httpclient.request(
end_url(path, query, format=self.format), 'GET',
body=None,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("show_" + cmd_resource)
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertIn(myid, _str)
self.assertIn('myname', _str)
def _test_delete_resource(self, resource, cmd, myid, args,
cmd_resource=None, parent_id=None):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
if not cmd_resource:
cmd_resource = resource
path = getattr(self.client, cmd_resource + "_path")
if parent_id:
path = path % (parent_id, myid)
else:
path = path % (myid)
self.client.httpclient.request(
end_url(path, format=self.format), 'DELETE',
body=None,
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(204), None))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("delete_" + cmd_resource)
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertIn(myid, _str)
def _test_update_resource_action(self, resource, cmd, myid, action, args,
body, retval=None, cmd_resource=None):
self.mox.StubOutWithMock(cmd, "get_client")
self.mox.StubOutWithMock(self.client.httpclient, "request")
cmd.get_client().MultipleTimes().AndReturn(self.client)
if not cmd_resource:
cmd_resource = resource
path = getattr(self.client, cmd_resource + "_path")
path_action = '%s/%s' % (myid, action)
self.client.httpclient.request(
end_url(path % path_action, format=self.format), 'PUT',
body=MyComparator(body, self.client),
headers=mox.ContainsKeyValue(
'X-Auth-Token', TOKEN)).AndReturn((MyResp(204), retval))
args.extend(['--request-format', self.format])
self.mox.ReplayAll()
cmd_parser = cmd.get_parser("delete_" + cmd_resource)
shell.run_command(cmd, cmd_parser, args)
self.mox.VerifyAll()
self.mox.UnsetStubs()
_str = self.fake_stdout.make_string()
self.assertIn(myid, _str)
class ClientV2TestJson(CLITestV20Base):
def test_do_request_unicode(self):
self.client.format = self.format
self.mox.StubOutWithMock(self.client.httpclient, "request")
unicode_text = u'\u7f51\u7edc'
# url with unicode
action = u'/test'
expected_action = action.encode('utf-8')
# query string with unicode
params = {'test': unicode_text}
expect_query = urllib.urlencode({'test':
unicode_text.encode('utf-8')})
# request body with unicode
body = params
expect_body = self.client.serialize(body)
# headers with unicode
self.client.httpclient.auth_token = unicode_text
expected_auth_token = unicode_text.encode('utf-8')
self.client.httpclient.request(
end_url(expected_action, query=expect_query, format=self.format),
'PUT', body=expect_body,
headers=mox.ContainsKeyValue(
'X-Auth-Token',
expected_auth_token)).AndReturn((MyResp(200), expect_body))
self.mox.ReplayAll()
res_body = self.client.do_request('PUT', action, body=body,
params=params)
self.mox.VerifyAll()
self.mox.UnsetStubs()
# test response with unicode
self.assertEqual(res_body, body)
def test_do_request_error_without_response_body(self):
self.client.format = self.format
self.mox.StubOutWithMock(self.client.httpclient, "request")
params = {'test': 'value'}
expect_query = six.moves.urllib.parse.urlencode(params)
self.client.httpclient.auth_token = 'token'
self.client.httpclient.request(
MyUrlComparator(end_url(
'/test', query=expect_query, format=self.format), self.client),
'PUT', body='',
headers=mox.ContainsKeyValue('X-Auth-Token', 'token')
).AndReturn((MyResp(400, reason='An error'), ''))
self.mox.ReplayAll()
error = self.assertRaises(exceptions.NeutronClientException,
self.client.do_request, 'PUT', '/test',
body='', params=params)
self.assertEqual("An error", str(error))
self.mox.VerifyAll()
self.mox.UnsetStubs()
class ClientV2UnicodeTestXML(ClientV2TestJson):
format = 'xml'
class CLITestV20ExceptionHandler(CLITestV20Base):
def _test_exception_handler_v20(
self, expected_exception, status_code, expected_msg,
error_type=None, error_msg=None, error_detail=None,
error_content=None):
if error_content is None:
error_content = {'NeutronError': {'type': error_type,
'message': error_msg,
'detail': error_detail}}
e = self.assertRaises(expected_exception,
client.exception_handler_v20,
status_code, error_content)
self.assertEqual(status_code, e.status_code)
if expected_msg is None:
if error_detail:
expected_msg = '\n'.join([error_msg, error_detail])
else:
expected_msg = error_msg
self.assertEqual(expected_msg, e.message)
def test_exception_handler_v20_ip_address_in_use(self):
err_msg = ('Unable to complete operation for network '
'fake-network-uuid. The IP address fake-ip is in use.')
self._test_exception_handler_v20(
exceptions.IpAddressInUseClient, 409, err_msg,
'IpAddressInUse', err_msg, '')
def test_exception_handler_v20_neutron_known_error(self):
known_error_map = [
('NetworkNotFound', exceptions.NetworkNotFoundClient, 404),
('PortNotFound', exceptions.PortNotFoundClient, 404),
('NetworkInUse', exceptions.NetworkInUseClient, 409),
('PortInUse', exceptions.PortInUseClient, 409),
('StateInvalid', exceptions.StateInvalidClient, 400),
('IpAddressInUse', exceptions.IpAddressInUseClient, 409),
('IpAddressGenerationFailure',
exceptions.IpAddressGenerationFailureClient, 409),
('MacAddressInUse', exceptions.MacAddressInUseClient, 409),
('ExternalIpAddressExhausted',
exceptions.ExternalIpAddressExhaustedClient, 400),
('OverQuota', exceptions.OverQuotaClient, 409),
]
error_msg = 'dummy exception message'
error_detail = 'sample detail'
for server_exc, client_exc, status_code in known_error_map:
self._test_exception_handler_v20(
client_exc, status_code,
error_msg + '\n' + error_detail,
server_exc, error_msg, error_detail)
def test_exception_handler_v20_neutron_known_error_without_detail(self):
error_msg = 'Network not found'
error_detail = ''
self._test_exception_handler_v20(
exceptions.NetworkNotFoundClient, 404,
error_msg,
'NetworkNotFound', error_msg, error_detail)
def test_exception_handler_v20_unknown_error_to_per_code_exception(self):
for status_code, client_exc in exceptions.HTTP_EXCEPTION_MAP.items():
error_msg = 'Unknown error'
error_detail = 'This is detail'
self._test_exception_handler_v20(
client_exc, status_code,
error_msg + '\n' + error_detail,
'UnknownError', error_msg, error_detail)
def test_exception_handler_v20_neutron_unknown_status_code(self):
error_msg = 'Unknown error'
error_detail = 'This is detail'
self._test_exception_handler_v20(
exceptions.NeutronClientException, 501,
error_msg + '\n' + error_detail,
'UnknownError', error_msg, error_detail)
def test_exception_handler_v20_bad_neutron_error(self):
error_content = {'NeutronError': {'unknown_key': 'UNKNOWN'}}
self._test_exception_handler_v20(
exceptions.NeutronClientException, 500,
expected_msg={'unknown_key': 'UNKNOWN'},
error_content=error_content)
def test_exception_handler_v20_error_dict_contains_message(self):
error_content = {'message': 'This is an error message'}
self._test_exception_handler_v20(
exceptions.NeutronClientException, 500,
expected_msg='This is an error message',
error_content=error_content)
def test_exception_handler_v20_error_dict_not_contain_message(self):
error_content = {'error': 'This is an error message'}
expected_msg = '%s-%s' % (500, error_content)
self._test_exception_handler_v20(
exceptions.NeutronClientException, 500,
expected_msg=expected_msg,
error_content=error_content)
def test_exception_handler_v20_default_fallback(self):
error_content = 'This is an error message'
expected_msg = '%s-%s' % (500, error_content)
self._test_exception_handler_v20(
exceptions.NeutronClientException, 500,
expected_msg=expected_msg,
error_content=error_content)
def test_exception_status(self):
e = exceptions.BadRequest()
self.assertEqual(e.status_code, 400)
e = exceptions.BadRequest(status_code=499)
self.assertEqual(e.status_code, 499)
# SslCertificateValidationError has no explicit status_code,
# but should have a 'safe' defined fallback.
e = exceptions.SslCertificateValidationError()
self.assertIsNotNone(e.status_code)
e = exceptions.SslCertificateValidationError(status_code=599)
self.assertEqual(e.status_code, 599)
def test_connection_failed(self):
self.mox.StubOutWithMock(self.client.httpclient, 'request')
self.client.httpclient.auth_token = 'token'
self.client.httpclient.request(
end_url('/test'), 'GET',
headers=mox.ContainsKeyValue('X-Auth-Token', 'token')
).AndRaise(requests.exceptions.ConnectionError('Connection refused'))
self.mox.ReplayAll()
error = self.assertRaises(exceptions.ConnectionFailed,
self.client.get, '/test')
# NB: ConnectionFailed has no explicit status_code, so this
# tests that there is a fallback defined.
self.assertIsNotNone(error.status_code)
self.mox.VerifyAll()
self.mox.UnsetStubs()

19
gbpclient/version.py Normal file
View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import pbr.version
__version__ = pbr.version.VersionInfo(
'python-group-based-policy-client').version_string()

7
openstack-common.conf Normal file
View File

@ -0,0 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=gettextutils,jsonutils,strutils,timeutils
# The base module to hold the copy of openstack.common
base=gbpclient

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
python-neutronclient>=2.3.6,<3

39
setup.cfg Normal file
View File

@ -0,0 +1,39 @@
[metadata]
name = python-group-based-policy-client
summary = CLI and Client Library for Group Based Policy
description-file =
README.rst
author = Group Based Policy
author-email = openstack-dev@lists.openstack.org
home-page = http://www.openstack.org/
classifier =
Environment :: OpenStack
Intended Audience :: Developers
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 2.6
[files]
packages =
gbpclient
[global]
setup-hooks =
pbr.hooks.setup_hook
[entry_points]
console_scripts =
gbp = gbpclient.shell:main
[build_sphinx]
all_files = 1
build-dir = doc/build
source-dir = doc/source
[wheel]
universal = 1

30
setup.py Normal file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env python
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
# In python < 2.7.4, a lazy loading of package `pbr` will break
# setuptools if some other modules registered functions in `atexit`.
# solution from: http://bugs.python.org/issue15881#msg170215
try:
import multiprocessing # noqa
except ImportError:
pass
setuptools.setup(
setup_requires=['pbr'],
pbr=True)

17
test-requirements.txt Normal file
View File

@ -0,0 +1,17 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
hacking>=0.8.0,<0.9
cliff-tablib>=1.0
coverage>=3.6
discover
fixtures>=0.3.14
httpretty>=0.8.0,!=0.8.1,!=0.8.2,!=0.8.3
mox3>=0.7.0
oslosphinx>=2.2.0.0a2
oslotest>=1.1.0.0a2
python-subunit>=0.0.18
sphinx>=1.1.2,!=1.2.0,<1.3
testrepository>=0.0.18
testtools>=0.9.34

27
tools/gbp.bash_completion Normal file
View File

@ -0,0 +1,27 @@
_gbp_opts="" # lazy init
_gbp_flags="" # lazy init
_gbp_opts_exp="" # lazy init
_gbp()
{
local cur prev nbc cflags
COMPREPLY=()
cur="${COMP_WORDS[COMP_CWORD]}"
prev="${COMP_WORDS[COMP_CWORD-1]}"
if [ "x$_gbp_opts" == "x" ] ; then
nbc="`gbp bash-completion`"
_gbp_opts="`echo "$nbc" | sed -e "s/--[a-z0-9_-]*//g" -e "s/\s\s*/ /g"`"
_gbp_flags="`echo " $nbc" | sed -e "s/ [^-][^-][a-z0-9_-]*//g" -e "s/\s\s*/ /g"`"
_gbp_opts_exp="`echo "$_gbp_opts" | sed -e "s/\s/|/g"`"
fi
if [[ " ${COMP_WORDS[@]} " =~ " "($_gbp_opts_exp)" " && "$prev" != "help" ]] ; then
COMPLETION_CACHE=~/.gbpclient/*/*-cache
cflags="$_gbp_flags "$(cat $COMPLETION_CACHE 2> /dev/null | tr '\n' ' ')
COMPREPLY=($(compgen -W "${cflags}" -- ${cur}))
else
COMPREPLY=($(compgen -W "${_gbp_opts}" -- ${cur}))
fi
return 0
}
complete -F _gbp gbp

View File

@ -0,0 +1,27 @@
_policy_opts="" # lazy init
_policy_flags="" # lazy init
_policy_opts_exp="" # lazy init
_policy()
{
local cur prev nbc cflags
COMPREPLY=()
cur="${COMP_WORDS[COMP_CWORD]}"
prev="${COMP_WORDS[COMP_CWORD-1]}"
if [ "x$_policy_opts" == "x" ] ; then
nbc="`policy bash-completion`"
_policy_opts="`echo "$nbc" | sed -e "s/--[a-z0-9_-]*//g" -e "s/\s\s*/ /g"`"
_policy_flags="`echo " $nbc" | sed -e "s/ [^-][^-][a-z0-9_-]*//g" -e "s/\s\s*/ /g"`"
_policy_opts_exp="`echo "$_policy_opts" | sed -e "s/\s/|/g"`"
fi
if [[ " ${COMP_WORDS[@]} " =~ " "($_policy_opts_exp)" " && "$prev" != "help" ]] ; then
COMPLETION_CACHE=~/.policyclient/*/*-cache
cflags="$_policy_flags "$(cat $COMPLETION_CACHE 2> /dev/null | tr '\n' ' ')
COMPREPLY=($(compgen -W "${cflags}" -- ${cur}))
else
COMPREPLY=($(compgen -W "${_policy_opts}" -- ${cur}))
fi
return 0
}
complete -F _policy policy

39
tox.ini Normal file
View File

@ -0,0 +1,39 @@
[tox]
envlist = py26,py27,py33,pypy,pep8
minversion = 1.6
skipsdist = True
[testenv]
setenv = VIRTUAL_ENV={envdir}
LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
usedevelop = True
install_command = pip install -U {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --testr-args='{posargs}'
[testenv:pep8]
commands = flake8
distribute = false
[testenv:venv]
commands = {posargs}
[testenv:cover]
commands = python setup.py testr --coverage --testr-args='{posargs}'
[testenv:docs]
commands=
python setup.py build_sphinx
[tox:jenkins]
downloadcache = ~/cache/pip
[flake8]
# E125 continuation line does not distinguish itself from next logical line
# H302 import only modules
ignore = E125,H302
show-source = true
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,tools