Working script to create a jenkins server

I'm still short of adding the port and mapping the floating ip,
but close.  I also intended to use userdata to do an apt-get
install jenkins.

Change-Id: If86e3f381bb951de48cc984c0e863b896ed33353
This commit is contained in:
Terry Howe 2014-10-26 09:04:47 -06:00
parent 8baa156ea6
commit dede226a4e
26 changed files with 1666 additions and 45 deletions

53
examples/cloud-init.sh Normal file
View File

@ -0,0 +1,53 @@
#!/bin/bash -x
echo '*** start cloud-init ***'
wget -q -O - https://jenkins-ci.org/debian/jenkins-ci.org.key | sudo apt-key add -
echo deb http://pkg.jenkins-ci.org/debian binary/ > /etc/apt/sources.list.d/jenkins.list
apt-get update
apt-get install -y jenkins
echo 'JENKINS_ARGS="${JENKINS_ARGS} --argumentsRealm.passwd.jenkins=admin --argumentsRealm.roles.jenkins=admin"' >> /etc/default/jenkins
cat >/var/lib/jenkins/config.xml <<!
<?xml version='1.0' encoding='UTF-8'?>
<hudson>
<disabledAdministrativeMonitors/>
<version>1.0</version>
<numExecutors>2</numExecutors>
<mode>NORMAL</mode>
<useSecurity>true</useSecurity>
<authorizationStrategy class="hudson.security.LegacyAuthorizationStrategy"/>
<securityRealm class="hudson.security.LegacySecurityRealm"/>
<disableRememberMe>false</disableRememberMe>
<projectNamingStrategy class="jenkins.model.ProjectNamingStrategy\$DefaultProjectNamingStrategy"/>
<workspaceDir>\${ITEM_ROOTDIR}/workspace</workspaceDir>
<buildsDir>\${ITEM_ROOTDIR}/builds</buildsDir>
<markupFormatter class="hudson.markup.EscapedMarkupFormatter"/>
<jdks/>
<viewsTabBar class="hudson.views.DefaultViewsTabBar"/>
<myViewsTabBar class="hudson.views.DefaultMyViewsTabBar"/>
<clouds/>
<slaves/>
<scmCheckoutRetryCount>0</scmCheckoutRetryCount>
<views>
<hudson.model.AllView>
<owner class="hudson" reference="../../.."/>
<name>All</name>
<filterExecutors>false</filterExecutors>
<filterQueue>false</filterQueue>
<properties class="hudson.model.View\$PropertyList"/>
</hudson.model.AllView>
</views>
<primaryView>All</primaryView>
<slaveAgentPort>0</slaveAgentPort>
<label></label>
<nodeProperties/>
<globalNodeProperties/>
</hudson>
!
cat >/var/lib/jenkins/jenkins.security.QueueItemAuthenticatorConfiguration.xml <<!
<?xml version='1.0' encoding='UTF-8'?>
<jenkins.security.QueueItemAuthenticatorConfiguration>
<authenticators/>
</jenkins.security.QueueItemAuthenticatorConfiguration>
!
service jenkins restart
echo '*** stop cloud-init ***'\n"

View File

