Retire Packaging Deb project repos

This commit is part of a series to retire the Packaging Deb
project. Step 2 is to remove all content from the project
repos, replacing it with a README notification where to find
ongoing work, and how to recover the repo if needed at some
future point (as in
https://docs.openstack.org/infra/manual/drivers.html#retiring-a-project).

Change-Id: I8cda39ecb8de18b79be1ec51fc8c4629e2c45a3e
This commit is contained in:
Tony Breeds 2017-09-12 16:11:22 -06:00
parent 40ee44c664
commit ca35156eab
71 changed files with 14 additions and 4920 deletions

View File

@ -1,7 +0,0 @@
[run]
branch = True
source = seamicroclient
omit = seamicroclient/openstack/*
[report]
ignore-errors = True

18
.gitignore vendored
View File

@ -1,18 +0,0 @@
.coverage
.venv
.testrepository
subunit.log
.tox
*,cover
cover
*.pyc
.idea
*.sw?
*~
build
dist
AUTHORS
ChangeLog
seamicroclient/versioninfo
*.egg
*egg-info

View File

@ -1,4 +0,0 @@
[gerrit]
host=amd_dmz_gerrit_ip
port=29418
project=openstack/python-seamicroclient.git

View File

@ -1,4 +0,0 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ ./ $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -1,59 +0,0 @@
Seamicro Client Style Commandments
==============================
- Step 1: Read the OpenStack Style Commandments
http://docs.openstack.org/developer/hacking
- Step 2: Read on
Seamicro Client Specific Commandments
---------------------------------
None so far
Text encoding
-------------
- All text within python code should be of type 'unicode'.
WRONG:
>>> s = 'foo'
>>> s
'foo'
>>> type(s)
<type 'str'>
RIGHT:
>>> u = u'foo'
>>> u
u'foo'
>>> type(u)
<type 'unicode'>
- Transitions between internal unicode and external strings should always
be immediately and explicitly encoded or decoded.
- All external text that is not explicitly encoded (database storage,
commandline arguments, etc.) should be presumed to be encoded as utf-8.
WRONG:
>>> mystring = infile.readline()
>>> myreturnstring = do_some_magic_with(mystring)
>>> outfile.write(myreturnstring)
RIGHT:
>>> mystring = infile.readline()
>>> mytext = s.decode('utf-8')
>>> returntext = do_some_magic_with(mytext)
>>> returnstring = returntext.encode('utf-8')
>>> outfile.write(returnstring)
Running Tests
-------------
The testing system is based on a combination of tox and testr. If you just
want to run the whole suite, run `tox` and all will be fine. However, if
you'd like to dig in a bit more, you might want to learn some things about
testr itself. A basic walkthrough for OpenStack can be found at
http://wiki.openstack.org/testr

203
LICENSE
View File

@ -1,203 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
--- License for python-novaclient versions prior to 2.1 ---
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of this project nor the names of its contributors may
be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,5 +0,0 @@
include AUTHORS
include ChangeLog
exclude .gitignore
exclude .gitreview

14
README Normal file
View File

@ -0,0 +1,14 @@
This project is no longer maintained.
The contents of this repository are still available in the Git
source code management system. To see the contents of this
repository before it reached its end of life, please check out the
previous commit with "git checkout HEAD^1".
For ongoing work on maintaining OpenStack packages in the Debian
distribution, please see the Debian OpenStack packaging team at
https://wiki.debian.org/OpenStack/.
For any further questions, please email
openstack-dev@lists.openstack.org or join #openstack-dev on
Freenode.

View File

@ -1,2 +0,0 @@
Python client for consuming SeaMicro REST API v2.0

View File

@ -1,3 +0,0 @@
To run example script
PYTHONPATH=./../seamicroclient/ python ./server_example.py

View File

@ -1,11 +0,0 @@
import json
from seamicroclient.v2 import client
seamicro = client.Client("admin", "seamicro", "http://172.16.0.25/v2.0")
servers = seamicro.servers.list()
for server in servers:
print(json.dumps(server))
print("\n\n")

View File

@ -1,11 +0,0 @@
[DEFAULT]
# The list of modules to copy from openstack-common
module=install_venv_common
module=strutils
module=timeutils
module=uuidutils
module=py3kcompat
# The base module to hold the copy of openstack.common
base=seamicroclient

View File

@ -1,8 +0,0 @@
pbr>=0.5.21,<1.0
argparse
iso8601>=0.1.8
PrettyTable>=0.7,<0.8
requests>=1.1
simplejson>=2.0.9
six>=1.4.1
Babel>=1.3

View File

@ -1,164 +0,0 @@
#!/bin/bash
set -eu
function usage {
echo "Usage: $0 [OPTION]..."
echo "Run python-seamicroclient test suite"
echo ""
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"
echo " -s, --no-site-packages Isolate the virtualenv from the global Python environment"
echo " -x, --stop Stop running tests after the first error or failure."
echo " -f, --force Force a clean re-build of the virtual environment. Useful when dependencies have been added."
echo " -p, --pep8 Just run pep8"
echo " -P, --no-pep8 Don't run pep8"
echo " -c, --coverage Generate coverage report"
echo " -h, --help Print this usage message"
echo " --hide-elapsed Don't print the elapsed time for each test along with slow test list"
echo ""
echo "Note: with no options specified, the script will try to run the tests in a virtual environment,"
echo " If no virtualenv is found, the script will ask if you would like to create one. If you "
echo " prefer to run tests NOT in a virtual environment, simply pass the -N option."
exit
}
function process_option {
case "$1" in
-h|--help) usage;;
-V|--virtual-env) always_venv=1; never_venv=0;;
-N|--no-virtual-env) always_venv=0; never_venv=1;;
-s|--no-site-packages) no_site_packages=1;;
-f|--force) force=1;;
-p|--pep8) just_pep8=1;;
-P|--no-pep8) no_pep8=1;;
-c|--coverage) coverage=1;;
-*) testropts="$testropts $1";;
*) testrargs="$testrargs $1"
esac
}
venv=.venv
with_venv=tools/with_venv.sh
always_venv=0
never_venv=0
force=0
no_site_packages=0
installvenvopts=
testrargs=
testropts=
wrapper=""
just_pep8=0
no_pep8=0
coverage=0
LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
for arg in "$@"; do
process_option $arg
done
if [ $no_site_packages -eq 1 ]; then
installvenvopts="--no-site-packages"
fi
function init_testr {
if [ ! -d .testrepository ]; then
${wrapper} testr init
fi
}
function run_tests {
# Cleanup *pyc
${wrapper} find . -type f -name "*.pyc" -delete
if [ $coverage -eq 1 ]; then
# Do not test test_coverage_ext when gathering coverage.
if [ "x$testrargs" = "x" ]; then
testrargs="^(?!.*test_coverage_ext).*$"
fi
export PYTHON="${wrapper} coverage run --source seamicroclient --parallel-mode"
fi
# Just run the test suites in current environment
set +e
TESTRTESTS="$TESTRTESTS $testrargs"
echo "Running \`${wrapper} $TESTRTESTS\`"
${wrapper} $TESTRTESTS
RESULT=$?
set -e
copy_subunit_log
return $RESULT
}
function copy_subunit_log {
LOGNAME=`cat .testrepository/next-stream`
LOGNAME=$(($LOGNAME - 1))
LOGNAME=".testrepository/${LOGNAME}"
cp $LOGNAME subunit.log
}
function run_pep8 {
echo "Running flake8 ..."
${wrapper} flake8
}
TESTRTESTS="testr run --parallel $testropts"
if [ $never_venv -eq 0 ]
then
# Remove the virtual environment if --force used
if [ $force -eq 1 ]; then
echo "Cleaning virtualenv..."
rm -rf ${venv}
fi
if [ -e ${venv} ]; then
wrapper="${with_venv}"
else
if [ $always_venv -eq 1 ]; then
# Automatically install the virtualenv
python tools/install_venv.py $installvenvopts
wrapper="${with_venv}"
else
echo -e "No virtual environment found...create one? (Y/n) \c"
read use_ve
if [ "x$use_ve" = "xY" -o "x$use_ve" = "x" -o "x$use_ve" = "xy" ]; then
# Install the virtualenv and run the test suite in it
python tools/install_venv.py $installvenvopts
wrapper=${with_venv}
fi
fi
fi
fi
# Delete old coverage data from previous runs
if [ $coverage -eq 1 ]; then
${wrapper} coverage erase
fi
if [ $just_pep8 -eq 1 ]; then
run_pep8
exit
fi
init_testr
run_tests
# NOTE(sirp): we only want to run pep8 when we're running the full-test suite,
# not when we're running tests individually. To handle this, we need to
# distinguish between options (noseopts), which begin with a '-', and
# arguments (testrargs).
if [ -z "$testrargs" ]; then
if [ $no_pep8 -eq 0 ]; then
run_pep8
fi
fi
if [ $coverage -eq 1 ]; then
echo "Generating coverage report in covhtml/"
${wrapper} coverage combine
${wrapper} coverage html --include='seamicroclient/*' --omit='seamicroclient/openstack/common/*' -d covhtml -i
fi

View File

@ -1,18 +0,0 @@
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pbr.version
__version__ = pbr.version.VersionInfo('python-seamicroclient').version_string()

View File

@ -1,281 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base utilities to build API operation managers and objects on top of.
"""
import abc
import time
import six
from seamicroclient import exceptions
from seamicroclient.openstack.common import strutils
from seamicroclient import utils
# Python 3 does not have a basestring. In that case, we use str.
if 'basestring' not in dir(__builtins__):
basestring = str
def getid(obj):
"""
Abstracts the common pattern of allowing both an object or an object's ID
as a parameter when dealing with relationships.
"""
try:
return obj.id
except AttributeError:
return obj
class Manager(utils.HookableMixin):
"""
Managers interact with a particular type of API (servers, storage
etc.) and provide CRUD operations for them.
"""
resource_class = None
def __init__(self, api):
self.api = api
def _list(self, url, body=None, filters=None):
if body:
_resp, body = self.api.client.post(url, body=body)
else:
_resp, body = self.api.client.get(url)
obj_class = self.resource_class
data = body
output = []
for k, v in data.items():
if data[k]:
if type(v) != dict:
output.append(obj_class(self, data, loaded=True))
break
v.update({'id': k})
output.append(obj_class(self, v, loaded=True))
filtered_output = set()
if filters is not None:
for k, v in filters.items():
for item in output:
if isinstance(v, basestring):
if v in getattr(item, k):
output.add(item)
else:
if item in output:
output.remove(item)
elif isinstance(v, int):
if v == getattr(item, k):
output.add(item)
else:
if item in output:
output.remove(item)
else:
continue
return filtered_output
return output
def _get(self, id, url):
_resp, body = self.api.client.get(url)
body.update({'id': id})
return self.resource_class(self, body)
def _create(self, url, body, return_raw=False, **kwargs):
self.run_hooks('modify_body_for_create', body, **kwargs)
_resp, body = self.api.client.post(url, body=body)
if isinstance(body, basestring):
return body.partition('/')[-1]
if return_raw:
return body
for k, v in body.items():
v.update({'id': k})
return self.resource_class(self, v)
def _delete(self, url):
_resp, _body = self.api.client.delete(url)
def _update(self, url, body, **kwargs):
self.run_hooks('modify_body_for_update', body, **kwargs)
_resp, body = self.api.client.put(url, body=body)
if body:
if isinstance(body, basestring):
return body.partition('/')[-1]
if body == kwargs.get('action'):
return
for k, v in body.items():
v.update({'id': k})
return self.resource_class(self, v)
@six.add_metaclass(abc.ABCMeta)
class ManagerWithFind(Manager):
"""
Like a `Manager`, but with additional `find()`/`findall()` methods.
"""
@abc.abstractmethod
def list(self):
pass
def find(self, **kwargs):
"""
Find a single item with attributes matching ``**kwargs``.
This isn't very efficient: it loads the entire list then filters on
the Python side.
"""
matches = self.findall(**kwargs)
num_matches = len(matches)
if num_matches == 0:
msg = "No %s matching %s." % (self.resource_class.__name__, kwargs)
raise exceptions.NotFound(404, msg)
elif num_matches > 1:
raise exceptions.NoUniqueMatch
else:
return matches[0]
def findall(self, **kwargs):
"""
Find all items with attributes matching ``**kwargs``.
To find volume with size less than equal to 500 GB and id contains
'ironic'
kwargs = {'freeSize_le': 500, 'id_has': 'ironic', "UsedSize": 300}
Operator:
no operator required for "equal to" checks
_le: less than equal to
_ge: greater than equal to
_has: contains string
This isn't very efficient: it loads the entire list then filters on
the Python side.
"""
found = []
searches = kwargs.items()
listing = self.list()
for obj in listing:
try:
for attr, value in searches:
if attr.endswith('_eq'):
if getattr(obj, attr) == value:
found.append(obj)
elif attr.endswith('_le'):
if getattr(obj, attr) <= value:
found.append(obj)
elif attr.endswith('_ge'):
if getattr(obj, attr) >= value:
found.append(obj)
elif attr.endswith('_has'):
if value in getattr(obj, attr):
found.append(obj)
else:
if getattr(obj, attr) == value:
found.append(obj)
except AttributeError:
continue
return found
class Resource(object):
"""
A resource represents a particular instance of an object (server, flavor,
etc). This is pretty much just a bag for attributes.
:param manager: Manager object
:param info: dictionary representing resource attributes
:param loaded: prevent lazy-loading if set to True
"""
HUMAN_ID = False
NAME_ATTR = 'id'
def __init__(self, manager, info, loaded=False):
self.manager = manager
self._info = info
self._add_details(info)
self._loaded = loaded
@property
def human_id(self):
"""Subclasses may override this provide a pretty ID which can be used
for bash completion.
"""
if self.NAME_ATTR in self.__dict__ and self.HUMAN_ID:
return strutils.to_slug(getattr(self, self.NAME_ATTR))
return None
def _add_details(self, info):
for (k, v) in six.iteritems(info):
try:
setattr(self, k, v)
self._info[k] = v
except AttributeError:
# In this case we already defined the attribute on the class
pass
def __getattr__(self, k):
if k not in self.__dict__:
# NOTE(rk): disallow lazy-loading if already loaded once
if not self.is_loaded():
self.get()
return self.__getattr__(k)
raise AttributeError(k)
else:
return self.__dict__[k]
def __repr__(self):
reprkeys = sorted(k for k in self.__dict__.keys() if k[0] != '_' and
k != 'manager')
info = ", ".join("%s=%s" % (k, getattr(self, k)) for k in reprkeys)
return "<%s %s>" % (self.__class__.__name__, info)
def get(self):
# set_loaded() first ... so if we have to bail, we know we tried.
self.set_loaded(True)
if not hasattr(self.manager, 'get'):
return
new = self.manager.get(self.id)
if new:
self._add_details(new._info)
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if hasattr(self, 'id') and hasattr(other, 'id'):
return self.id == other.id
return self._info == other._info
def is_loaded(self):
return self._loaded
def set_loaded(self, val):
self._loaded = val
def refresh(self, sleep=None):
if sleep:
time.sleep(sleep)
return self.manager.get(self.id)

View File

@ -1,201 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Seamicro Client interface. Handles the REST calls and responses.
"""
import logging
import time
import requests
try:
import json
except ImportError:
import simplejson as json
from seamicroclient import exceptions
from seamicroclient import utils
class HTTPClient(object):
USER_AGENT = 'python-seamicroclient'
def __init__(self, user, password, auth_url=None,
timeout=None, http_log_debug=False, retries=3):
self.user = user
self.password = password
self.api_endpoint = auth_url
self.auth_url = auth_url.rstrip('/')
self.version = 'v2.0'
self.http_log_debug = http_log_debug
if timeout is not None:
self.timeout = float(timeout)
else:
self.timeout = None
self.retries = int(retries)
self.times = [] # [("item", starttime, endtime), ...]
self._logger = logging.getLogger(__name__)
if self.http_log_debug and not self._logger.handlers:
# Logging level is already set on the root logger
ch = logging.StreamHandler()
self._logger.addHandler(ch)
self._logger.propagate = False
if hasattr(requests, 'logging'):
rql = requests.logging.getLogger(requests.__name__)
rql.addHandler(ch)
# Since we have already setup the root logger on debug, we
# have to set it up here on WARNING (its original level)
# otherwise we will get all the requests logging messanges
rql.setLevel(logging.WARNING)
def get_timings(self):
return self.times
def reset_timings(self):
self.times = []
def http_log_req(self, method, url, kwargs):
if not self.http_log_debug:
return
string_parts = ['curl -i']
string_parts.append(" '%s'" % url)
string_parts.append(' -X %s' % method)
for element in kwargs['headers']:
header = ' -H "%s: %s"' % (element, kwargs['headers'][element])
string_parts.append(header)
if 'data' in kwargs:
string_parts.append(" -d '%s'" % (kwargs['data']))
self._logger.debug("\nREQ: %s\n" % "".join(string_parts))
def http_log_resp(self, resp):
if not self.http_log_debug:
return
self._logger.debug(
"RESP: [%s] %s\nRESP BODY: %s\n",
resp.status_code,
resp.headers,
resp.text)
def request(self, url, method, **kwargs):
kwargs.setdefault('headers', kwargs.get('headers', {}))
kwargs['headers']['User-Agent'] = self.USER_AGENT
kwargs['headers']['Accept'] = 'application/json'
if 'body' in kwargs:
kwargs['headers']['Content-Type'] = 'application/json'
kwargs['data'] = json.dumps(kwargs['body'])
del kwargs['body']
if self.timeout is not None:
kwargs.setdefault('timeout', self.timeout)
self.http_log_req(method, url, kwargs)
resp = requests.request(
method,
url,
**kwargs)
self.http_log_resp(resp)
if resp.text:
# httplib2 returns a connection refused event as a 400 response.
# To determine if it is a bad request or refused connection we need
# to check the body. httplib2 tests check for 'Connection refused'
# or 'actively refused' in the body, so that's what we'll do.
if resp.status_code == 400:
if ('Connection refused' in resp.text or
'actively refused' in resp.text):
raise exceptions.ConnectionRefused(resp.text)
try:
body = json.loads(resp.text)
except ValueError:
body = resp.text
else:
body = None
if resp.status_code >= 400:
raise exceptions.from_response(resp, body, url, method)
return resp, body
def _time_request(self, url, method, **kwargs):
start_time = time.time()
resp, body = self.request(url, method, **kwargs)
self.times.append(("%s %s" % (method, url),
start_time, time.time()))
return resp, body
def _cs_request(self, url, method, **kwargs):
attempts = 0
retry_delay = 5
while True:
attempts += 1
if method in ['GET', 'DELETE']:
url = "%s?username=%s&password=%s" % (url, self.user,
self.password)
else:
kwargs.setdefault('body', {}).update({'username': self.user,
'password': self.password})
try:
resp, body = self._time_request(self.api_endpoint + url,
method, **kwargs)
return resp, body
except requests.exceptions.ConnectionError as e:
if attempts > self.retries:
raise
# Catch a connection refused from requests.request
# retry again with some time delay
self._logger.debug("Connection refused: %s" % e)
self._logger.debug("Failed attempt(%s of %s), "
"retrying in %s seconds" %
(attempts, self.retries,
retry_delay))
time.sleep(retry_delay)
def get(self, url, **kwargs):
return self._cs_request(url, 'GET', **kwargs)
def post(self, url, **kwargs):
return self._cs_request(url, 'POST', **kwargs)
def put(self, url, **kwargs):
return self._cs_request(url, 'PUT', **kwargs)
def delete(self, url, **kwargs):
return self._cs_request(url, 'DELETE', **kwargs)
def get_client_class(version):
version_map = {
'2': 'seamicroclient.v2.client.Client',
}
try:
client_path = version_map[str(version)]
except (KeyError, ValueError):
msg = "Invalid client version '%s'. must be one of: %s" % (
(version, ', '.join(version_map.keys())))
raise exceptions.UnsupportedVersion(msg)
return utils.import_class(client_path)
def Client(version, *args, **kwargs):
client_class = get_client_class(version)
return client_class(*args, **kwargs)

View File

@ -1,243 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exception definitions.
"""
class UnsupportedVersion(Exception):
"""Indicates that the user is trying to use an unsupported
version of the API.
"""
pass
class CommandError(Exception):
pass
class AuthorizationFailure(Exception):
pass
class NoUniqueMatch(Exception):
pass
class AuthSystemNotFound(Exception):
"""When the user specify a AuthSystem but not installed."""
def __init__(self, auth_system):
self.auth_system = auth_system
def __str__(self):
return "AuthSystemNotFound: %s" % repr(self.auth_system)
class NoTokenLookupException(Exception):
"""This form of authentication does not support looking up
endpoints from an existing token.
"""
pass
class EndpointNotFound(Exception):
"""Could not find Service or Region in Service Catalog."""
pass
class AmbiguousEndpoints(Exception):
"""Found more than one matching endpoint in Service Catalog."""
def __init__(self, endpoints=None):
self.endpoints = endpoints
def __str__(self):
return "AmbiguousEndpoints: %s" % repr(self.endpoints)
class ConnectionRefused(Exception):
"""
Connection refused: the server refused the connection.
"""
def __init__(self, response=None):
self.response = response
def __str__(self):
return "ConnectionRefused: %s" % repr(self.response)
class ClientException(Exception):
"""
The base exception class for all exceptions this library raises.
"""
def __init__(self, code, message=None, details=None, request_id=None,
url=None, method=None):
self.code = code
self.message = message or self.__class__.message
self.details = details
self.request_id = request_id
self.url = url
self.method = method
def __str__(self):
formatted_string = "%s (HTTP %s)" % (self.message, self.code)
if self.request_id:
formatted_string += " (Request-ID: %s)" % self.request_id
return formatted_string
class BadRequest(ClientException):
"""
HTTP 400 - Bad request: you sent some malformed data.
"""
http_status = 400
message = "Bad request"
class Unauthorized(ClientException):
"""
HTTP 401 - Unauthorized: bad credentials.
"""
http_status = 401
message = "Unauthorized"
class Forbidden(ClientException):
"""
HTTP 403 - Forbidden: your credentials don't give you access to this
resource.
"""
http_status = 403
message = "Forbidden"
class NotFound(ClientException):
"""
HTTP 404 - Not found
"""
http_status = 404
message = "Not found"
class MethodNotAllowed(ClientException):
"""
HTTP 405 - Method Not Allowed
"""
http_status = 405
message = "Method Not Allowed"
class Conflict(ClientException):
"""
HTTP 409 - Conflict
"""
http_status = 409
message = "Conflict"
class OverLimit(ClientException):
"""
HTTP 413 - Over limit: you're over the API limits for this time period.
"""
http_status = 413
message = "Over limit"
def __init__(self, *args, **kwargs):
try:
self.retry_after = int(kwargs.pop('retry_after'))
except (KeyError, ValueError):
self.retry_after = 0
super(OverLimit, self).__init__(*args, **kwargs)
class RateLimit(OverLimit):
"""
HTTP 429 - Rate limit: you've sent too many requests for this time period.
"""
http_status = 429
message = "Rate limit"
# NotImplemented is a python keyword.
class HTTPNotImplemented(ClientException):
"""
HTTP 501 - Not Implemented: the server does not support this operation.
"""
http_status = 501
message = "Not Implemented"
# In Python 2.4 Exception is old-style and thus doesn't have a __subclasses__()
# so we can do this:
# _code_map = dict((c.http_status, c)
# for c in ClientException.__subclasses__())
#
# Instead, we have to hardcode it:
_error_classes = [BadRequest, Unauthorized, Forbidden, NotFound,
MethodNotAllowed, Conflict, OverLimit, RateLimit,
HTTPNotImplemented]
_code_map = dict((c.http_status, c) for c in _error_classes)
def from_response(response, body, url, method=None):
"""
Return an instance of an ClientException or subclass
based on an requests response.
Usage::
resp, body = requests.request(...)
if resp.status_code != 200:
raise exception_from_response(resp, rest.text)
"""
kwargs = {
'code': response.status_code,
'method': method,
'url': url,
}
if body:
message = "n/a"
details = "n/a"
if hasattr(body, 'keys'):
error = body[list(body)[1]]
message = error.get('message', None)
details = error.get('details', None)
kwargs['message'] = message
kwargs['details'] = details
cls = _code_map.get(response.status_code, ClientException)
return cls(**kwargs)

View File

@ -1,328 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
gettext for openstack-common modules.
Usual usage in an openstack.common module:
from seamicroclient.openstack.common.gettextutils import _
"""
import copy
import gettext
import logging
import os
import re
try:
import UserString as _userString
except ImportError:
import collections as _userString
from babel import localedata
import six
_localedir = os.environ.get('seamicroclient'.upper() + '_LOCALEDIR')
_t = gettext.translation('seamicroclient', localedir=_localedir, fallback=True)
_AVAILABLE_LANGUAGES = {}
USE_LAZY = False
def enable_lazy():
"""Convenience function for configuring _() to use lazy gettext
Call this at the start of execution to enable the gettextutils._
function to use lazy gettext functionality. This is useful if
your project is importing _ directly instead of using the
gettextutils.install() way of importing the _ function.
"""
global USE_LAZY
USE_LAZY = True
def _(msg):
if USE_LAZY:
return Message(msg, 'seamicroclient')
else:
return _t.ugettext(msg)
def install(domain, lazy=False):
"""Install a _() function using the given translation domain.
Given a translation domain, install a _() function using gettext's
install() function.
The main difference from gettext.install() is that we allow
overriding the default localedir (e.g. /usr/share/locale) using
a translation-domain-specific environment variable (e.g.
NOVA_LOCALEDIR).
:param domain: the translation domain
:param lazy: indicates whether or not to install the lazy _() function.
The lazy _() introduces a way to do deferred translation
of messages by installing a _ that builds Message objects,
instead of strings, which can then be lazily translated into
any available locale.
"""
if lazy:
# NOTE(mrodden): Lazy gettext functionality.
#
# The following introduces a deferred way to do translations on
# messages in OpenStack. We override the standard _() function
# and % (format string) operation to build Message objects that can
# later be translated when we have more information.
#
# Also included below is an example LocaleHandler that translates
# Messages to an associated locale, effectively allowing many logs,
# each with their own locale.
def _lazy_gettext(msg):
"""Create and return a Message object.
Lazy gettext function for a given domain, it is a factory method
for a project/module to get a lazy gettext function for its own
translation domain (i.e. seamicro etc.)
Message encapsulates a string so that we can translate
it later when needed.
"""
return Message(msg, domain)
import __builtin__
__builtin__.__dict__['_'] = _lazy_gettext
else:
localedir = '%s_LOCALEDIR' % domain.upper()
gettext.install(domain,
localedir=os.environ.get(localedir),
unicode=True)
class Message(_userString.UserString, object):
"""Class used to encapsulate translatable messages."""
def __init__(self, msg, domain):
# _msg is the gettext msgid and should never change
self._msg = msg
self._left_extra_msg = ''
self._right_extra_msg = ''
self.params = None
self.locale = None
self.domain = domain
@property
def data(self):
# NOTE(mrodden): this should always resolve to a unicode string
# that best represents the state of the message currently
localedir = os.environ.get(self.domain.upper() + '_LOCALEDIR')
if self.locale:
lang = gettext.translation(self.domain,
localedir=localedir,
languages=[self.locale],
fallback=True)
else:
# use system locale for translations
lang = gettext.translation(self.domain,
localedir=localedir,
fallback=True)
full_msg = (self._left_extra_msg +
lang.ugettext(self._msg) +
self._right_extra_msg)
if self.params is not None:
full_msg = full_msg % self.params
return six.text_type(full_msg)
def _save_dictionary_parameter(self, dict_param):
full_msg = self.data
# look for %(blah) fields in string;
# ignore %% and deal with the
# case where % is first character on the line
keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', full_msg)
# if we don't find any %(blah) blocks but have a %s
if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg):
# apparently the full dictionary is the parameter
params = copy.deepcopy(dict_param)
else:
params = {}
for key in keys:
try:
params[key] = copy.deepcopy(dict_param[key])
except TypeError:
# cast uncopyable thing to unicode string
params[key] = unicode(dict_param[key])
return params
def _save_parameters(self, other):
# we check for None later to see if
# we actually have parameters to inject,
# so encapsulate if our parameter is actually None
if other is None:
self.params = (other, )
elif isinstance(other, dict):
self.params = self._save_dictionary_parameter(other)
else:
# fallback to casting to unicode,
# this will handle the problematic python code-like
# objects that cannot be deep-copied
try:
self.params = copy.deepcopy(other)
except TypeError:
self.params = unicode(other)
return self
# overrides to be more string-like
def __unicode__(self):
return self.data
def __str__(self):
return self.data.encode('utf-8')
def __getstate__(self):
to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
'domain', 'params', 'locale']
new_dict = self.__dict__.fromkeys(to_copy)
for attr in to_copy:
new_dict[attr] = copy.deepcopy(self.__dict__[attr])
return new_dict
def __setstate__(self, state):
for (k, v) in state.items():
setattr(self, k, v)
# operator overloads
def __add__(self, other):
copied = copy.deepcopy(self)
copied._right_extra_msg += other.__str__()
return copied
def __radd__(self, other):
copied = copy.deepcopy(self)
copied._left_extra_msg += other.__str__()
return copied
def __mod__(self, other):
# do a format string to catch and raise
# any possible KeyErrors from missing parameters
self.data % other
copied = copy.deepcopy(self)
return copied._save_parameters(other)
def __mul__(self, other):
return self.data * other
def __rmul__(self, other):
return other * self.data
def __getitem__(self, key):
return self.data[key]
def __getslice__(self, start, end):
return self.data.__getslice__(start, end)
def __getattribute__(self, name):
# NOTE(mrodden): handle lossy operations that we can't deal with yet
# These override the UserString implementation, since UserString
# uses our __class__ attribute to try and build a new message
# after running the inner data string through the operation.
# At that point, we have lost the gettext message id and can just
# safely resolve to a string instead.
ops = ['capitalize', 'center', 'decode', 'encode',
'expandtabs', 'ljust', 'lstrip', 'replace', 'rjust', 'rstrip',
'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
if name in ops:
return getattr(self.data, name)
else:
return _userString.UserString.__getattribute__(self, name)
def get_available_languages(domain):
"""Lists the available languages for the given translation domain.
:param domain: the domain to get languages for
"""
if domain in _AVAILABLE_LANGUAGES:
return copy.copy(_AVAILABLE_LANGUAGES[domain])
localedir = '%s_LOCALEDIR' % domain.upper()
find = lambda x: gettext.find(domain,
localedir=os.environ.get(localedir),
languages=[x])
# NOTE(mrodden): en_US should always be available (and first in case
# order matters) since our in-line message strings are en_US
language_list = ['en_US']
# NOTE(luisg): Babel <1.0 used a function called list(), which was
# renamed to locale_identifiers() in >=1.0, the requirements master list
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
# this check when the master list updates to >=1.0, and all projects udpate
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
for i in locale_identifiers:
if find(i) is not None:
language_list.append(i)
_AVAILABLE_LANGUAGES[domain] = language_list
return copy.copy(language_list)
def get_localized_message(message, user_locale):
"""Gets a localized version of the given message in the given locale."""
if isinstance(message, Message):
if user_locale:
message.locale = user_locale
return unicode(message)
else:
return message
class LocaleHandler(logging.Handler):
"""Handler that can have a locale associated to translate Messages.
A quick example of how to utilize the Message class above.
LocaleHandler takes a locale and a target logging.Handler object
to forward LogRecord objects to after translating the internal Message.
"""
def __init__(self, locale, target):
"""Initialize a LocaleHandler
:param locale: locale to use for translating messages
:param target: logging.Handler object to forward
LogRecord objects to after translation
"""
logging.Handler.__init__(self)
self.locale = locale
self.target = target
def emit(self, record):
if isinstance(record.msg, Message):
# set the locale and resolve to a string
record.msg.locale = self.locale
self.target.emit(record)

View File

@ -1,17 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 Canonical Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

View File

@ -1,49 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 Canonical Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""
Python2/Python3 compatibility layer for OpenStack
"""
import six
if six.PY3:
# python3
import urllib.parse
urlencode = urllib.parse.urlencode
urljoin = urllib.parse.urljoin
quote = urllib.parse.quote
parse_qsl = urllib.parse.parse_qsl
urlparse = urllib.parse.urlparse
urlsplit = urllib.parse.urlsplit
urlunsplit = urllib.parse.urlunsplit
else:
# python2
import urllib
import urlparse
urlencode = urllib.urlencode
quote = urllib.quote
parse = urlparse
parse_qsl = parse.parse_qsl
urljoin = parse.urljoin
urlparse = parse.urlparse
urlsplit = parse.urlsplit
urlunsplit = parse.urlunsplit

View File

@ -1,218 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import re
import sys
import unicodedata
import six
from seamicroclient.openstack.common.gettextutils import _ # noqa
# Used for looking up extensions of text
# to their 'multiplied' byte amount
BYTE_MULTIPLIERS = {
'': 1,
't': 1024 ** 4,
'g': 1024 ** 3,
'm': 1024 ** 2,
'k': 1024,
}
BYTE_REGEX = re.compile(r'(^-?\d+)(\D*)')
TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes')
FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no')
SLUGIFY_STRIP_RE = re.compile(r"[^\w\s-]")
SLUGIFY_HYPHENATE_RE = re.compile(r"[-\s]+")
def int_from_bool_as_string(subject):
"""Interpret a string as a boolean and return either 1 or 0.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
return bool_from_string(subject) and 1 or 0
def bool_from_string(subject, strict=False):
"""Interpret a string as a boolean.
A case-insensitive match is performed such that strings matching 't',
'true', 'on', 'y', 'yes', or '1' are considered True and, when
`strict=False`, anything else is considered False.
Useful for JSON-decoded stuff and config file parsing.
If `strict=True`, unrecognized values, including None, will raise a
ValueError which is useful when parsing values passed in from an API call.
Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'.
"""
if not isinstance(subject, six.string_types):
subject = str(subject)
lowered = subject.strip().lower()
if lowered in TRUE_STRINGS:
return True
elif lowered in FALSE_STRINGS:
return False
elif strict:
acceptable = ', '.join(
"'%s'" % s for s in sorted(TRUE_STRINGS + FALSE_STRINGS))
msg = _("Unrecognized value '%(val)s', acceptable values are:"
" %(acceptable)s") % {'val': subject,
'acceptable': acceptable}
raise ValueError(msg)
else:
return False
def safe_decode(text, incoming=None, errors='strict'):
"""Decodes incoming str using `incoming` if they're not already unicode.
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a unicode `incoming` encoded
representation of it.
:raises TypeError: If text is not an isntance of str
"""
if not isinstance(text, six.string_types):
raise TypeError("%s can't be decoded" % type(text))
if isinstance(text, six.text_type):
return text
if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())
try:
return text.decode(incoming, errors)
except UnicodeDecodeError:
# Note(flaper87) If we get here, it means that
# sys.stdin.encoding / sys.getdefaultencoding
# didn't return a suitable encoding to decode
# text. This happens mostly when global LANG
# var is not set correctly and there's no
# default encoding. In this case, most likely
# python will use ASCII or ANSI encoders as
# default encodings but they won't be capable
# of decoding non-ASCII characters.
#
# Also, UTF-8 is being used since it's an ASCII
# extension.
return text.decode('utf-8', errors)
def safe_encode(text, incoming=None,
encoding='utf-8', errors='strict'):
"""Encodes incoming str/unicode using `encoding`.
If incoming is not specified, text is expected to be encoded with
current python's default encoding. (`sys.getdefaultencoding`)
:param incoming: Text's current encoding
:param encoding: Expected encoding for text (Default UTF-8)
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a bytestring `encoding` encoded
representation of it.
:raises TypeError: If text is not an isntance of str
"""
if not isinstance(text, six.string_types):
raise TypeError("%s can't be encoded" % type(text))
if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())
if isinstance(text, six.text_type):
return text.encode(encoding, errors)
elif text and encoding != incoming:
# Decode text before encoding it with `encoding`
text = safe_decode(text, incoming, errors)
return text.encode(encoding, errors)
return text
def to_bytes(text, default=0):
"""Converts a string into an integer of bytes.
Looks at the last characters of the text to determine
what conversion is needed to turn the input text into a byte number.
Supports "B, K(B), M(B), G(B), and T(B)". (case insensitive)
:param text: String input for bytes size conversion.
:param default: Default return value when text is blank.
"""
match = BYTE_REGEX.search(text)
if match:
magnitude = int(match.group(1))
mult_key_org = match.group(2)
if not mult_key_org:
return magnitude
elif text:
msg = _('Invalid string format: %s') % text
raise TypeError(msg)
else:
return default
mult_key = mult_key_org.lower().replace('b', '', 1)
multiplier = BYTE_MULTIPLIERS.get(mult_key)
if multiplier is None:
msg = _('Unknown byte multiplier: %s') % mult_key_org
raise TypeError(msg)
return magnitude * multiplier
def to_slug(value, incoming=None, errors="strict"):
"""Normalize string.
Convert to lowercase, remove non-word characters, and convert spaces
to hyphens.
Inspired by Django's `slugify` filter.
:param value: Text to slugify
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: slugified unicode representation of `value`
:raises TypeError: If text is not an instance of str
"""
value = safe_decode(value, incoming, errors)
# NOTE(aababilov): no need to use safe_(encode|decode) here:
# encodings are always "ascii", error handling is always "ignore"
# and types are always known (first: unicode; second: str)
value = unicodedata.normalize("NFKD", value).encode(
"ascii", "ignore").decode("ascii")
value = SLUGIFY_STRIP_RE.sub("", value).strip().lower()
return SLUGIFY_HYPHENATE_RE.sub("-", value)

View File

@ -1,194 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import calendar
import datetime
import time
import iso8601
import six
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format."""
if not at:
at = utcnow()
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):
"""Parse time from ISO 8601 format."""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(unicode(e))
except TypeError as e:
raise ValueError(unicode(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
"""Returns formatted utcnow."""
if not at:
at = utcnow()
return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
"""Turn a formatted time back into a datetime."""
return datetime.datetime.strptime(timestr, fmt)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC naive object."""
offset = timestamp.utcoffset()
if offset is None:
return timestamp
return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, six.string_types):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, six.string_types):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
return int(time.time())
return calendar.timegm(utcnow().timetuple())
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp."""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
"""
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overridden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None
def marshall_now(now=None):
"""Make an rpc-safe datetime with microseconds.
Note: tzinfo is stripped, but not required for relative times.
"""
if not now:
now = utcnow()
return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
minute=now.minute, second=now.second,
microsecond=now.microsecond)
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
return datetime.datetime(day=tyme['day'],
month=tyme['month'],
year=tyme['year'],
hour=tyme['hour'],
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])
def delta_seconds(before, after):
"""Return the difference between two timing objects.
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""Determines if time is going to happen in the next window seconds.
:params dt: the time
:params window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) <= soon

View File

@ -1,39 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
UUID related utilities and helper functions.
"""
import uuid
def generate_uuid():
return str(uuid.uuid4())
def is_uuid_like(val):
"""Returns validation of a value as a UUID.
For our purposes, a UUID is a canonical form string:
aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
"""
try:
return str(uuid.UUID(val)) == val
except (TypeError, ValueError, AttributeError):
return False

View File

@ -1,93 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A fake server that "responds" to API methods with pre-canned responses.
All of these responses come from the spec, so if for some reason the spec's
wrong the tests might raise AssertionError. I've indicated in comments the
places where actual behavior differs from the spec.
"""
from seamicroclient import base
def assert_has_keys(dict, required=[], optional=[]):
keys = dict.keys()
for k in required:
try:
assert k in keys
except AssertionError:
extra_keys = set(keys).difference(set(required + optional))
raise AssertionError("found unexpected keys: %s" %
list(extra_keys))
class FakeClient(object):
def assert_called(self, method, url, body=None, pos=-1):
"""
Assert than an API method was just called.
"""
expected = (method, url)
called = self.client.callstack[pos][0:2]
assert self.client.callstack, \
"Expected %s %s but no calls were made." % expected
assert expected == called, 'Expected %s %s; got %s %s' % \
(expected + called)
if body is not None:
if self.client.callstack[pos][2] != body:
raise AssertionError('%r != %r' %
(self.client.callstack[pos][2], body))
def assert_called_anytime(self, method, url, body=None):
"""
Assert than an API method was called anytime in the test.
"""
expected = (method, url)
assert self.client.callstack, \
"Expected %s %s but no calls were made." % expected
found = False
for entry in self.client.callstack:
if expected == entry[0:2]:
found = True
break
assert found, 'Expected %s; got %s' % \
(expected, self.client.callstack)
if body is not None:
try:
assert entry[2] == body
except AssertionError:
print(entry[2])
print("!=")
print(body)
raise
self.client.callstack = []
def clear_callstack(self):
self.client.callstack = []
def authenticate(self):
pass
# Fake class that will be used as an extension
class FakeManager(base.Manager):
pass

View File

@ -1,44 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from seamicroclient.tests import utils
from seamicroclient.v2 import Client
cs = Client("admin", "seamicro", "http://chassis/v2.0")
class PoolsTest(utils.TestCase):
def test_list_pool(self):
pool_list = cs.pools.list()
self.assertTrue(len(pool_list) > 0)
def test_list_pool_with_filter(self):
filters = {'id': 'p6-'}
for pool in cs.pools.list(filters):
for k, v in filters.iteritems():
self.assertIn(v, getattr(pool, k))
def test_list_pool_with_filter_no_match(self):
filters = {'id': str(uuid.uuid4())}
for pool in cs.pools.list(filters):
for k, v in filters.iteritems():
self.assertNotIn(getattr(pool, k), v)
def test_get_pool(self):
pool_id = cs.pools.list()[0].id
pool = cs.pools.get(pool_id)
self.assertEqual(pool.id, pool_id)

View File

@ -1,132 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from seamicroclient.tests import utils
from seamicroclient.v2 import Client
STATUS_WAIT_TIMEOUT = 30
BUILD_INTERVAL = 10
SERVER_ID = '0/0'
UNTAGGED_VLAN_ID = '7'
TAGGED_VLAN_ID = '17'
cs = Client("admin", "seamicro", "http://chassis/v2.0")
class FunctionalException(Exception):
pass
class ServersTest(utils.TestCase):
@staticmethod
def wait_for_server_status(s, active):
start_time = int(time.time())
while True:
timed_out = int(time.time()) - start_time > STATUS_WAIT_TIMEOUT
if timed_out:
raise FunctionalException()
time.sleep(BUILD_INTERVAL)
s = s.refresh()
if s.active == active:
return
@staticmethod
def create_volume(size=1):
pool = cs.pools.list()[0]
return cs.volumes.create(size, pool)
def test_list_servers(self):
sl = cs.servers.list()
self.assertTrue(len(sl) > 0)
def test_get_server(self):
s = cs.servers.get(SERVER_ID)
self.assertEqual(s.id, SERVER_ID)
def test_power_on_power_off(self):
s = cs.servers.get(SERVER_ID)
self.assertEqual(s.id, SERVER_ID)
if s.active:
s.power_off()
self.wait_for_server_status(s, active=False)
s = s.refresh()
self.assertEqual(s.active, False)
s.power_on()
else:
s.power_on()
self.wait_for_server_status(s, active=True)
s = s.refresh()
self.assertEqual(s.active, True)
s.power_off()
def test_reset(self):
s = cs.servers.get(SERVER_ID)
self.assertEqual(s.id, SERVER_ID)
s.reset()
self.wait_for_server_status(s, active=True)
s = s.refresh()
self.assertEqual(s.active, True)
def test_attach_detach_volume(self):
volume_id = self.create_volume()
server = cs.servers.list()[0]
server.detach_volume()
server.attach_volume(volume_id)
server = server.refresh()
self.assertEqual(server.vdisk['0'], volume_id)
server.detach_volume()
cs.volumes.delete(volume_id)
def test_set_tagged_vlan(self):
server = cs.servers.list()[0]
server.unset_tagged_vlan(TAGGED_VLAN_ID)
server = server.refresh(10)
server.set_tagged_vlan(TAGGED_VLAN_ID)
server = server.refresh(10)
self.assertTrue(TAGGED_VLAN_ID in server.nic['0']['taggedVlan'])
server.unset_tagged_vlan(TAGGED_VLAN_ID)
def test_unset_tagged_vlan(self):
server = cs.servers.list()[0]
server.unset_tagged_vlan(TAGGED_VLAN_ID)
server = server.refresh(10)
server.set_tagged_vlan(TAGGED_VLAN_ID)
server = server.refresh(10)
server.unset_tagged_vlan(TAGGED_VLAN_ID)
server = server.refresh(10)
self.assertTrue(TAGGED_VLAN_ID not in server.nic['0']['taggedVlan'])
def test_unset_untagged_vlan(self):
server = cs.servers.list()[0]
server.unset_untagged_vlan(UNTAGGED_VLAN_ID)
server = server.refresh(10)
server.set_untagged_vlan(UNTAGGED_VLAN_ID)
server = server.refresh(10)
server.unset_untagged_vlan(UNTAGGED_VLAN_ID)
server = server.refresh(10)
self.assertTrue(UNTAGGED_VLAN_ID not in
server.nic['0']['untaggedVlan'])
def test_set_untagged_vlan(self):
server = cs.servers.list()[0]
server.unset_untagged_vlan(UNTAGGED_VLAN_ID)
server = server.refresh(10)
server.set_untagged_vlan(UNTAGGED_VLAN_ID)
server = server.refresh(10)
self.assertTrue(UNTAGGED_VLAN_ID in server.nic['0']['untaggedVlan'])
server.unset_untagged_vlan(UNTAGGED_VLAN_ID)

View File

@ -1,48 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.v2 import Client
cs = Client("admin", "seamicro", "http://chassis/v2.0")
class VolumesTest(utils.TestCase):
@staticmethod
def create_volume(volume_size=2, pool=None):
return cs.volumes.create(volume_size, pool)
def test_list_volume(self):
volume_list = cs.volumes.list()
self.assertTrue(len(volume_list) > 0)
def test_get_volume(self):
volume_id = cs.volumes.list()[0].id
volume = cs.volumes.get(volume_id)
self.assertEqual(volume.id, volume_id)
def test_create_volume(self):
pool = cs.pools.list()[0]
volume_size = 2
volume_id = self.create_volume(volume_size, pool)
self.assertIn(pool.id, volume_id)
cs.volumes.delete(volume_id)
def test_delete_volume(self):
pool = cs.pools.list()[0]
volume = self.create_volume(pool=pool)
cs.volumes.delete(volume)
volume = cs.volumes.get(volume)
self.assertEqual(volume.actualSize, 0)

View File

@ -1,58 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient import base
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import servers
cs = fakes.FakeClient()
class BaseTest(utils.TestCase):
def test_resource_repr(self):
r = base.Resource(None, dict(foo="bar", baz="spam"))
self.assertEqual(repr(r), "<Resource baz=spam, foo=bar>")
def test_getid(self):
self.assertEqual(base.getid(4), 4)
class TmpObject(object):
id = 4
self.assertEqual(base.getid(TmpObject), 4)
def test_resource_lazy_getattr(self):
f = servers.Server(cs.servers, {'id': 1})
self.assertEqual(f.name, 'sample-server')
cs.assert_called('GET', '/servers/1')
# Missing stuff still fails after a second get
self.assertRaises(AttributeError, getattr, f, 'blahblah')
def test_eq(self):
# Two resources of the same type with the same id: equal
r1 = base.Resource(None, {'id': 1, 'name': 'hi'})
r2 = base.Resource(None, {'id': 1, 'name': 'hello'})
self.assertEqual(r1, r2)
# Two resoruces of different types: never equal
r1 = base.Resource(None, {'id': 1})
r2 = servers.Server(None, {'id': 1})
self.assertNotEqual(r1, r2)
# Two resources with no ID: equal if their info is equal
r1 = base.Resource(None, {'name': 'joe', 'age': 12})
r2 = base.Resource(None, {'name': 'joe', 'age': 12})
self.assertEqual(r1, r2)

View File

@ -1,33 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import seamicroclient.client
from seamicroclient.tests import utils
import seamicroclient.v2.client
class ClientTest(utils.TestCase):
def test_client_with_timeout(self):
instance = seamicroclient.client.HTTPClient(user='user',
password='password',
timeout=2,
auth_url="http://test")
self.assertEqual(instance.timeout, 2)
def test_get_client_class_v2(self):
output = seamicroclient.client.get_client_class('2')
self.assertEqual(output, seamicroclient.v2.client.Client)

View File

@ -1,124 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import requests
from seamicroclient import client
from seamicroclient import exceptions
from seamicroclient.tests import utils
fake_response = utils.TestResponse({
"status_code": 200,
"text": '{"hi": "there"}',
})
mock_request = mock.Mock(return_value=(fake_response))
refused_response = utils.TestResponse({
"status_code": 400,
"text": '[Errno 111] Connection refused',
})
refused_mock_request = mock.Mock(return_value=(refused_response))
bad_req_response = utils.TestResponse({
"status_code": 400,
"text": '',
})
bad_req_mock_request = mock.Mock(return_value=(bad_req_response))
def get_client():
cl = client.HTTPClient("username", "password", "http://example.com")
return cl
def get_authed_client():
cl = get_client()
cl.auth_url = "http://example.com"
cl.auth_token = "token"
cl.user = "user"
cl.password = "password"
return cl
class ClientTest(utils.TestCase):
def test_get(self):
cl = get_authed_client()
@mock.patch.object(requests.Session, "request", mock_request)
@mock.patch('time.time', mock.Mock(return_value=1234))
def test_get_call():
resp, body = cl.get("/hi")
headers = {'Accept': 'application/json',
'User-Agent': 'python-seamicroclient'}
mock_request.assert_called_with(
"GET",
"http://example.com/hi?username=%s&password=%s" % (cl.user,
cl.password),
headers=headers)
# Automatic JSON parsing
self.assertEqual(body, {"hi": "there"})
test_get_call()
def test_post(self):
cl = get_authed_client()
@mock.patch.object(requests.Session, "request", mock_request)
def test_post_call():
body = {'k1': 'v1', 'k2': 'v2', 'authtoken': cl.auth_token}
cl.post("/hi", body=body)
headers = {'Content-Type': 'application/json', 'Accept':
'application/json',
'User-Agent': 'python-seamicroclient'}
mock_request.assert_called_with(
"POST",
"http://example.com/hi",
headers=headers,
data=json.dumps(body))
test_post_call()
def test_connection_refused(self):
cl = get_client()
@mock.patch.object(requests.Session, "request", refused_mock_request)
def test_refused_call():
self.assertRaises(exceptions.ConnectionRefused, cl.get, "/hi")
test_refused_call()
def test_bad_request(self):
cl = get_client()
@mock.patch.object(requests.Session, "request", bad_req_mock_request)
def test_refused_call():
self.assertRaises(exceptions.BadRequest, cl.get, "/hi")
test_refused_call()
def test_client_logger(self):
cl1 = client.HTTPClient("username", "password",
"http://example.com",
http_log_debug=True)
self.assertEqual(len(cl1._logger.handlers), 1)
cl2 = client.HTTPClient("username", "password",
"http://example.com",
http_log_debug=True)
self.assertEqual(len(cl2._logger.handlers), 1)

View File

@ -1,58 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import fixtures
import requests
import testtools
class TestCase(testtools.TestCase):
def setUp(self):
super(TestCase, self).setUp()
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
os.environ.get('OS_STDERR_CAPTURE') == '1'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
class TestResponse(requests.Response):
"""
Class used to wrap requests.Response and provide some
convenience to initialize with a dict
"""
def __init__(self, data):
self._text = None
super(TestResponse, self)
if isinstance(data, dict):
self.status_code = data.get('status_code', None)
self.headers = data.get('headers', None)
# Fake the text attribute to streamline Response creation
self._text = data.get('text', None)
else:
self.status_code = data
def __eq__(self, other):
return self.__dict__ == other.__dict__
@property
def text(self):
return self._text

View File

@ -1,188 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from seamicroclient import client as base_client
from seamicroclient.openstack.common.py3kcompat import urlutils
from seamicroclient.tests import fakes
from seamicroclient.tests import utils
from seamicroclient.v2 import client
class FakeClient(fakes.FakeClient, client.Client):
def __init__(self, *args, **kwargs):
client.Client.__init__(self, 'username', 'password',
'auth_url')
self.client = FakeHTTPClient(**kwargs)
class FakeHTTPClient(base_client.HTTPClient):
def __init__(self, **kwargs):
self.username = 'username'
self.password = 'password'
self.auth_url = 'auth_url'
self.callstack = []
self.timings = 'timings'
self.http_log_debug = 'http_log_debug'
def _cs_request(self, url, method, **kwargs):
# Check that certain things are called correctly
if method in ['GET', 'DELETE']:
assert 'body' not in kwargs
elif method == 'PUT':
assert 'body' in kwargs
# Call the method
args = urlutils.parse_qsl(urlutils.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
munged_url = munged_url.replace(' ', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
status, headers, body = getattr(self, callback)(**kwargs)
r = utils.TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
return r, body
def get_servers_1(self, **kwargs):
return (200, {}, {'id': 1234, 'name': 'sample-server'}
)
def get_servers(self, **kwargs):
return (200, {}, {'0/0': {}, '1/0': {}})
def put_servers_1(self, **kwargs):
return (200, {}, {})
def put_servers_1_nic_0_untaggedVlans(self, **kwargs):
return (200, {}, {})
def put_servers_1_nic_0_taggedVlans(self, **kwargs):
return (200, {}, {})
def get_storage_pools(self):
return (200, {}, {'0/p0-0': {}, '0/p1-1': {}})
def get_storage_pools_1(self):
return (200, {}, {'0/p0-0': {}})
def get_storage_volumes(self):
return (200, {}, {'0/p0-0/1': {}, '0/p1-1/1': {}})
def get_storage_volumes_1(self):
return (200, {}, {'1': {}})
def put_storage_volumes_0_p0_0_1(self, **kwargs):
return (200, {}, {'0/p0-0/1': {}})
def get_storage_volumes_0_p0_0_1(self, **kwargs):
return (200, {}, {'0/p0-0/1': {}})
def put_servers_1_vdisk_0(self, **kwargs):
return (200, {}, {})
def put_servers_1_vdisk_3(self, **kwargs):
return (200, {}, {})
def delete_servers_1_vdisk_0(self, **kwargs):
return (200, {}, {})
def get_storage_disks(self, **kwargs):
return (200, {}, {})
def get_storage_disks_1(self, **kwargs):
return (200, {}, {'1': {}})
def put_storage_disks_1(self, **kwargs):
return (200, {}, {'1': {}})
def get_chassis(self, **kwargs):
return (200, {}, {})
def put_chassis_system_writeMem(self, **kwargs):
return (200, {}, {})
def get_chassis_fanTray(self, **kwargs):
return (200, {}, {})
def get_chassis_fanTray_1(self, **kwargs):
return (200, {}, {})
def get_interfaces(self, **kwargs):
return (200, {}, {})
def get_interfaces_1(self, **kwargs):
return (200, {}, {})
def put_interfaces_1_shutdown(self, **kwargs):
return (200, {}, {})
def put_interfaces_1_vlans_taggedVlans(self, **kwargs):
return (200, {}, {})
def put_interfaces_1_vlans_untaggedVlans(self, **kwargs):
return (200, {}, {})
def put_storage_pools_1_pool_name(self, **kwargs):
return (200, {}, {})
def delete_storage_pools_1_pool_name(self, **kwargs):
return (200, {}, {})
def put_storage_pools_1(self, **kwargs):
return (200, {}, {})
def get_chassis_powersupply(self, **kwargs):
return (200, {}, {})
def get_chassis_powersupply_1(self, **kwargs):
return (200, {}, {})
def get_chassis_scard(self, **kwargs):
return (200, {}, {})
def get_chassis_scard_1(self, **kwargs):
return (200, {}, {})
def put_chassis_scard_1_mgmtMode(self, **kwargs):
return (200, {}, {})
def get_chassis_smcard(self, **kwargs):
return (200, {}, {})
def get_chassis_smcard_1(self, **kwargs):
return (200, {}, {})
def get_chassis_systems(self, **kwargs):
return (200, {}, {})
def put_chassis_system_switchover(self, **kwargs):
return (200, {}, {})
def put_chassis_system_reload(self, **kwargs):
return (200, {}, {})

View File

@ -1,33 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import smcards
cs = fakes.FakeClient()
class SMCardstest(utils.TestCase):
def test_list_smcards(self):
pl = cs.smcards.list()
cs.assert_called('GET', '/chassis/smcard')
[self.assertTrue(isinstance(s, smcards.SMCard)) for s in pl]
def test_get_smcards(self):
p = cs.smcards.get(1)
cs.assert_called('GET', '/chassis/smcard/1')
self.assertTrue(isinstance(p, smcards.SMCard))

View File

@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import chassis
cs = fakes.FakeClient()
class ChassisTest(utils.TestCase):
def test_list_chassiss(self):
sl = cs.chassis.list()
cs.assert_called('GET', '/chassis')
[self.assertTrue(isinstance(s, chassis.Chassis)) for s in sl]
def test_chassis_write_mem(self):
cs.chassis.writemem(1)
cs.assert_called('PUT', '/chassis/system/writeMem')

View File

@ -1,49 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import disks
cs = fakes.FakeClient()
class DisksTest(utils.TestCase):
def test_list_disks(self):
sl = cs.disks.list()
cs.assert_called('GET', '/storage/disks')
[self.assertTrue(isinstance(s, disks.Disk)) for s in sl]
def test_get_disk(self):
s = cs.disks.get(1)
cs.assert_called('GET', '/storage/disks/1')
self.assertTrue(isinstance(s, disks.Disk))
def test_disk_power_on(self):
cs.disks.power_on(1)
cs.assert_called('PUT', '/storage/disks/1')
def test_disk_power_off(self):
cs.disks.power_off(1)
cs.assert_called('PUT', '/storage/disks/1')
def test_disk_activate_led(self):
cs.disks.activate_led(1)
cs.assert_called('PUT', '/storage/disks/1')
def test_disk_deactivate_lef(self):
cs.disks.deactivate_led(1)
cs.assert_called('PUT', '/storage/disks/1')

View File

@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import fantrays
cs = fakes.FakeClient()
class FanTraysTest(utils.TestCase):
def test_list_fantrays(self):
sl = cs.fantrays.list()
cs.assert_called('GET', '/chassis/fanTray')
[self.assertTrue(isinstance(s, fantrays.FanTray)) for s in sl]
def test_fantray_get(self):
cs.fantrays.get(1)
cs.assert_called('GET', '/chassis/fanTray/1')

View File

@ -1,60 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import interfaces
cs = fakes.FakeClient()
class InterfacesTest(utils.TestCase):
def test_list_interfaces(self):
sl = cs.interfaces.list()
cs.assert_called('GET', '/interfaces')
[self.assertTrue(isinstance(s, interfaces.Interface)) for s in sl]
def test_interface_get(self):
cs.interfaces.get(1)
cs.assert_called('GET', '/interfaces/1')
def test_interface_shutdown(self):
cs.interfaces.shutdown(1)
cs.assert_called('PUT', '/interfaces/1/shutdown')
def test_interface_no_shutdown(self):
cs.interfaces.no_shutdown(1)
cs.assert_called('PUT', '/interfaces/1/shutdown')
def test_interface_add_taggedvlan_list(self):
cs.interfaces.add_tagged_vlan(1, [1, 2, 3])
cs.assert_called('PUT', '/interfaces/1/vlans/taggedVlans')
def test_interface_add_taggedvlan_single(self):
cs.interfaces.add_tagged_vlan(1, '23-25')
cs.assert_called('PUT', '/interfaces/1/vlans/taggedVlans')
def test_interface_remove_taggedvlan(self):
cs.interfaces.remove_tagged_vlan(1, '23-25')
cs.assert_called('PUT', '/interfaces/1/vlans/taggedVlans')
def test_interface_add_untaggedvlan(self):
cs.interfaces.add_untagged_vlan(1, '23')
cs.assert_called('PUT', '/interfaces/1/vlans/untaggedVlans')
def test_interface_remove_untaggedvlan(self):
cs.interfaces.remove_untagged_vlan(1, '23')
cs.assert_called('PUT', '/interfaces/1/vlans/untaggedVlans')

View File

@ -1,49 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import pools
cs = fakes.FakeClient()
class PoolsTest(utils.TestCase):
def test_list_pools(self):
pl = cs.pools.list()
cs.assert_called('GET', '/storage/pools')
[self.assertTrue(isinstance(s, pools.Pool)) for s in pl]
def test_get_pool(self):
p = cs.pools.get(1)
cs.assert_called('GET', '/storage/pools/1')
self.assertTrue(isinstance(p, pools.Pool))
def test_create_pool(self):
cs.pools.create(1, "pool-name", [1, 5, 6])
cs.assert_called('PUT', '/storage/pools/1/pool-name')
def test_delete_pool(self):
cs.pools.delete('1/pool-name')
cs.assert_called('DELETE', '/storage/pools/1/pool-name')
def test_mount_pool(self):
cs.pools.mount(1)
cs.assert_called('PUT', '/storage/pools/1')
def test_unmount_pool(self):
cs.pools.unmount(1)
cs.assert_called('PUT', '/storage/pools/1')

View File

@ -1,33 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import powersupplies
cs = fakes.FakeClient()
class PowerSuppliesTest(utils.TestCase):
def test_list_powersupplies(self):
pl = cs.powersupplies.list()
cs.assert_called('GET', '/chassis/powersupply')
[self.assertTrue(isinstance(s, powersupplies.PowerSupply)) for s in pl]
def test_get_powersupplie(self):
p = cs.powersupplies.get(1)
cs.assert_called('GET', '/chassis/powersupply/1')
self.assertTrue(isinstance(p, powersupplies.PowerSupply))

View File

@ -1,37 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import scards
cs = fakes.FakeClient()
class ScardsTest(utils.TestCase):
def test_list_scards(self):
pl = cs.scards.list()
cs.assert_called('GET', '/chassis/scard')
[self.assertTrue(isinstance(s, scards.Scard)) for s in pl]
def test_get_scard(self):
p = cs.scards.get(1)
cs.assert_called('GET', '/chassis/scard/1')
self.assertTrue(isinstance(p, scards.Scard))
def test_scard_management_mode(self):
cs.scards.set_management_mode(1, 'disk')
self.assertTrue('PUT', '/chassis/scard/1/mgmtMode')

View File

@ -1,77 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import servers
cs = fakes.FakeClient()
class ServersTest(utils.TestCase):
def test_list_servers(self):
sl = cs.servers.list()
cs.assert_called('GET', '/servers')
[self.assertTrue(isinstance(s, servers.Server)) for s in sl]
def test_get_server(self):
s = cs.servers.get(1)
cs.assert_called('GET', '/servers/1')
self.assertTrue(isinstance(s, servers.Server))
def test_server_power_on(self):
cs.servers.power_on(1)
cs.assert_called('PUT', '/servers/1')
def test_server_power_off(self):
cs.servers.power_off(1)
cs.assert_called('PUT', '/servers/1?action=power-off')
def test_server_reset(self):
cs.servers.reset(1)
cs.assert_called('PUT', '/servers/1')
def test_server_set_tagged_vlan(self):
cs.servers.set_tagged_vlan(1, '12-12')
cs.assert_called('PUT', '/servers/1/nic/0/taggedVlans')
def test_server_unset_tagged_vlan(self):
cs.servers.unset_tagged_vlan(1, '12-12')
cs.assert_called('PUT', '/servers/1/nic/0/taggedVlans')
def test_server_set_untagged_vlan(self):
cs.servers.set_untagged_vlan(1, '12-12')
cs.assert_called('PUT', '/servers/1/nic/0/untaggedVlans')
def test_server_unset_untagged_vlan(self):
cs.servers.unset_untagged_vlan(1, '12-12')
cs.assert_called('PUT', '/servers/1/nic/0/untaggedVlans')
def test_server_attach_volume_default(self):
cs.servers.attach_volume(1, '1/p6-6/vol1')
cs.assert_called('PUT', '/servers/1/vdisk/0')
def test_server_attach_volume_with_vdisk(self):
cs.servers.attach_volume(1, '1/p6-6/vol1', vdisk=3)
cs.assert_called('PUT', '/servers/1/vdisk/3')
def test_server_detach_volume(self):
cs.servers.detach_volume(1, vdisk=0)
cs.assert_called('DELETE', '/servers/1/vdisk/0')
def test_set_boot_order(self):
cs.servers.set_boot_order(1, 'pxe')
cs.assert_called('PUT', '/servers/1')

View File

@ -1,40 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import system
cs = fakes.FakeClient()
class Systemstest(utils.TestCase):
def test_list_system(self):
pl = cs.system.list()
cs.assert_called('GET', '/chassis/systems')
[self.assertTrue(isinstance(s, system.System)) for s in pl]
def test_switchover_system(self):
cs.system.switchover(1)
cs.assert_called('PUT', '/chassis/system/switchover')
def test_writemem_system(self):
cs.system.writemem(1)
cs.assert_called('PUT', '/chassis/system/writeMem')
def test_reload_system(self):
cs.system.reload(1)
cs.assert_called('PUT', '/chassis/system/reload')

View File

@ -1,38 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.tests import utils
from seamicroclient.tests.v2 import fakes
from seamicroclient.v2 import volumes
cs = fakes.FakeClient()
class VolumesTest(utils.TestCase):
def test_list_volumes(self):
vl = cs.volumes.list()
cs.assert_called('GET', '/storage/volumes')
[self.assertTrue(isinstance(s, volumes.Volume)) for s in vl]
def test_get_volume(self):
p = cs.volumes.get(1)
cs.assert_called('GET', '/storage/volumes/1')
self.assertTrue(isinstance(p, volumes.Volume))
def test_create_volume(self):
v = cs.volumes.create(2, '0/p0_0', '1')
cs.assert_called('PUT', '/storage/volumes/0/p0_0/1')
self.assertTrue(isinstance(v, volumes.Volume))

View File

@ -1,175 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import pkg_resources
import sys
import uuid
import six
from seamicroclient import exceptions
from seamicroclient.openstack.common import strutils
def env(*args, **kwargs):
"""
returns the first environment variable set
if none are non-empty, defaults to '' or keyword arg default
"""
for arg in args:
value = os.environ.get(arg, None)
if value:
return value
return kwargs.get('default', '')
def find_resource(manager, name_or_id, **find_args):
"""Helper for the _find_* methods."""
# first try to get entity as integer id
try:
return manager.get(int(name_or_id))
except (TypeError, ValueError, exceptions.NotFound):
pass
# now try to get entity as uuid
try:
tmp_id = strutils.safe_encode(name_or_id)
if six.PY3:
tmp_id = tmp_id.decode()
uuid.UUID(tmp_id)
return manager.get(tmp_id)
except (TypeError, ValueError, exceptions.NotFound):
pass
# for str id which is not uuid (for Flavor search currently)
if getattr(manager, 'is_alphanum_id_allowed', False):
try:
return manager.get(name_or_id)
except exceptions.NotFound:
pass
try:
try:
return manager.find(human_id=name_or_id, **find_args)
except exceptions.NotFound:
pass
# finally try to find entity by name
try:
resource = getattr(manager, 'resource_class', None)
name_attr = resource.NAME_ATTR if resource else 'name'
kwargs = {name_attr: name_or_id}
kwargs.update(find_args)
return manager.find(**kwargs)
except exceptions.NotFound:
msg = "No %s with a name or ID of '%s' exists." % \
(manager.resource_class.__name__.lower(), name_or_id)
raise exceptions.CommandError(msg)
except exceptions.NoUniqueMatch:
msg = ("Multiple %s matches found for '%s', use an ID to be more"
" specific." % (manager.resource_class.__name__.lower(),
name_or_id))
raise exceptions.CommandError(msg)
def _format_field_name(attr):
"""Format an object attribute in a human-friendly way."""
# Split at ':' and leave the extension name as-is.
parts = attr.rsplit(':', 1)
name = parts[-1].replace('_', ' ')
# Don't title() on mixed case
if name.isupper() or name.islower():
name = name.title()
parts[-1] = name
return ': '.join(parts)
def _make_field_formatter(attr, filters=None):
"""
Given an object attribute, return a formatted field name and a
formatter suitable for passing to print_list.
Optionally pass a dict mapping attribute names to a function. The function
will be passed the value of the attribute and should return the string to
display.
"""
filter_ = None
if filters:
filter_ = filters.get(attr)
def get_field(obj):
field = getattr(obj, attr, '')
if field and filter_:
field = filter_(field)
return field
name = _format_field_name(attr)
formatter = get_field
return name, formatter
class HookableMixin(object):
"""Mixin so classes can register and run hooks."""
_hooks_map = {}
@classmethod
def add_hook(cls, hook_type, hook_func):
if hook_type not in cls._hooks_map:
cls._hooks_map[hook_type] = []
cls._hooks_map[hook_type].append(hook_func)
@classmethod
def run_hooks(cls, hook_type, *args, **kwargs):
hook_funcs = cls._hooks_map.get(hook_type) or []
for hook_func in hook_funcs:
hook_func(*args, **kwargs)
def safe_issubclass(*args):
"""Like issubclass, but will just return False if not a class."""
try:
if issubclass(*args):
return True
except TypeError:
pass
return False
def import_class(import_str):
"""Returns a class from a string including module and class."""
mod_str, _sep, class_str = import_str.rpartition('.')
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
def _load_entry_point(ep_name, name=None):
"""Try to load the entry point ep_name that matches name."""
for ep in pkg_resources.iter_entry_points(ep_name, name=name):
try:
return ep.load()
except (ImportError, pkg_resources.UnknownExtra, AttributeError):
continue
def is_integer_like(val):
"""Returns validation of a value as an integer."""
try:
value = int(val)
return True
except (TypeError, ValueError, AttributeError):
return False

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient.v2.client import Client # noqa

View File

@ -1,44 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Chassis interface.
"""
from seamicroclient import base
class Chassis(base.Resource):
HUMAN_ID = True
def writemem(self, **kwargs):
return self.manager.writemem(self, **kwargs)
class ChassisManager(base.ManagerWithFind):
resource_class = Chassis
def list(self, filters=None):
"""
Get a list of chassis properties.
:rtype: list of :class:`Chassis`
"""
return self._list("/chassis", filters=filters)
def writemem(self, chassis, **kwargs):
"""
Write current chassis config to flash memory
This will persist even after reboot of chassis
"""
url = "/chassis/system/writeMem"
return self.api.client.put(url, body={})

View File

@ -1,82 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from seamicroclient import client
from seamicroclient.v2 import chassis
from seamicroclient.v2 import disks
from seamicroclient.v2 import fantrays
from seamicroclient.v2 import interfaces
from seamicroclient.v2 import pools
from seamicroclient.v2 import powersupplies
from seamicroclient.v2 import scards
from seamicroclient.v2 import servers
from seamicroclient.v2 import smcards
from seamicroclient.v2 import system
from seamicroclient.v2 import volumes
class Client(object):
"""
Top-level object to access the Seamicro Chassis API.
Create an instance with your creds::
>>> client = Client(USERNAME, PASSWORD, AUTH_URL)
Then call methods on its managers::
>>> client.servers.list()
...
>>> client.chassis.list()
...
"""
def __init__(self, username, password, auth_url=None,
timeout=None, http_log_debug=False):
self.servers = servers.ServerManager(self)
self.pools = pools.PoolManager(self)
self.volumes = volumes.VolumeManager(self)
self.disks = disks.DiskManager(self)
self.chassis = chassis.ChassisManager(self)
self.fantrays = fantrays.FanTrayManager(self)
self.interfaces = interfaces.InterfaceManager(self)
self.powersupplies = powersupplies.PowerSupplyManager(self)
self.scards = scards.ScardManager(self)
self.smcards = smcards.SMCardManager(self)
self.system = system.SystemManager(self)
self.client = client.HTTPClient(username,
password,
auth_url=auth_url,
timeout=timeout,
http_log_debug=http_log_debug)
def get_timings(self):
return self.client.get_timings()
def reset_timings(self):
self.client.reset_timings()
def authenticate(self):
"""
Authenticate against the server.
Normally this is called automatically when you first access the API,
but you can call this method to force authentication right now.
Returns on success; raises :exc:`exceptions.Unauthorized` if the
credentials are wrong.
"""
self.client.authenticate()

View File

@ -1,87 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Disks interface.
"""
from seamicroclient import base
class Disk(base.Resource):
HUMAN_ID = True
def power_on(self, **kwargs):
return self.manager.power_on(self, **kwargs)
def power_off(self, **kwargs):
return self.manager.power_off(self, **kwargs)
def active_led(self, **kwargs):
return self.manager.active_led(self, **kwargs)
def deactivate_led(self, **kwargs):
return self.manager.deactivate_led(self, **kwargs)
class DiskManager(base.ManagerWithFind):
resource_class = Disk
def get(self, disk):
"""
Get a disk.
:param disk: ID of the :class:`Disk` to get.
:rtype: :class:`Disk`
"""
return self._get(base.getid(disk),
"/storage/disks/%s" % base.getid(disk))
def list(self, filters=None):
"""
Get a list of disks.
:rtype: list of :class:`Disk`
"""
return self._list("/storage/disks", filters=filters)
def power_off(self, disk, **kwargs):
"""
Power off the specified Disk
"""
url = "/storage/disks/%s" % base.getid(disk)
body = {'action': 'power-off'}
return self.api.client.put(url, body=body)
def power_on(self, disk, **kwargs):
"""
Power on the specified Disk
"""
url = "/storage/disks/%s" % base.getid(disk)
body = {'action': 'power-on'}
return self.api.client.put(url, body=body)
def activate_led(self, disk, **kwargs):
"""
Activate LED of the specified Disk
"""
url = "/storage/disks/%s" % base.getid(disk)
body = {'action': 'activate-led'}
return self.api.client.put(url, body=body)
def deactivate_led(self, disk, **kwargs):
"""
De-activate LED of the specified Disk
"""
url = "/storage/disks/%s" % base.getid(disk)
body = {'action': 'deactivate-led'}
return self.api.client.put(url, body=body)

View File

@ -1,43 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
FanTray interface.
"""
from seamicroclient import base
class FanTray(base.Resource):
HUMAN_ID = True
class FanTrayManager(base.ManagerWithFind):
resource_class = FanTray
def get(self, fantray):
"""
Get a fantray.
:param fantray: ID of the :class:`FanTray` to get.
:rtype: :class:`FanTray`
"""
return self._get(base.getid(fantray),
"/chassis/fanTray/%s" % base.getid(fantray))
def list(self, filters=None):
"""
Get a list of fantray properties.
:rtype: list of :class:`FanTray`
"""
return self._list("/chassis/fanTrays", filters=filters)

View File

@ -1,113 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Interfaces object.
"""
from seamicroclient import base
class Interface(base.Resource):
HUMAN_ID = True
def shutdown(self, **kwargs):
return self.manager.shutdown(self, **kwargs)
def no_shutdown(self, **kwargs):
return self.manager.no_shutdown(self, **kwargs)
def add_tagged_vlan(self, vlan_id, **kwargs):
return self.manager.add_tagged_vlan(self, vlan_id, **kwargs)
def remove_tagged_vlan(self, vlan_id, **kwargs):
return self.manager.remove_tagged_vlan(self, vlan_id, **kwargs)
def add_untagged_vlan(self, vlan_id, **kwargs):
return self.manager.add_untagged_vlan(self, vlan_id, **kwargs)
def remove_untagged_vlan(self, vlan_id, **kwargs):
return self.manager.remove_untagged_vlan(self, vlan_id, **kwargs)
class InterfaceManager(base.ManagerWithFind):
resource_class = Interface
def get(self, interface):
"""
Get a interface.
:param interface: ID of the :class:`Interface` to get.
:rtype: :class:`Interface`
"""
return self._get(base.getid(interface),
"/interfaces/%s" % base.getid(interface))
def list(self, filters=None):
"""
Get a list of interfaces.
:rtype: list of :class:`Interface`
"""
return self._list("/interfaces", filters=filters)
def shutdown(self, interface, **kwargs):
"""
Shutdown the specified network Interface
"""
url = "/interfaces/%s/shutdown" % base.getid(interface)
body = {'value': True}
return self.api.client.put(url, body=body)
def no_shutdown(self, interface, **kwargs):
"""
Start the specified network Interface
"""
url = "/interfaces/%s/shutdown" % base.getid(interface)
body = {'value': False}
return self.api.client.put(url, body=body)
def add_tagged_vlan(self, interface, vlan_id, **kwargs):
"""
Add tagged vlan for the given Interface
"""
url = '/interfaces/%s/vlans/taggedVlans' % base.getid(interface)
if isinstance(vlan_id, list):
vlan_id = map(lambda x: str(x), vlan_id)
body = {'add': ','.join(vlan_id)}
else:
body = {'add': str(vlan_id)}
return self.api.client.put(url, body=body)
def remove_tagged_vlan(self, interface, vlan_id, **kwargs):
"""
Remove tagged vlan for the given Interface
"""
url = '/interfaces/%s/vlans/taggedVlans' % base.getid(interface)
body = {'remove': str(vlan_id)}
return self.api.client.put(url, body=body)
def add_untagged_vlan(self, interface, vlan_id, **kwargs):
"""
Add untagged vlan for the given Interface
"""
url = '/interfaces/%s/vlans/untaggedVlans' % base.getid(interface)
body = {'add': str(vlan_id)}
return self.api.client.put(url, body=body)
def remove_untagged_vlan(self, interface, vlan_id, **kwargs):
"""
Remove untagged vlan for the given Interface
"""
url = '/interfaces/%s/vlans/untaggedVlans' % base.getid(interface)
body = {'remove': str(vlan_id)}
return self.api.client.put(url, body=body)

View File

@ -1,89 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Pool interface.
"""
from seamicroclient import base
class Pool(base.Resource):
HUMAN_ID = True
def delete(self, **kwargs):
return self.manager.delete(self, **kwargs)
def mount(self, **kwargs):
return self.manager.mount(self, **kwargs)
class PoolManager(base.ManagerWithFind):
resource_class = Pool
def get(self, pool):
"""
Get a pool.
:param pool: ID of the :class:`Pool` to get.
:rtype: :class:`Pool`
"""
return self._get(base.getid(pool),
"/storage/pools/%s" % base.getid(pool))
def list(self, filters=None):
"""
Get a list of pools.
:rtype: list of :class:`Pool`
"""
return self._list("/storage/pools", filters=filters)
def create(self, slot, pool_name, disks, raid_level=0, **kwargs):
"""
Create a pool on the given scard slot using disks of that slot.
:rtype: Instance of :class:`Pool`
"""
disks = map(lambda x: str(x), disks)
body = {'disks': ','.join(disks), 'raidLevel': raid_level}
url = '/storage/pools/%s/%s' % (base.getid(slot), pool_name)
return self.api.client.put(url, body=body)
def delete(self, pool, **kwargs):
"""
Delete the specified pool.
"""
url = '/storage/pools/%s' % base.getid(pool)
return self.api.client.delete(url)
def mount(self, pool, **kwargs):
"""
Mount the specified Pool
"""
return self._action("mount", pool, **kwargs)
def unmount(self, pool, **kwargs):
"""
UnMount the specified Pool
"""
return self._action("unmount", pool, **kwargs)
def _action(self, action, pool, info=None, **kwargs):
"""
Perform a pool "action" -- .
"""
body = {"action": action}
self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/storage/pools/%s' % base.getid(pool)
return self.api.client.put(url, body=body)

View File

@ -1,43 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
PowerSupply interface.
"""
from seamicroclient import base
class PowerSupply(base.Resource):
HUMAN_ID = True
class PowerSupplyManager(base.ManagerWithFind):
resource_class = PowerSupply
def get(self, powersupply):
"""
Get a powersupply.
:param powersupply: ID of the :class:`PowerSupply` to get.
:rtype: :class:`PowerSupply`
"""
return self._get(base.getid(powersupply),
"/chassis/powersupply/%s" % base.getid(powersupply))
def list(self, filters=None):
"""
Get a list of powersupply properties.
:rtype: list of :class:`PowerSupply`
"""
return self._list("/chassis/powersupply", filters=filters)

View File

@ -1,59 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scard interface.
"""
from seamicroclient import base
class Scard(base.Resource):
HUMAN_ID = True
def set_management_mode(self, mode, force=False, **kwargs):
return self.manager.set_management_mode(self, mode, force, **kwargs)
def volume_mode(self, value, **kwargs):
if value:
return self.manager.set_management_mode(self, 'volume', **kwargs)
else:
return self.manager.set_management_mode(self, 'disk', **kwargs)
class ScardManager(base.ManagerWithFind):
resource_class = Scard
def list(self, filters=None):
"""
Get a list of scard properties.
:rtype: list of :class:`Scard`
"""
return self._list("/chassis/scards", filters=filters)
def get(self, scard, **kwargs):
"""
Get a specific scard.
:rtype: Instance of :class:`Scard`
"""
return self._get(base.getid(scard),
'/chassis/scard/%s' % base.getid(scard))
def set_management_mode(self, scard, mode, force=False, **kwargs):
"""
Set management mode of the specified scard
"""
url = "/chassis/scard/%s/mgmtMode" % base.getid(scard)
body = {'value': mode}
return self.api.client.put(url, body=body)

View File

@ -1,237 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Server interface.
"""
from seamicroclient import base
TAGGED_VLAN = "taggedVlans"
UNTAGGED_VLAN = "untaggedVlan"
class Server(base.Resource):
HUMAN_ID = True
def power_on(self, using_pxe=False):
self.manager.power_on(self, using_pxe)
def power_off(self, force=False):
self.manager.power_off(self, force)
def reset(self, using_pxe=False):
self.manager.reset(self, using_pxe)
def set_tagged_vlan(self, vlan_id, nics=[], **kwargs):
self.manager.set_tagged_vlan(self, vlan_id, nics, **kwargs)
def unset_tagged_vlan(self, vlan_id, nics=[], **kwargs):
self.manager.unset_tagged_vlan(self, vlan_id, nics, **kwargs)
def set_untagged_vlan(self, vlan_id, nics=[], **kwargs):
self.manager.set_untagged_vlan(self, vlan_id, nics, **kwargs)
def unset_untagged_vlan(self, vlan_id, nics=[], **kwargs):
self.manager.unset_untagged_vlan(self, vlan_id, nics, **kwargs)
def attach_volume(self, volume, vdisk=0, **kwargs):
self.manager.attach_volume(self, volume, vdisk, **kwargs)
def detach_volume(self, vdisk=0, **kwargs):
self.manager.detach_volume(self, vdisk, **kwargs)
def set_boot_order(self, boot_order="hd0", **kwargs):
self.manager.set_boot_order(self, boot_order, **kwargs)
def get_boot_order(self, **kwargs):
return self.manager.get_boot_order(self, **kwargs)
class ServerManager(base.ManagerWithFind):
resource_class = Server
def get(self, server):
"""
Get a server.
:param server: ID of the :class:`Server` to get.
:rtype: :class:`Server`
"""
return self._get(base.getid(server),
"/servers/%s" % base.getid(server))
def list(self):
"""
Get a list of servers.
:rtype: list of :class:`Server`
"""
return self._list("/servers")
def attach_volume(self, server, volume, vdisk=0, **kwargs):
"""
Attach volume to vdisk # to given server
:param server: The :class:`Server` (or its ID) to power on.
:param volume: The :class:`Volume` (or its ID) that is to be attached.
:param vdisk: The vdisk number of the server to attach volume to.
:
"""
body = {"value": volume}
self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/servers/%s/vdisk/%s' % (base.getid(server), vdisk)
return self.api.client.put(url, body=body)
def detach_volume(self, server, vdisk=0, **kwargs):
"""
Detach volume attached to vdisk # of given server
:param server: The :class:`Server` (or its ID) to power on.
:param vdisk: The vdisk number of the server to detach volume to.
:
"""
url = '/servers/%s/vdisk/%s' % (base.getid(server), vdisk)
return self._delete(url)
def power_on(self, server, using_pxe=False, **kwargs):
"""
Power on a server.
:param server: The :class:`Server` (or its ID) to power on.
:param using_pxe: power on server and use pxe boot.
"""
action_params = {}
if using_pxe:
action_params = {"using-pxe": using_pxe}
self._action('power-on', server, action_params)
def power_off(self, server, force=False, **kwargs):
"""
Power off a server.
:param server: The :class:`Server` (or its ID) to power off.
:param force: force the server to power off.
"""
action = 'power-off'
url = '/servers/%s?action=%s' % (base.getid(server), action)
if force:
url = '%s&force=true' % url
return self.api.client.put(url, body={})
def reset(self, server, using_pxe=False, **kwargs):
"""
Reset power of a server.
:param server: The :class:`Server` (or its ID) to power on.
:param using_pxe: reset and power on server and use pxe boot.
"""
action_params = {}
if using_pxe:
action_params = {"using-pxe": using_pxe}
self._action('reset', server, action_params)
def set_tagged_vlan(self, server, vlan_id, nics=[], **kwargs):
"""
Set the tagged vlan id for the server.
:param server: The :class:`Server` (or its ID) to power on.
:param vlan_id: The tagged vlan id for the server.
"""
self._handle_vlan(server, vlan_id, TAGGED_VLAN, nics, **kwargs)
def unset_tagged_vlan(self, server, vlan_id, nics=[], **kwargs):
"""
Unset the tagged vlan id for the server.
:param server: The :class:`Server` (or its ID) to power on.
:param vlan_id: The tagged vlan id for the server.
"""
self._handle_vlan(server, vlan_id, TAGGED_VLAN, nics, unset=True, **kwargs)
def set_untagged_vlan(self, server, vlan_id, nics=[], **kwargs):
"""
Set the untagged vlan id for the server.
:param server: The :class:`Server` (or its ID) to power on.
:param vlan_id: The untagged vlan id for the server.
"""
self._handle_vlan(server, vlan_id, UNTAGGED_VLAN, nics, **kwargs)
def unset_untagged_vlan(self, server, vlan_id, nics=[], **kwargs):
"""
Unset the untagged vlan id for the server.
:param server: The :class:`Server` (or its ID) to power on.
:param vlan_id: The untagged vlan id for the server.
"""
self._handle_vlan(server, vlan_id, UNTAGGED_VLAN, nics,
unset=True, **kwargs)
def _handle_vlan(self, server, vlan_id, vlan_type, nics=[], unset=False, **kwargs):
"""
Set/Unset tagged/untagged vlan id for the server.
:param server: The :class:`Server` (or its ID) to power on.
:param vlan_id: The tagged vlan id for the server.
:param vlan_type: tagged-vlan or untagged-vlan type.
:param unset: Boolean flag to unset the vlan_id for the server.
"""
if vlan_id is not None:
action_params = {}
if unset:
action_params.update({'remove': vlan_id})
else:
action_params.update({'add': vlan_id})
self.run_hooks('modify_body_for_action', action_params, **kwargs)
if nics:
for nic in nics:
url = '/servers/%s/nic/%s/%s' % (base.getid(server), nic[3:], vlan_type)
self.api.client.put(url, body=action_params)
return
else:
u = '/servers/%s/nic' % (base.getid(server))
nics = self._list(u)
for nic in nics:
url = '/servers/%s/nic/%s/%s' % (base.getid(server), nic.id, vlan_type)
self.api.client.put(url, body=action_params)
return
def set_boot_order(self, server, boot_order="hd0", **kwargs):
"""
Set bios boot order for the server
:param server: The :class:`Server` (or its ID)
:param boot_order: The boot order for the server
"""
action_params = {'boot-order': boot_order}
if boot_order == "pxe":
action_params.update({'boot-order': 'pxe,hd0'})
return self._action('set-bios-boot-order', server, action_params)
def get_boot_order(self, server):
url = '/server/%s/bios/bootOrder' % base.getid(server)
ret,val = self.api.client.get(url)
return (ret , val.split(" ")[0])
def _action(self, action, server, info=None, **kwargs):
"""
Perform a server "action" -- power-on/power-off/reset/etc.
"""
body = {"action": action}
body.update(info)
self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/servers/%s' % base.getid(server)
return self.api.client.put(url, body=body)

View File

@ -1,43 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SMCard interface.
"""
from seamicroclient import base
class SMCard(base.Resource):
HUMAN_ID = True
class SMCardManager(base.ManagerWithFind):
resource_class = SMCard
def get(self, smcard):
"""
Get a smcard.
:param smcard: ID of the :class:`SMCard` to get.
:rtype: :class:`SMCard`
"""
return self._get(base.getid(smcard),
"/chassis/smcard/%s" % base.getid(smcard))
def list(self, filters=None):
"""
Get a list of smcard properties.
:rtype: list of :class:`SMCard`
"""
return self._list("/chassis/smcard", filters=filters)

View File

@ -1,91 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System interface.
"""
from seamicroclient import base
class System(base.Resource):
HUMAN_ID = True
def switchover(self, mxcard=None, **kwargs):
return self.manager.switchover(self, mxcard, **kwargs)
def writemem(self, **kwargs):
return self.manager.writemem(self, **kwargs)
def add_segment(self, vlan_id, **kwargs):
return self.manager.add_segment(self, vlan_id, **kwargs)
def remove_segment(self, vlan_id, **kwargs):
return self.manager.remove_segment(self, vlan_id, **kwargs)
class SystemManager(base.ManagerWithFind):
resource_class = System
def list(self, filters=None):
"""
Get a list of system properties.
:rtype: list of :class:`System`
"""
return self._list("/chassis/system", filters=filters)
def switchover(self, system, mxcard=None, **kwargs):
"""
Switchover system to different mxcard
"""
url = "/chassis/system/switchover"
body = {}
if mxcard is not None:
body = {'newActive': mxcard}
return self.api.client.put(url, body=body)
def writemem(self, system, **kwargs):
"""
Write current system config to flash memory
This will persist even after reboot of chassis
"""
url = "/chassis/system/writeMem"
return self.api.client.put(url, body={})
def reload(self, system, **kwargs):
"""
Reload the chassis and start the boot image
"""
url = "/chassis/system/reload"
return self.api.client.put(url, body={})
def add_segment(self, system, vlan_id, **kwargs):
"""
Create Global Network
"""
url = "/chassis/system/vlans"
if vlan_id is not None:
action_params = {}
action_params.update({'add': vlan_id})
self.run_hooks('modify_body_for_action', action_params, **kwargs)
return self.api.client.put(url, body=action_params)
def remove_segment(self, system, vlan_id, **kwargs):
"""
Remove Global Network
"""
url = "/chassis/system/vlans"
if vlan_id is not None:
action_params = {}
action_params.update({'remove': vlan_id})
self.run_hooks('modify_body_for_action', action_params, **kwargs)
return self.api.client.put(url, body=action_params)

View File

@ -1,88 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Volume interface.
"""
import binascii
import os
from seamicroclient import base
class Volume(base.Resource):
HUMAN_ID = True
def delete(self):
self.manager.delete(self)
class VolumeManager(base.ManagerWithFind):
resource_class = Volume
def get(self, volume):
"""
Get a volume.
:param volume: ID of the :class:`Volume` to get.
:rtype: :class:`Volume`
"""
return self._get(base.getid(volume),
"/storage/volumes/%s" % base.getid(volume))
def list(self):
"""
Get a list of volumes.
:rtype: list of :class:`Volume`
"""
return self._list("/storage/volumes")
def create(self, size, pool, volume_id=None, **kwargs):
"""
Create a volume of the given size in the given pool.
:param volume_id: ID of the :class: `Volume` to create
:param pool: Object of the :class: `Pool` in which the volume will be
created.
:param size: Size of the new volume in GB.
"""
create_params = {}
if volume_id is None:
volume_id = str(binascii.b2a_hex(os.urandom(6)))
if pool and volume_id and size:
create_params = {'volume-size': str(size)}
resource_url = "%s/%s" % (base.getid(pool), volume_id)
return self._create(resource_url, create_params)
def delete(self, volume):
self._delete("/storage/volume/%s" % base.getid(volume))
def _create(self, resource_url, body, **kwargs):
"""
Create a volume
"""
body.update({"action": "create"})
self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/storage/volumes/%s' % resource_url
return self._update(url, body=body)
def _action(self, action, volume, info=None, **kwargs):
"""
Perform a volume "action" -- .
"""
body = {"action": action}
body.update(info)
self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/storage/volumes/%s' % base.getid(volume)
return self.api.client.put(url, body=body)

View File

@ -1,22 +0,0 @@
[metadata]
name = python-seamicroclient
summary = Client library for Seamicro chassis API
description-file =
README.md
license = Apache License, Version 2.0
author = AMD
author-email = harshada.kakad@izeltech.com
home-page = https://github.com/seamicro/python-seamicroclient
classifier =
Development Status :: 4 - Beta
Environment :: Console
Environment :: Web Environment
Intended Audience :: Developers
Intended Audience :: Information Technology
License :: OSI Approved :: Apache Software License
Operating System :: OS Independent
Programming Language :: Python
[files]
packages =
seamicroclient

View File

@ -1,8 +0,0 @@
#!/usr/bin/env python
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
setuptools.setup(
setup_requires=['pbr'],
pbr=True)

View File

@ -1,10 +0,0 @@
hacking>=0.8.0,<0.9
coverage>=3.6
discover
fixtures>=0.3.14
keyring>=1.6.1,<2.0
mock>=1.0
sphinx>=1.1.2,<1.2
testrepository>=0.0.17
testtools>=0.9.32

View File

@ -1,77 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2010 OpenStack Foundation
# Copyright 2013 IBM Corp.
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import os
import sys
import install_venv_common as install_venv # flake8: noqa
def print_help(project, venv, root):
help = """
%(project)s development environment setup is complete.
%(project)s development uses virtualenv to track and manage Python
dependencies while in development and testing.
To activate the %(project)s virtualenv for the extent of your current
shell session you can run:
$ source %(venv)s/bin/activate
Or, if you prefer, you can run commands in the virtualenv on a case by
case basis by running:
$ %(root)s/tools/with_venv.sh <your command>
"""
print help % dict(project=project, venv=venv, root=root)
def main(argv):
root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
if os.environ.get('tools_path'):
root = os.environ['tools_path']
venv = os.path.join(root, '.venv')
if os.environ.get('venv'):
venv = os.environ['venv']
pip_requires = os.path.join(root, 'requirements.txt')
test_requires = os.path.join(root, 'test-requirements.txt')
py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1])
setup_cfg = ConfigParser.ConfigParser()
setup_cfg.read('setup.cfg')
project = setup_cfg.get('metadata', 'name')
install = install_venv.InstallVenv(
root, venv, pip_requires, test_requires, py_version, project)
options = install.parse_args(argv)
install.check_python_version()
install.check_dependencies()
install.create_virtualenv(no_site_packages=options.no_site_packages)
install.install_dependencies()
install.post_process()
print_help(project, venv, root)
if __name__ == '__main__':
main(sys.argv)

View File

@ -1,214 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provides methods needed by installation script for OpenStack development
virtual environments.
Since this script is used to bootstrap a virtualenv from the system's Python
environment, it should be kept strictly compatible with Python 2.6.
Synced in from openstack-common
"""
from __future__ import print_function
import optparse
import os
import subprocess
import sys
class InstallVenv(object):
def __init__(self, root, venv, requirements,
test_requirements, py_version,
project):
self.root = root
self.venv = venv
self.requirements = requirements
self.test_requirements = test_requirements
self.py_version = py_version
self.project = project
def die(self, message, *args):
print(message % args, file=sys.stderr)
sys.exit(1)
def check_python_version(self):
if sys.version_info < (2, 6):
self.die("Need Python Version >= 2.6")
def run_command_with_code(self, cmd, redirect_output=True,
check_exit_code=True):
"""Runs a command in an out-of-process shell.
Returns the output of that command. Working directory is self.root.
"""
if redirect_output:
stdout = subprocess.PIPE
else:
stdout = None
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout)
output = proc.communicate()[0]
if check_exit_code and proc.returncode != 0:
self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)
return (output, proc.returncode)
def run_command(self, cmd, redirect_output=True, check_exit_code=True):
return self.run_command_with_code(cmd, redirect_output,
check_exit_code)[0]
def get_distro(self):
if (os.path.exists('/etc/fedora-release') or
os.path.exists('/etc/redhat-release')):
return Fedora(
self.root, self.venv, self.requirements,
self.test_requirements, self.py_version, self.project)
else:
return Distro(
self.root, self.venv, self.requirements,
self.test_requirements, self.py_version, self.project)
def check_dependencies(self):
self.get_distro().install_virtualenv()
def create_virtualenv(self, no_site_packages=True):
"""Creates the virtual environment and installs PIP.
Creates the virtual environment and installs PIP only into the
virtual environment.
"""
if not os.path.isdir(self.venv):
print('Creating venv...', end=' ')
if no_site_packages:
self.run_command(['virtualenv', '-q', '--no-site-packages',
self.venv])
else:
self.run_command(['virtualenv', '-q', self.venv])
print('done.')
else:
print("venv already exists...")
pass
def pip_install(self, *args):
self.run_command(['tools/with_venv.sh',
'pip', 'install', '--upgrade'] + list(args),
redirect_output=False)
def install_dependencies(self):
print('Installing dependencies with pip (this can take a while)...')
# First things first, make sure our venv has the latest pip and
# setuptools and pbr
self.pip_install('pip>=1.4')
self.pip_install('setuptools')
self.pip_install('pbr')
self.pip_install('-r', self.requirements)
self.pip_install('-r', self.test_requirements)
def post_process(self):
self.get_distro().post_process()
def parse_args(self, argv):
"""Parses command-line arguments."""
parser = optparse.OptionParser()
parser.add_option('-n', '--no-site-packages',
action='store_true',
help="Do not inherit packages from global Python "
"install")
return parser.parse_args(argv[1:])[0]
class Distro(InstallVenv):
def check_cmd(self, cmd):
return bool(self.run_command(['which', cmd],
check_exit_code=False).strip())
def install_virtualenv(self):
if self.check_cmd('virtualenv'):
return
if self.check_cmd('easy_install'):
print('Installing virtualenv via easy_install...', end=' ')
if self.run_command(['easy_install', 'virtualenv']):
print('Succeeded')
return
else:
print('Failed')
self.die('ERROR: virtualenv not found.\n\n%s development'
' requires virtualenv, please install it using your'
' favorite package management tool' % self.project)
def post_process(self):
"""Any distribution-specific post-processing gets done here.
In particular, this is useful for applying patches to code inside
the venv.
"""
pass
class Fedora(Distro):
"""This covers all Fedora-based distributions.
Includes: Fedora, RHEL, CentOS, Scientific Linux
"""
def check_pkg(self, pkg):
return self.run_command_with_code(['rpm', '-q', pkg],
check_exit_code=False)[1] == 0
def apply_patch(self, originalfile, patchfile):
self.run_command(['patch', '-N', originalfile, patchfile],
check_exit_code=False)
def install_virtualenv(self):
if self.check_cmd('virtualenv'):
return
if not self.check_pkg('python-virtualenv'):
self.die("Please install 'python-virtualenv'.")
super(Fedora, self).install_virtualenv()
def post_process(self):
"""Workaround for a bug in eventlet.
This currently affects RHEL6.1, but the fix can safely be
applied to all RHEL and Fedora distributions.
This can be removed when the fix is applied upstream.
Nova: https://bugs.launchpad.net/nova/+bug/884915
Upstream: https://bitbucket.org/eventlet/eventlet/issue/89
RHEL: https://bugzilla.redhat.com/958868
"""
if os.path.exists('contrib/redhat-eventlet.patch'):
# Install "patch" program if it's not there
if not self.check_pkg('patch'):
self.die("Please install 'patch'.")
# Apply the eventlet patch
self.apply_patch(os.path.join(self.venv, 'lib', self.py_version,
'site-packages',
'eventlet/green/subprocess.py'),
'contrib/redhat-eventlet.patch')

View File

@ -1,4 +0,0 @@
#!/bin/bash
TOOLS=`dirname $0`
VENV=$TOOLS/../.venv
source $VENV/bin/activate && $@

33
tox.ini
View File

@ -1,33 +0,0 @@
[tox]
envlist = py33,py34,py26,py27,pypy,pep8
minversion = 1.6
skipsdist = True
[testenv]
usedevelop = True
install_command = pip install -U {opts} {packages}
setenv = VIRTUAL_ENV={envdir}
LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --testr-args='{posargs}'
[testenv:pep8]
commands = flake8
[testenv:venv]
commands = {posargs}
[testenv:cover]
commands = python setup.py testr --coverage --testr-args='{posargs}'
[tox:jenkins]
downloadcache = ~/cache/pip
[flake8]
ignore = E12,F841,F811,F821,H302,H404
show-source = True
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build