@ -291,7 +291,7 @@ def option_parser():
parser.add_argument(
'--data',
metavar='<data>',
default='{}',
default={},
help='Json data for command.',
)
parser.add_argument(

55
examples/connection.py Normal file
View File

@ -0,0 +1,55 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Example Connection Command
Make sure you can authenticate before running this command.
For example:
python -m examples.connection
"""
import sys
from examples import common
from openstack import connection
def make_connection(opts):
args = {
'auth_plugin': opts.auth_plugin,
'auth_url': opts.auth_url,
'project_name': opts.project_name,
'domain_name': opts.domain_name,
'project_domain_name': opts.project_domain_name,
'user_domain_name': opts.user_domain_name,
'user_name': opts.user_name,
'password': opts.password,
'verify': opts.verify,
'token': opts.token,
}
conn = connection.Connection(preference=opts.user_preferences, **args)
return conn
def run_connection(opts):
conn = make_connection(opts)
print("Connection: %s" % conn)
for flavor in conn.compute.list_flavors():
print(flavor.id + " " + flavor.name)
return
if __name__ == "__main__":
opts = common.setup()
sys.exit(common.main(opts, run_connection))

300
examples/jenkins.py Normal file
View File

@ -0,0 +1,300 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Example Create a jenkins server
Create all the pieces parts to get a jenkins server up and running.
To run:
python examples/jenkins.py
"""
import base64
import os
import sys
from examples import common
from openstack import connection
from openstack import exceptions
def create_jenkins(opts):
name = opts.data.pop('name', 'jenkins')
dns_nameservers = opts.data.pop('dns_nameservers', '206.164.176.34')
cidr = opts.data.pop('cidr', '10.3.3.0/24')
flavor = opts.data.pop('flavor', '103')
image = opts.data.pop('image', 'bec3cab5-4722-40b9-a78a-3489218e22fe')
args = vars(opts)
conn = connection.Connection(preference=opts.user_preferences, **args)
try:
network = conn.network.find_network(name)
except exceptions.ResourceNotFound:
network = conn.network.create_network(name=name)
print(str(network))
try:
subnet = conn.network.find_subnet(name)
except exceptions.ResourceNotFound:
args = {
"name": name,
"network_id": network.id,
"ip_version": "4",
"dns_nameservers": [dns_nameservers],
"cidr": cidr,
}
subnet = conn.network.create_subnet(**args)
print(str(subnet))
try:
router = conn.network.find_router(name)
except exceptions.ResourceNotFound:
extnet = conn.network.find_network("Ext-Net")
args = {
"name": name,
"external_gateway_info": {"network_id": extnet.id}
}
router = conn.network.create_router(**args)
conn.network.router_add_interface(router, subnet.id)
print(str(router))
try:
sg = conn.network.find_security_group(name)
except exceptions.ResourceNotFound:
sg = conn.network.create_security_group(name=name)
print(str(sg))
rule = {
'direction': 'ingress',
'remote_ip_prefix': '0.0.0.0/0',
'protocol': 'tcp',
'port_range_max': 9022,
'port_range_min': 9022,
'security_group_id': sg.id,
'ethertype': 'IPv4'
}
conn.network.create_security_group_rule(**rule)
print('rule allow 9022')
rule = {
'direction': 'ingress',
'remote_ip_prefix': '0.0.0.0/0',
'protocol': 'tcp',
'port_range_max': 443,
'port_range_min': 443,
'security_group_id': sg.id,
'ethertype': 'IPv4'
}
conn.network.create_security_group_rule(**rule)
print('rule allow HTTPS')
rule = {
'direction': 'ingress',
'remote_ip_prefix': '0.0.0.0/0',
'protocol': 'icmp',
'port_range_max': None,
'port_range_min': None,
'security_group_id': sg.id,
'ethertype': 'IPv4'
}
conn.network.create_security_group_rule(**rule)
print('rule allow ping')
rule = {
'direction': 'ingress',
'remote_ip_prefix': '0.0.0.0/0',
'protocol': 'tcp',
'port_range_max': 80,
'port_range_min': 80,
'security_group_id': sg.id,
'ethertype': 'IPv4'
}
conn.network.create_security_group_rule(**rule)
print('rule allow HTTP')
rule = {
'direction': 'ingress',
'remote_ip_prefix': None,
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'security_group_id': sg.id,
'ethertype': 'IPv6'
}
conn.network.create_security_group_rule(**rule)
print('rule allow IPv6')
rule = {
'direction': 'ingress',
'remote_ip_prefix': '0.0.0.0/0',
'protocol': 'tcp',
'port_range_max': 8080,
'port_range_min': 8080,
'security_group_id': sg.id,
'ethertype': 'IPv4'
}
conn.network.create_security_group_rule(**rule)
print('rule allow 8080')
rule = {
'direction': 'ingress',
'remote_ip_prefix': '0.0.0.0/0',
'protocol': 'tcp',
'port_range_max': 4222,
'port_range_min': 4222,
'security_group_id': sg.id,
'ethertype': 'IPv4'
}
conn.network.create_security_group_rule(**rule)
print('rule allow 4222')
rule = {
'direction': 'ingress',
'remote_ip_prefix': '0.0.0.0/0',
'protocol': 'tcp',
'port_range_max': 22,
'port_range_min': 22,
'security_group_id': sg.id,
'ethertype': 'IPv4'
}
conn.network.create_security_group_rule(**rule)
print('rule allow ssh')
print(str(sg))
try:
kp = conn.compute.find_keypair(name)
except exceptions.ResourceNotFound:
kp = conn.compute.create_keypair(name=name)
try:
os.remove('jenkins')
except OSError:
pass
try:
os.remove('jenkins.pub')
except OSError:
pass
print(str(kp))
f = open('jenkins', 'w')
f.write("%s" % kp.private_key)
f.close()
f = open('jenkins.pub', 'w')
f.write("%s" % kp.public_key)
f.close()
print(str(kp))
try:
server = conn.compute.find_server(name)
server = conn.get(server)
except exceptions.ResourceNotFound:
f = open('examples/cloud-init.sh', 'r')
cmd = f.read()
f.close()
b64str = base64.b64encode(cmd)
args = {
"name": name,
"flavorRef": flavor,
"imageRef": image,
"imageRef": image,
"key_name": 'brat',
"networks": [{"uuid": network.id}],
"user_data": b64str,
}
server = conn.compute.create_server(**args)
print(str(server))
print('Waiting for the server to come up....')
conn.compute.wait_for_status(server)
print('Server is up.')
if len(server.get_floating_ips()) <= 0:
try:
ip = conn.network.find_available_ip()
except exceptions.ResourceNotFound:
ip = conn.network.create_ip(floating_network_id=network.id)
port = conn.network.list_ports(device_id=server.id, fields='id')[0]
conn.network.add_ip_to_port(port, ip)
print(str(port))
ip = conn.get(ip)
print("ssh -i jenkins ubuntu@%s" % ip.floating_ip_address)
print("http://%s:8080" % ip.floating_ip_address)
return
def delete_jenkins(opts):
name = opts.data.pop('name', 'jenkins')
args = vars(opts)
conn = connection.Connection(preference=opts.user_preferences, **args)
try:
server = conn.compute.find_server(name)
server = conn.get(server)
print(str(server))
ips = server.get_floating_ips()
for ip in ips:
print(str(ip))
ip = conn.network.find_ip(ip)
conn.network.remove_ip_from_port(ip)
conn.delete(ip)
conn.delete(server)
except exceptions.ResourceNotFound:
pass
try:
kp = conn.compute.find_keypair(name)
print(str(kp))
conn.delete(kp)
except exceptions.ResourceNotFound:
pass
try:
router = conn.network.find_router(name)
print(str(router))
except exceptions.ResourceNotFound:
router = None
pass
try:
subnet = conn.network.find_subnet(name)
print(str(subnet))
if router:
conn.network.router_remove_interface(router, subnet.id)
for port in conn.network.get_subnet_ports(subnet.id):
print(str(port))
conn.delete(port)
except exceptions.ResourceNotFound:
subnet = None
pass
try:
if router:
conn.delete(router)
except exceptions.ResourceNotFound:
pass
try:
if subnet:
conn.delete(subnet)
except exceptions.ResourceNotFound:
pass
try:
network = conn.network.find_network(name)
print(str(network))
conn.delete(network)
except exceptions.ResourceNotFound:
pass
def run_jenkins(opts):
argument = opts.argument
if argument == "delete":
return(delete_jenkins(opts))
return(create_jenkins(opts))
if __name__ == "__main__":
opts = common.setup()
sys.exit(common.main(opts, run_jenkins))

102
examples/object_store.py Normal file
View File

@ -0,0 +1,102 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import glob
import os
import sys
from examples import common
from openstack import connection
CONTAINER_HEADER = ("Name{0}| Bytes Used{1}| "
"Num Objects".format(13 * " ", 1 * " "))
CONTAINER_FORMAT = ("{0.name: <16} | {0.bytes: <10} | {0.count}")
OBJECT_HEADER = ("Name{0}| Bytes {1}| "
"Content-Type".format(27 * " ", 2 * " "))
OBJECT_FORMAT = ("{0.name: <30} | {0.bytes: <7} | {0.content_type}")
def list_containers(conn):
print(CONTAINER_HEADER)
print("=" * len(CONTAINER_HEADER))
for container in conn.object_store.list_containers():
print(CONTAINER_FORMAT.format(container))
def list_objects(conn, container):
print(OBJECT_HEADER)
print("=" * len(OBJECT_HEADER))
for obj in conn.object_store.list_objects(container):
print(OBJECT_FORMAT.format(obj))
def upload_directory(conn, directory, pattern):
"""Upload a directory to object storage.
Given an OpenStack connection, a directory, and a file glob pattern,
upload all files matching the pattern from that directory into a
container named after the directory containing the files.
"""
container_name = os.path.basename(os.path.realpath(directory))
container = conn.object_store.create_container(container_name)
for root, dirs, files in os.walk(directory):
for file in glob.iglob(os.path.join(root, pattern)):
with open(file, "rb") as f:
ob = conn.object_store.create_object(container=container,
name=file, data=f.read())
print("Uploaded {0.name}".format(ob))
def main():
# Add on to the common parser with a few options of our own.
parser = common.option_parser()
parser.add_argument("--list-containers", dest="list_containers",
action="store_true")
parser.add_argument("--list-objects", dest="container")
parser.add_argument("--upload-directory", dest="directory")
parser.add_argument("--pattern", dest="pattern")
opts = parser.parse_args()
args = {
'auth_plugin': opts.auth_plugin,
'auth_url': opts.auth_url,
'project_name': opts.project_name,
'domain_name': opts.domain_name,
'project_domain_name': opts.project_domain_name,
'user_domain_name': opts.user_domain_name,
'user_name': opts.user_name,
'password': opts.password,
'verify': opts.verify,
'token': opts.token,
}
conn = connection.Connection(**args)
if opts.list_containers:
return list_containers(conn)
elif opts.container:
return list_objects(conn, opts.container)
elif opts.directory and opts.pattern:
return upload_directory(conn, opts.directory.decode("utf8"),
opts.pattern)
else:
print(parser.print_help())
return -1
if __name__ == "__main__":
sys.exit(main())

View File

@ -11,6 +11,8 @@
# under the License.
from openstack.compute.v2 import flavor
from openstack.compute.v2 import keypairs
from openstack.compute.v2 import server
class Proxy(object):
@ -18,5 +20,59 @@ class Proxy(object):
def __init__(self, session):
self.session = session
def create_flavor(self, **data):
return flavor.Flavor(data).create(self.session)
def delete_flavor(self, **data):
flavor.Flavor(data).delete(self.session)
def find_flavor(self, name_or_id):
return flavor.Flavor.find(self.session, name_or_id)
def get_flavor(self, **data):
return flavor.Flavor(data).get(self.session)
def list_flavors(self, **params):
return flavor.Flavor.list(self.session, **params)
def update_flavor(self, **data):
return flavor.Flavor(data).update(self.session)
def create_keypair(self, **data):
return keypairs.Keypairs(data).create(self.session)
def delete_keypair(self, **data):
keypairs.Keypairs(data).delete(self.session)
def get_keypair(self, **data):
return keypairs.Keypairs(data).get(self.session)
def find_keypair(self, name_or_id):
return keypairs.Keypairs.find(self.session, name_or_id)
def list_keypairs(self, **params):
return keypairs.Keypairs.list(self.session, **params)
def update_keypair(self, **data):
return keypairs.Keypairs(data).update(self.session)
def create_server(self, **data):
return server.Server(data).create(self.session)
def delete_server(self, **data):
server.Server(data).delete(self.session)
def find_server(self, name_or_id):
return server.Server.find(self.session, name_or_id)
def get_server(self, **data):
return server.Server(data).get(self.session)
def list_servers(self):
return server.Server.list(self.session)
def update_server(self, **data):
return server.Server(data).update(self.session)
def wait_for_status(self, server, status='ACTIVE', interval=2, wait=120):
return server.wait_for_status(self.session, status, interval, wait)

View File

@ -11,12 +11,14 @@
# under the License.
from openstack.compute import compute_service
from openstack import exceptions
from openstack import resource
class Keypairs(resource.Resource):
id_attribute = 'fingerprint'
resource_key = 'keypairs'
id_attribute = 'name'
name_attribute = 'fingerprint'
resource_key = 'keypair'
resources_key = 'keypairs'
base_path = '/os-keypairs'
service = compute_service.ComputeService()
@ -31,4 +33,33 @@ class Keypairs(resource.Resource):
# Properties
fingerprint = resource.prop('fingerprint')
name = resource.prop('name')
private_key = resource.prop('private_key')
public_key = resource.prop('public_key')
def __init__(self, attrs=None, loaded=False):
if attrs is not None:
if 'keypair' in attrs:
attrs = attrs['keypair']
super(Keypairs, self).__init__(attrs, loaded)
def create(self, session):
"""Create a new keypair from this instance.
This is needed because the name is the id, but we can't create one
with a PUT. That and we need the private_key out of the response.
"""
resp = self.create_by_id(session, self._attrs, None, path_args=self)
self._attrs = resp
self._reset_dirty()
return self
@classmethod
def find(cls, session, name_or_id, path_args=None, id_only=True):
"""Find a keypair by name because list filtering does not work."""
try:
return cls.get_by_id(session, name_or_id)
except exceptions.HttpException:
pass
msg = ("No %s with a name or ID of '%s' exists." %
(cls.get_resource_name(), name_or_id))
raise exceptions.ResourceNotFound(msg)

View File

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from openstack.compute import compute_service
from openstack.compute.v2 import server_ip
from openstack import resource
@ -109,3 +111,28 @@ class Server(resource.Resource):
action['metadata'] = metadata
body = {'createImage': action}
return self.action(session, body)
def wait_for_status(self, session, status='ACTIVE', interval=5, wait=120):
"""Wait for the server to be in some status."""
try:
if self.status == status:
return True
except AttributeError:
pass
total_sleep = 0
while total_sleep < wait:
self.get(session)
if self.status == status:
return True
time.sleep(interval)
total_sleep += interval
return False
def get_floating_ips(self):
"""Get the floating ips associated with this server."""
addresses = self.addresses[self.name]
result = []
for address in addresses:
if address['OS-EXT-IPS:type'] == 'floating':
result.append(address['addr'])
return result

View File

@ -179,3 +179,48 @@ class Connection(object):
setattr(self, attr_name, proxy(self.session))
except Exception as e:
_logger.warn("Unable to load %s: %s" % (module, e))
def create(self, obj):
"""Create an object.
:param obj: A resource object.
:type resource: :class:`~openstack.resource.Resource`
"""
obj.create(self.session)
return obj
def get(self, obj, include_headers=False):
"""Get an object.
:param obj: A resource object.
:type resource: :class:`~openstack.resource.Resource`
:param bool include_headers: Read object headers.
"""
obj.get(self.session, include_headers)
return obj
def head(self, obj):
"""Get an object.
:param obj: A resource object.
:type resource: :class:`~openstack.resource.Resource`
"""
obj.head(self.session)
return obj
def update(self, obj):
"""Update an object.
:param obj: A resource object.
:type resource: :class:`~openstack.resource.Resource`
"""
obj.update(self.session)
return obj
def delete(self, obj):
"""Delete an object.
:param obj: A resource object.
:type resource: :class:`~openstack.resource.Resource`
"""
obj.delete(self.session)

View File

@ -19,25 +19,19 @@ class Proxy(object):
self.session = session
def create_project(self, **data):
obj = project.Project(**data)
obj.create(self.session)
return obj
return project.Project(data).create(self.session)
def get_project(self, r_id):
obj = project.Project({'id': r_id})
obj.get(self.session)
return obj
def delete_project(self, **data):
project.Project(data).delete(self.session)
def update_project(self, **data):
obj = project.Project(**data)
obj.update(self.session)
def find_project(self, name_or_id):
return project.Project.find(self.session, name_or_id)
def delete_project(self, r_id):
obj = project.Project({'id': r_id})
obj.delete(self.session)
def get_project(self, **data):
return project.Project(data).get(self.session)
def list_projects(self, **params):
return project.Project.list(self.session, **params)
def find_project(self, name_or_id):
return project.Project.find(self.session, name_or_id)
def update_project(self, **data):
return project.Project(**data).update(self.session)

View File

@ -10,8 +10,28 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.image.v1 import image
class Proxy(object):
def __init__(self, session):
self.session = session
def create_image(self, **data):
return image.Image(data).create(self.session)
def delete_image(self, **data):
return image.Image(data).delete(self.session)
def find_image(self, name_or_id):
return image.Image.find(self.session, name_or_id)
def get_image(self, **data):
return image.Image(data).get(self.session)
def list_images(self, **params):
return image.Image.list(self.session, **params)
def update_image(self, **data):
return image.Image(data).update(self.session)

View File

@ -10,8 +10,174 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import floatingip
from openstack.network.v2 import network
from openstack.network.v2 import port
from openstack.network.v2 import router
from openstack.network.v2 import security_group
from openstack.network.v2 import security_group_rule
from openstack.network.v2 import subnet
class Proxy(object):
def __init__(self, session):
self.session = session
def create_ip(self, **data):
return floatingip.FloatingIP(data).create(self.session)
def delete_ip(self, **data):
floatingip.FloatingIP(**data).delete(self.session)
def find_available_ip(self):
return floatingip.FloatingIP.find_available(self.session)
def find_ip(self, name_or_id):
return floatingip.FloatingIP.find(self.session, name_or_id)
def get_ip(self, **data):
return floatingip.FloatingIP(**data).get(self.session)
def list_ips(self, **params):
return floatingip.FloatingIP.list(self.session, **params)
def update_ip(self, **data):
return floatingip.FloatingIP(**data).update(self.session)
def create_network(self, **data):
return network.Network(data).create(self.session)
def delete_network(self, **data):
network.Network(data).delete(self.session)
def find_network(self, name_or_id):
return network.Network.find(self.session, name_or_id)
def get_network(self, **data):
return network.Network(data).get(self.session)
def list_networks(self, **params):
return network.Network.list(self.session, **params)
def update_network(self, **data):
return network.Network(data).update(self.session)
def create_port(self, **data):
return port.Port(data).create(self.session)
def delete_port(self, **data):
return port.Port(data).delete(self.session)
def find_port(self, name_or_id):
return port.Port.find(self.session, name_or_id)
def get_port(self, **data):
return port.Port(data).get(self.session)
def list_ports(self, **params):
return port.Port.list(self.session, **params)
def update_port(self, **data):
return port.Port(data).update(self.session)
def add_ip_to_port(self, port, ip):
ip['port_id'] = port.id
return ip.update(self.session)
def remove_ip_from_port(self, ip):
ip['port_id'] = None
return ip.update(self.session)
def get_subnet_ports(self, subnet_id):
result = []
ports = self.list_ports()
for puerta in ports:
for fixed_ip in puerta.fixed_ips:
if fixed_ip['subnet_id'] == subnet_id:
result.append(puerta)
return result
def create_router(self, **data):
return router.Router(data).create(self.session)
def delete_router(self, **data):
return router.Router(**data).delete(self.session)
def find_router(self, name_or_id):
return router.Router.find(self.session, name_or_id)
def get_router(self, **data):
return router.Router(**data).get(self.session)
def list_routers(self, **params):
return router.Router.list(self.session, **params)
def update_router(self, **data):
return router.Router(**data).update(self.session)
def router_add_interface(self, router, subnet_id):
router.add_interface(self.session, subnet_id)
def router_remove_interface(self, router, subnet_id):
router.remove_interface(self.session, subnet_id)
def create_security_group(self, **data):
return security_group.SecurityGroup(data).create(self.session)
def delete_security_group(self, **data):
return security_group.SecurityGroup(**data).delete(self.session)
def find_security_group(self, name_or_id):
return security_group.SecurityGroup.find(self.session, name_or_id)
def get_security_group(self, **data):
return security_group.SecurityGroup(**data).get(self.session)
def list_security_groups(self, **params):
return security_group.SecurityGroup.list(self.session, **params)
def update_security_group(self, **data):
return security_group.SecurityGroup(**data).update(self.session)
def create_security_group_rule(self, **data):
obj = security_group_rule.SecurityGroupRule(data)
return obj.create(self.session)
def delete_security_group_rule(self, **data):
obj = security_group_rule.SecurityGroupRule(**data)
return obj.delete(self.session)
def find_security_group_rule(self, name_or_id):
return security_group_rule.SecurityGroupRule.find(self.session,
name_or_id)
def get_security_group_rule(self, **data):
obj = security_group_rule.SecurityGroupRule(**data)
return obj.get(self.session)
def list_security_group_rules(self, **params):
return security_group_rule.SecurityGroupRule.list(self.session,
**params)
def update_security_group_rule(self, **data):
obj = security_group_rule.SecurityGroupRule(**data)
return obj.update(self.session)
def create_subnet(self, **data):
return subnet.Subnet(data).create(self.session)
def delete_subnet(self, **data):
return subnet.Subnet(**data).delete(self.session)
def find_subnet(self, name_or_id):
return subnet.Subnet.find(self.session, name_or_id)
def get_subnet(self, **data):
return subnet.Subnet(**data).get(self.session)
def list_subnets(self, **params):
return subnet.Subnet.list(self.session, **params)
def update_subnet(self, **data):
return subnet.Subnet(**data).update(self.session)

View File

@ -16,8 +16,7 @@ from openstack import resource
class FloatingIP(resource.Resource):
id_attribute = "floating_ip_address"
name_attribute = None
name_attribute = "floating_ip_address"
resource_name = "floating ip"
resource_key = 'floatingip'
resources_key = 'floatingips'
@ -30,6 +29,7 @@ class FloatingIP(resource.Resource):
allow_update = True
allow_delete = True
allow_list = True
patch_update = False
# Properties
fixed_ip_address = resource.prop('fixed_ip_address')

View File

@ -38,7 +38,7 @@ class Port(resource.Resource):
device_id = resource.prop('device_id')
device_owner = resource.prop('device_owner')
extra_dhcp_opts = resource.prop('extra_dhcp_opts', type=dict)
fixed_ips = resource.prop('fixed_ips', type=dict)
fixed_ips = resource.prop('fixed_ips')
mac_address = resource.prop('mac_address')
name = resource.prop('name')
network_id = resource.prop('network_id')

View File

@ -10,8 +10,184 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
from openstack.object_store.v1 import container as _container
from openstack.object_store.v1 import obj as _obj
class Proxy(object):
def __init__(self, session):
self.session = session
def get_account_metadata(self, container=None):
"""Get metatdata for this account.
:param container:
:type Container: :class:`~openstack.object_store.v1.container`
"""
if container is None:
container = _container.Container()
container.head(self.session)
return container
def set_account_metadata(self, container):
"""Set metatdata for this account.
:param container:
:type Container: :class:`~openstack.object_store.v1.container`
"""
container.update(self.session)
return container
def list_containers(self):
"""List containers for this account."""
container = _container.Container()
return container.list(self.session)
def get_container_metadata(self, container):
"""Get metatdata for a container.
:param container:
:type Container: :class:`~openstack.object_store.v1.container`
"""
container.head(self.session)
return container
def set_container_metadata(self, container):
"""Set metatdata for a container.
:param container:
:type Container: :class:`~openstack.object_store.v1.container`
"""
container.create(self.session)
return container
def create_container(self, container):
"""Create a container,
:param container:
:type Container: :class:`~openstack.object_store.v1.container`
"""
if isinstance(container, six.text_type):
cont = _container.Container()
cont.name = container
cont.create(self.session)
return cont
container.create(self.session)
return container
def delete_container(self, container):
"""Delete a container.
:param container:
:type Container: :class:`~openstack.object_store.v1.container`
"""
if isinstance(container, six.text_type):
cont = _container.Container()
cont.name = container
cont.delete(self.session)
container.delete(self.session)
def list_objects(self, container):
"""List objects inside a container.
:param container:
:type Container: :class:`~openstack.object_store.v1.container`
"""
cont_name = getattr(container, "name", None)
if cont_name is None:
cont_name = container
obj = _obj.Object()
objs = obj.list(self.session, path_args={"container": cont_name})
# TODO(brian): Objects have to know their container at this point,
# otherwise further operations like getting their metadata
# or downloading them is a hassle because the end-user would have
# to maintain both the container and the object separately.
for ob in objs:
ob.container = cont_name
return objs
def get_object_data(self, obj):
"""Retreive the data contained inside an object.
:param obj:
:type Object: :class:`~openstack.object_store.v1.obj`
"""
return obj.get(self.session)
def save_object(self, obj, path):
"""Save the data contained inside an object to disk.
:param obj:
:type Object: :class:`~openstack.object_store.v1.obj`
"param path str: Location to write the object contents.
"""
with open(path, "w") as out:
out.write(self.get_object_data(obj))
def create_object(self, **kwargs):
"""Create an object."""
# Have to have data
data = kwargs.pop("data")
# If we're given an Object...
obj = kwargs.pop("obj", False)
if obj:
obj.create(self.session, data)
return obj
# If we're given a container to house the object...
# Could be a string name, could be a Container object.
container = kwargs.pop("container", False)
container_name = getattr(container, "name", None)
if container_name is None:
container_name = container
cont = _container.Container()
cont.name = container_name
cont.create(self.session)
name = kwargs.pop("name", False)
if not name:
raise ValueError("need a `name` argument with `container`")
ob = _obj.Object()
ob.container = cont.name
ob.name = name
ob.create(self.session, data)
return ob
def copy_object(self):
"""Copy an object."""
raise NotImplementedError
def delete_object(self, obj):
"""Delete an object.
:param obj:
:type Object: :class:`~openstack.object_store.v1.obj`
"""
obj.delete(self.session)
def get_object_metadata(self, obj):
"""Get metatdata for an object.
:param obj:
:type Object: :class:`~openstack.object_store.v1.obj`
"""
obj.head(self.session)
return obj
def set_object_metadata(self, obj):
"""Set metatdata for an object.
:param obj:
:type Object: :class:`~openstack.object_store.v1.obj`
"""
obj.create(self.session)
return obj

View File

@ -91,3 +91,19 @@ class Object(resource.Resource):
headers=headers).content
return resp
def create(self, session, data=None):
"""Create a remote resource from this instance."""
if not self.allow_create:
raise exceptions.MethodNotSupported('create')
url = utils.urljoin("", self.base_path % self, self.id)
if data is not None:
resp = session.put(url, service=self.service, data=data,
accept="bytes").headers
else:
resp = session.post(url, service=self.service, data=None,
accept=None).headers
self._attrs.update(resp)

View File

@ -165,6 +165,8 @@ class Resource(collections.MutableMapping):
#: Allow head operation for this resource.
allow_head = False
patch_update = True
def __init__(self, attrs=None, loaded=False):
if attrs is None:
attrs = {}
@ -295,6 +297,7 @@ class Resource(collections.MutableMapping):
resp = self.create_by_id(session, self._attrs, self.id, path_args=self)
self._attrs[self.id_attribute] = resp[self.id_attribute]
self._reset_dirty()
return self
@classmethod
def get_data_by_id(cls, session, r_id, path_args=None,
@ -332,6 +335,7 @@ class Resource(collections.MutableMapping):
include_headers=include_headers)
self._attrs.update(body)
self._loaded = True
return self
@classmethod
def head_data_by_id(cls, session, r_id, path_args=None):
@ -377,7 +381,10 @@ class Resource(collections.MutableMapping):
else:
url = cls.base_path
url = utils.urljoin(url, r_id)
resp = session.patch(url, service=cls.service, json=body).body
if cls.patch_update:
resp = session.patch(url, service=cls.service, json=body).body
else:
resp = session.put(url, service=cls.service, json=body).body
if cls.resource_key:
resp = resp[cls.resource_key]
@ -400,6 +407,7 @@ class Resource(collections.MutableMapping):
assert resp_id == self.id
self._reset_dirty()
return self
@classmethod
def delete_by_id(cls, session, r_id, path_args=None):
@ -449,22 +457,21 @@ class Resource(collections.MutableMapping):
return [cls.existing(**data) for data in resp]
@classmethod
def find(cls, session, name_or_id, path_args=None):
def find(cls, session, name_or_id, path_args=None, id_only=True):
"""Find a resource by name or id as an instance."""
try:
args = {
cls.id_attribute: name_or_id,
'fields': cls.id_attribute,
'path_args': path_args,
}
info = cls.list(session, **args)
params = {cls.id_attribute: name_or_id}
if id_only:
params['fields'] = cls.id_attribute
info = cls.list(session, path_args=path_args, **params)
if len(info) == 1:
return info[0]
except exceptions.HttpException:
pass
if cls.name_attribute:
params = {cls.name_attribute: name_or_id,
'fields': cls.id_attribute}
params = {cls.name_attribute: name_or_id}
if id_only:
params['fields'] = cls.id_attribute
info = cls.list(session, path_args=path_args, **params)
if len(info) == 1:
return info[0]

View File

@ -10,15 +10,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from openstack.compute.v2 import keypairs
from openstack import exceptions
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
'fingerprint': '1',
'name': '2',
'public_key': '3',
'keypair': {
'fingerprint': '1',
'name': '2',
'public_key': '3',
}
}
@ -26,7 +29,7 @@ class TestKeypairs(testtools.TestCase):
def test_basic(self):
sot = keypairs.Keypairs()
self.assertEqual('keypairs', sot.resource_key)
self.assertEqual('keypair', sot.resource_key)
self.assertEqual('keypairs', sot.resources_key)
self.assertEqual('/os-keypairs', sot.base_path)
self.assertEqual('compute', sot.service.service_type)
@ -38,6 +41,27 @@ class TestKeypairs(testtools.TestCase):
def test_make_it(self):
sot = keypairs.Keypairs(EXAMPLE)
self.assertEqual(EXAMPLE['fingerprint'], sot.fingerprint)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['public_key'], sot.public_key)
self.assertEqual(EXAMPLE['keypair']['fingerprint'], sot.fingerprint)
self.assertEqual(EXAMPLE['keypair']['name'], sot.name)
self.assertEqual(EXAMPLE['keypair']['public_key'], sot.public_key)
def test_find(self):
resp = mock.Mock()
resp.body = EXAMPLE
sess = mock.Mock()
sess.get = mock.MagicMock()
sess.get.return_value = resp
sot = keypairs.Keypairs()
result = sot.find(sess, "kato")
url = 'os-keypairs/kato'
sess.get.assert_called_with(url, service=sot.service)
self.assertEqual(EXAMPLE['keypair']['fingerprint'], result.fingerprint)
self.assertEqual(EXAMPLE['keypair']['name'], result.name)
self.assertEqual(EXAMPLE['keypair']['public_key'], result.public_key)
def test_find_not_found(self):
sess = mock.Mock()
sess.get = mock.MagicMock()
sess.get.side_effect = exceptions.HttpException("404")
sot = keypairs.Keypairs()
self.assertRaises(exceptions.ResourceNotFound, sot.find, sess, "kato")

View File

@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.compute.v2 import _proxy
from openstack.tests import test_proxy_base
class TestComputeProxy(test_proxy_base.TestProxyBase):
def setUp(self):
super(TestComputeProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session)
def test_flavor(self):
self.verify_create('openstack.compute.v2.flavor.Flavor.create',
self.proxy.create_flavor)
self.verify_delete('openstack.compute.v2.flavor.Flavor.delete',
self.proxy.delete_flavor)
self.verify_find('openstack.compute.v2.flavor.Flavor.find',
self.proxy.find_flavor)
self.verify_get('openstack.compute.v2.flavor.Flavor.get',
self.proxy.get_flavor)
self.verify_list('openstack.compute.v2.flavor.Flavor.list',
self.proxy.list_flavors)
self.verify_update('openstack.compute.v2.flavor.Flavor.update',
self.proxy.update_flavor)
def test_keypair(self):
self.verify_create('openstack.compute.v2.keypairs.Keypairs.create',
self.proxy.create_keypair)
self.verify_delete('openstack.compute.v2.keypairs.Keypairs.delete',
self.proxy.delete_keypair)
self.verify_find('openstack.compute.v2.keypairs.Keypairs.find',
self.proxy.find_keypair)
self.verify_get('openstack.compute.v2.keypairs.Keypairs.get',
self.proxy.get_keypair)
self.verify_list('openstack.compute.v2.keypairs.Keypairs.list',
self.proxy.list_keypairs)
self.verify_update('openstack.compute.v2.keypairs.Keypairs.update',
self.proxy.update_keypair)
def test_server(self):
self.verify_create('openstack.compute.v2.server.Server.create',
self.proxy.create_server)
self.verify_delete('openstack.compute.v2.server.Server.delete',
self.proxy.delete_server)
self.verify_find('openstack.compute.v2.server.Server.find',
self.proxy.find_server)
self.verify_get('openstack.compute.v2.server.Server.get',
self.proxy.get_server)
self.verify_list('openstack.compute.v2.server.Server.list',
self.proxy.list_servers)
self.verify_update('openstack.compute.v2.server.Server.update',
self.proxy.update_server)

View File

@ -202,3 +202,80 @@ class TestServer(testtools.TestCase):
url = 'servers/IDENTIFIER/action'
body = {"createImage": {'name': name}}
self.sess.put.assert_called_with(url, service=sot.service, json=body)
def test_wait_for_status_nothing(self):
self.sess.get = mock.MagicMock()
sot = server.Server(attrs={'id': IDENTIFIER, 'status': 'ACTIVE'})
self.assertEqual(True, sot.wait_for_status(self.sess, 'ACTIVE', 1, 2))
expected = []
self.assertEqual(expected, self.sess.get.call_args_list)
def test_wait_for_status(self):
resp1 = mock.Mock()
resp1.body = {'server': {'status': 'BUILDING'}}
resp2 = mock.Mock()
resp2.body = {'server': {'status': 'ACTIVE'}}
self.sess.get = mock.MagicMock()
self.sess.get.side_effect = [resp1, resp2]
sot = server.Server(attrs={'id': IDENTIFIER})
self.assertEqual(True, sot.wait_for_status(self.sess, 'ACTIVE', 1, 2))
url = 'servers/IDENTIFIER'
thecall = mock.call(url, service=sot.service)
expected = [thecall, thecall]
self.assertEqual(expected, self.sess.get.call_args_list)
def test_wait_for_status_timeout(self):
resp1 = mock.Mock()
resp1.body = {'server': {'status': 'BUILDING'}}
resp2 = mock.Mock()
resp2.body = {'server': {'status': 'BUILDING'}}
self.sess.get = mock.MagicMock()
self.sess.get.side_effect = [resp1, resp2]
sot = server.Server(attrs={'id': IDENTIFIER})
self.assertEqual(False, sot.wait_for_status(self.sess, 'ACTIVE', 1, 2))
url = 'servers/IDENTIFIER'
thecall = mock.call(url, service=sot.service)
expected = [thecall, thecall]
self.assertEqual(expected, self.sess.get.call_args_list)
def test_get_ips(self):
name = "jenkins"
fixed = {
"OS-EXT-IPS-MAC:mac_addr": "fa:16:3e:f9:58:b4",
"version": 4,
"addr": "10.3.3.8",
"OS-EXT-IPS:type": "fixed",
}
float1 = {
"OS-EXT-IPS-MAC:mac_addr": "fa:16:3e:f9:58:b4",
"version": 4,
"addr": "15.125.3.1",
"OS-EXT-IPS:type": "floating",
}
float2 = {
"OS-EXT-IPS-MAC:mac_addr": "fa:16:3e:f9:58:b4",
"version": 4,
"addr": "15.125.3.2",
"OS-EXT-IPS:type": "floating",
}
addresses = {name: [fixed]}
attrs = {'id': IDENTIFIER, 'name': name, 'addresses': addresses}
sot = server.Server(attrs=attrs)
self.assertEqual([], sot.get_floating_ips())
addresses = {name: [fixed, float1, float2]}
attrs = {'id': IDENTIFIER, 'name': name, 'addresses': addresses}
sot = server.Server(attrs=attrs)
self.assertEqual(["15.125.3.1", "15.125.3.2"], sot.get_floating_ips())
addresses = {name: [float1, fixed]}
attrs = {'id': IDENTIFIER, 'name': name, 'addresses': addresses}
sot = server.Server(attrs=attrs)
self.assertEqual(["15.125.3.1"], sot.get_floating_ips())

View File

@ -0,0 +1,90 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import _proxy
from openstack.tests import test_proxy_base
class TestNetworkProxy(test_proxy_base.TestProxyBase):
def setUp(self):
super(TestNetworkProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session)
def test_ip(self):
self.verify_create('openstack.network.v2.floatingip.FloatingIP.create',
self.proxy.create_ip)
self.verify_delete('openstack.network.v2.floatingip.FloatingIP.delete',
self.proxy.delete_ip)
self.verify_find('openstack.network.v2.floatingip.FloatingIP.find',
self.proxy.find_ip)
self.verify_get('openstack.network.v2.floatingip.FloatingIP.get',
self.proxy.get_ip)
self.verify_list('openstack.network.v2.floatingip.FloatingIP.list',
self.proxy.list_ips)
self.verify_update('openstack.network.v2.floatingip.FloatingIP.update',
self.proxy.update_ip)
def test_network(self):
self.verify_create('openstack.network.v2.network.Network.create',
self.proxy.create_network)
self.verify_delete('openstack.network.v2.network.Network.delete',
self.proxy.delete_network)
self.verify_find('openstack.network.v2.network.Network.find',
self.proxy.find_network)
self.verify_get('openstack.network.v2.network.Network.get',
self.proxy.get_network)
self.verify_list('openstack.network.v2.network.Network.list',
self.proxy.list_networks)
self.verify_update('openstack.network.v2.network.Network.update',
self.proxy.update_network)
def test_port(self):
self.verify_create('openstack.network.v2.port.Port.create',
self.proxy.create_port)
self.verify_delete('openstack.network.v2.port.Port.delete',
self.proxy.delete_port)
self.verify_find('openstack.network.v2.port.Port.find',
self.proxy.find_port)
self.verify_get('openstack.network.v2.port.Port.get',
self.proxy.get_port)
self.verify_list('openstack.network.v2.port.Port.list',
self.proxy.list_ports)
self.verify_update('openstack.network.v2.port.Port.update',
self.proxy.update_port)
def test_router(self):
self.verify_create('openstack.network.v2.router.Router.create',
self.proxy.create_router)
self.verify_delete('openstack.network.v2.router.Router.delete',
self.proxy.delete_router)
self.verify_find('openstack.network.v2.router.Router.find',
self.proxy.find_router)
self.verify_get('openstack.network.v2.router.Router.get',
self.proxy.get_router)
self.verify_list('openstack.network.v2.router.Router.list',
self.proxy.list_routers)
self.verify_update('openstack.network.v2.router.Router.update',
self.proxy.update_router)
def test_subnet(self):
self.verify_create('openstack.network.v2.subnet.Subnet.create',
self.proxy.create_subnet)
self.verify_delete('openstack.network.v2.subnet.Subnet.delete',
self.proxy.delete_subnet)
self.verify_find('openstack.network.v2.subnet.Subnet.find',
self.proxy.find_subnet)
self.verify_get('openstack.network.v2.subnet.Subnet.get',
self.proxy.get_subnet)
self.verify_list('openstack.network.v2.subnet.Subnet.list',
self.proxy.list_subnets)
self.verify_update('openstack.network.v2.subnet.Subnet.update',
self.proxy.update_subnet)

View File

@ -0,0 +1,34 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.image.v1 import _proxy
from openstack.tests import test_proxy_base
class TestImageProxy(test_proxy_base.TestProxyBase):
def setUp(self):
super(TestImageProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session)
def test_image(self):
self.verify_create('openstack.image.v1.image.Image.create',
self.proxy.create_image)
self.verify_delete('openstack.image.v1.image.Image.delete',
self.proxy.delete_image)
self.verify_find('openstack.image.v1.image.Image.find',
self.proxy.find_image)
self.verify_get('openstack.image.v1.image.Image.get',
self.proxy.get_image)
self.verify_list('openstack.image.v1.image.Image.list',
self.proxy.list_images)
self.verify_update('openstack.image.v1.image.Image.update',
self.proxy.update_image)

View File

@ -16,11 +16,12 @@ import testtools
from openstack import exceptions
from openstack.network.v2 import floatingip
IDENTIFIER = '10.0.0.1'
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
'fixed_ip_address': '1',
'floating_ip_address': IDENTIFIER,
'floating_ip_address': '10.0.0.1',
'floating_network_id': '3',
'id': IDENTIFIER,
'port_id': '5',
'tenant_id': '6',
'router_id': '7',
@ -48,7 +49,7 @@ class TestFloatingIP(testtools.TestCase):
sot.floating_ip_address)
self.assertEqual(EXAMPLE['floating_network_id'],
sot.floating_network_id)
self.assertEqual(EXAMPLE['floating_ip_address'], sot.id)
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['port_id'], sot.port_id)
self.assertEqual(EXAMPLE['tenant_id'], sot.project_id)
self.assertEqual(EXAMPLE['router_id'], sot.router_id)
@ -57,15 +58,15 @@ class TestFloatingIP(testtools.TestCase):
mock_session = mock.Mock()
mock_get = mock.Mock()
mock_session.get = mock_get
data = {'floating_ip_address': '10.0.0.1'}
data = {'floating_ip_address': '10.0.0.1', 'id': IDENTIFIER}
fake_response = mock.Mock()
fake_response.body = {floatingip.FloatingIP.resources_key: [data]}
mock_get.return_value = fake_response
result = floatingip.FloatingIP.find_available(mock_session)
self.assertEqual('10.0.0.1', result.id)
p = {'fields': 'floating_ip_address', 'port_id': ''}
self.assertEqual(IDENTIFIER, result.id)
p = {'fields': 'id', 'port_id': ''}
mock_get.assert_called_with(floatingip.FloatingIP.base_path,
params=p,
service=floatingip.FloatingIP.service)

View File

@ -0,0 +1,136 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import _proxy
from openstack.tests import test_proxy_base
class TestNetworkProxy(test_proxy_base.TestProxyBase):
def setUp(self):
super(TestNetworkProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session)
def test_ip(self):
self.verify_create('openstack.network.v2.floatingip.FloatingIP.create',
self.proxy.create_ip)
self.verify_delete('openstack.network.v2.floatingip.FloatingIP.delete',
self.proxy.delete_ip)
self.verify_find('openstack.network.v2.floatingip.FloatingIP.find',
self.proxy.find_ip)
self.verify_get('openstack.network.v2.floatingip.FloatingIP.get',
self.proxy.get_ip)
self.verify_list('openstack.network.v2.floatingip.FloatingIP.list',
self.proxy.list_ips)
self.verify_update('openstack.network.v2.floatingip.FloatingIP.update',
self.proxy.update_ip)
def test_network(self):
self.verify_create('openstack.network.v2.network.Network.create',
self.proxy.create_network)
self.verify_delete('openstack.network.v2.network.Network.delete',
self.proxy.delete_network)
self.verify_find('openstack.network.v2.network.Network.find',
self.proxy.find_network)
self.verify_get('openstack.network.v2.network.Network.get',
self.proxy.get_network)
self.verify_list('openstack.network.v2.network.Network.list',
self.proxy.list_networks)
self.verify_update('openstack.network.v2.network.Network.update',
self.proxy.update_network)
def test_port(self):
self.verify_create('openstack.network.v2.port.Port.create',
self.proxy.create_port)
self.verify_delete('openstack.network.v2.port.Port.delete',
self.proxy.delete_port)
self.verify_find('openstack.network.v2.port.Port.find',
self.proxy.find_port)
self.verify_get('openstack.network.v2.port.Port.get',
self.proxy.get_port)
self.verify_list('openstack.network.v2.port.Port.list',
self.proxy.list_ports)
self.verify_update('openstack.network.v2.port.Port.update',
self.proxy.update_port)
def test_router(self):
self.verify_create('openstack.network.v2.router.Router.create',
self.proxy.create_router)
self.verify_delete('openstack.network.v2.router.Router.delete',
self.proxy.delete_router)
self.verify_find('openstack.network.v2.router.Router.find',
self.proxy.find_router)
self.verify_get('openstack.network.v2.router.Router.get',
self.proxy.get_router)
self.verify_list('openstack.network.v2.router.Router.list',
self.proxy.list_routers)
self.verify_update('openstack.network.v2.router.Router.update',
self.proxy.update_router)
def test_security_group(self):
self.verify_create(
'openstack.network.v2.security_group.SecurityGroup.create',
self.proxy.create_security_group)
self.verify_delete(
'openstack.network.v2.security_group.SecurityGroup.delete',
self.proxy.delete_security_group)
self.verify_find(
'openstack.network.v2.security_group.SecurityGroup.find',
self.proxy.find_security_group)
self.verify_get(
'openstack.network.v2.security_group.SecurityGroup.get',
self.proxy.get_security_group)
self.verify_list(
'openstack.network.v2.security_group.SecurityGroup.list',
self.proxy.list_security_groups)
self.verify_update(
'openstack.network.v2.security_group.SecurityGroup.update',
self.proxy.update_security_group)
def test_security_group_rule(self):
self.verify_create(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.create'),
self.proxy.create_security_group_rule)
self.verify_delete(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.delete'),
self.proxy.delete_security_group_rule)
self.verify_find(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.find'),
self.proxy.find_security_group_rule)
self.verify_get(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.get'),
self.proxy.get_security_group_rule)
self.verify_list(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.list'),
self.proxy.list_security_group_rules)
self.verify_update(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.update'),
self.proxy.update_security_group_rule)
def test_subnet(self):
self.verify_create('openstack.network.v2.subnet.Subnet.create',
self.proxy.create_subnet)
self.verify_delete('openstack.network.v2.subnet.Subnet.delete',
self.proxy.delete_subnet)
self.verify_find('openstack.network.v2.subnet.Subnet.find',
self.proxy.find_subnet)
self.verify_get('openstack.network.v2.subnet.Subnet.get',
self.proxy.get_subnet)
self.verify_list('openstack.network.v2.subnet.Subnet.list',
self.proxy.list_subnets)
self.verify_update('openstack.network.v2.subnet.Subnet.update',
self.proxy.update_subnet)

View File

@ -10,9 +10,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from openstack.auth.identity import v2
from openstack.auth import service_filter
from openstack import connection
from openstack import exceptions
from openstack import resource
from openstack.tests import base
from openstack import transport
from openstack import user_preference
@ -24,6 +28,9 @@ class TestConnection(base.TestCase):
self.xport = transport.Transport()
self.auth = v2.Auth(auth_url='http://127.0.0.1/v2', token='b')
self.pref = user_preference.UserPreference()
self.conn = connection.Connection(authenticator=mock.MagicMock(),
transport=mock.MagicMock())
self.conn.session = mock.MagicMock()
def test_create_transport(self):
conn = connection.Connection(authenticator='2', verify=True,
@ -107,3 +114,82 @@ class TestConnection(base.TestCase):
conn.orchestration.__class__.__module__)
self.assertEqual('openstack.telemetry.v2._proxy',
conn.telemetry.__class__.__module__)
class TestService(service_filter.ServiceFilter):
valid_versions = [service_filter.ValidVersion('v2')]
def __init__(self):
super(TestService, self).__init__(service_type='test')
class TestResource(resource.Resource):
resource_key = "testable"
resources_key = "testables"
base_path = "/testables"
service = TestService()
allow_create = True
allow_retrieve = True
allow_update = True
allow_delete = True
allow_list = True
allow_head = True
name = resource.prop('name')
class TestConnectionObjectMethods(base.TestCase):
def setUp(self):
super(TestConnectionObjectMethods, self).setUp()
self.conn = connection.Connection(authenticator=mock.MagicMock(),
transport=mock.MagicMock())
self.conn.session = mock.MagicMock()
self.args = {'name': 'fee', 'id': 'fie'}
self.body = {'testable': self.args}
self.response = mock.Mock
self.response.body = self.body
def test_obj_create(self):
test = TestResource.existing(**self.args)
self.conn.session.put = mock.MagicMock()
self.conn.session.put.and_return = self.response
self.assertEqual(test, self.conn.create(test))
url = 'testables/fie'
self.conn.session.put.assert_called_with(url, json=self.body,
service=test.service)
def test_obj_get(self):
test = TestResource.existing(**self.args)
self.conn.session.get = mock.MagicMock()
self.conn.session.get.and_return = self.response
self.assertEqual(test, self.conn.get(test))
url = 'testables/fie'
self.conn.session.get.assert_called_with(url, service=test.service)
def test_obj_head(self):
test = TestResource.existing(**self.args)
self.conn.session.head = mock.MagicMock()
self.conn.session.head.and_return = self.response
self.assertEqual(test, self.conn.head(test))
url = 'testables/fie'
self.conn.session.head.assert_called_with(url, service=test.service,
accept=None)
def test_obj_update(self):
test = TestResource.existing(**self.args)
test['name'] = 'newname'
self.body = {'testable': {'name': 'newname'}}
self.conn.session.patch = mock.MagicMock()
self.conn.session.patch.and_return = self.response
self.assertEqual(test, self.conn.update(test))
url = 'testables/fie'
self.conn.session.patch.assert_called_with(url, json=self.body,
service=test.service)
def test_obj_delete(self):
test = TestResource.existing(**self.args)
self.conn.session.delete = mock.MagicMock()
self.conn.session.delete.and_return = self.response
self.assertEqual(None, self.conn.delete(test))
url = 'testables/fie'
self.conn.session.delete.assert_called_with(url, service=test.service,
accept=None)

View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from openstack.tests import base
class TestProxyBase(base.TestCase):
def setUp(self):
super(TestProxyBase, self).setUp()
self.session = mock.MagicMock()
def verify_create(self, mock_method, test_method):
with mock.patch(mock_method) as mockList:
expected = 'result'
mockList.return_value = expected
self.assertEqual(expected, test_method())
mockList.assert_called_with(self.session)
def verify_delete(self, mock_method, test_method):
with mock.patch(mock_method) as mockFind:
mockFind.return_value = None
self.assertEqual(None, test_method())
mockFind.assert_called_with(self.session)
def verify_get(self, mock_method, test_method):
with mock.patch(mock_method) as mockList:
expected = 'result'
mockList.return_value = expected
self.assertEqual(expected, test_method())
mockList.assert_called_with(self.session)
def verify_find(self, mock_method, test_method):
with mock.patch(mock_method) as mockFind:
expected = 'result'
name_or_id = 'name_or_id'
mockFind.return_value = expected
self.assertEqual(expected, test_method(name_or_id))
mockFind.assert_called_with(self.session, name_or_id)
def verify_list(self, mock_method, test_method):
with mock.patch(mock_method) as mockList:
expected = ['result']
mockList.return_value = expected
self.assertEqual(expected, test_method())
mockList.assert_called_with(self.session)
def verify_update(self, mock_method, test_method):
with mock.patch(mock_method) as mockList:
expected = 'result'
mockList.return_value = expected
self.assertEqual(expected, test_method())
mockList.assert_called_with(self.session)